Browse Source

Inlined the session creation logic to hold the lock continuously

pull/7329/head
chrislu 1 week ago
parent
commit
bc7e015a41
  1. 70
      weed/mq/kafka/integration/broker_client_subscribe.go

70
weed/mq/kafka/integration/broker_client_subscribe.go

@ -371,14 +371,78 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
delete(bc.subscribers, key)
glog.V(1).Infof("[FETCH] Closed old subscriber session for backward seek: %s", key)
}
bc.subscribersLock.Unlock()
// CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session
// to prevent other threads from interfering. We'll create the session inline.
// bc.subscribersLock.Unlock() - REMOVED to fix race condition
// Create a completely fresh subscriber at the requested offset
newSession, err := bc.GetOrCreateSubscriber(topic, partition, requestedOffset, consumerGroup, consumerID)
// INLINE SESSION CREATION to hold the lock continuously
subscriberCtx := context.Background()
subscriberCancel := func() {} // No-op cancel
stream, err := bc.client.SubscribeMessage(subscriberCtx)
if err != nil {
return nil, fmt.Errorf("failed to create fresh subscriber at offset %d: %w", requestedOffset, err)
bc.subscribersLock.Unlock()
return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
}
// Get the actual partition assignment from the broker
actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
if err != nil {
bc.subscribersLock.Unlock()
_ = stream.CloseSend()
return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
}
// Use EXACT_OFFSET to position subscriber at the exact Kafka offset
offsetType := schema_pb.OffsetType_EXACT_OFFSET
startTimestamp := int64(0)
startOffsetValue := requestedOffset
glog.V(1).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
topic, partition, requestedOffset)
// Send init message using the actual partition structure
if err := stream.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: consumerGroup,
ConsumerId: consumerID,
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: actualPartition,
StartTsNs: startTimestamp,
StartOffset: startOffsetValue,
},
OffsetType: offsetType,
SlidingWindowSize: 10,
},
},
}); err != nil {
bc.subscribersLock.Unlock()
_ = stream.CloseSend()
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
}
newSession := &BrokerSubscriberSession{
Topic: topic,
Partition: partition,
Stream: stream,
StartOffset: requestedOffset,
ConsumerGroup: consumerGroup,
ConsumerID: consumerID,
Ctx: subscriberCtx,
Cancel: subscriberCancel,
}
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(1).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
// Read from fresh subscriber
return bc.ReadRecords(ctx, newSession, maxRecords)
}

Loading…
Cancel
Save