diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
index d1cdefb5d..35bfdacdb 100644
--- a/weed/mq/kafka/integration/broker_client_subscribe.go
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -371,14 +371,78 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		delete(bc.subscribers, key)
 		glog.V(1).Infof("[FETCH] Closed old subscriber session for backward seek: %s", key)
 	}
-	bc.subscribersLock.Unlock()
+	// CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session
+	// to prevent other threads from interfering. We'll create the session inline.
+	// bc.subscribersLock.Unlock() - REMOVED to fix race condition
 	// Create a completely fresh subscriber at the requested offset
-	newSession, err := bc.GetOrCreateSubscriber(topic, partition, requestedOffset, consumerGroup, consumerID)
+	// INLINE SESSION CREATION to hold the lock continuously
+	subscriberCtx := context.Background()
+	subscriberCancel := func() {} // No-op cancel
+
+	stream, err := bc.client.SubscribeMessage(subscriberCtx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create fresh subscriber at offset %d: %w", requestedOffset, err)
+		bc.subscribersLock.Unlock()
+		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
 	}
 
+	// Get the actual partition assignment from the broker
+	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
+	if err != nil {
+		bc.subscribersLock.Unlock()
+		_ = stream.CloseSend()
+		return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
+	}
+
+	// Use EXACT_OFFSET to position subscriber at the exact Kafka offset
+	offsetType := schema_pb.OffsetType_EXACT_OFFSET
+	startTimestamp := int64(0)
+	startOffsetValue := requestedOffset
+
+	glog.V(1).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
+		topic, partition, requestedOffset)
+
+	// Send init message using the actual partition structure
+	if err := stream.Send(&mq_pb.SubscribeMessageRequest{
+		Message: &mq_pb.SubscribeMessageRequest_Init{
+			Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+				ConsumerGroup: consumerGroup,
+				ConsumerId:    consumerID,
+				ClientId:      "kafka-gateway",
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      topic,
+				},
+				PartitionOffset: &schema_pb.PartitionOffset{
+					Partition:   actualPartition,
+					StartTsNs:   startTimestamp,
+					StartOffset: startOffsetValue,
+				},
+				OffsetType:        offsetType,
+				SlidingWindowSize: 10,
+			},
+		},
+	}); err != nil {
+		bc.subscribersLock.Unlock()
+		_ = stream.CloseSend()
+		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
+	}
+
+	newSession := &BrokerSubscriberSession{
+		Topic:         topic,
+		Partition:     partition,
+		Stream:        stream,
+		StartOffset:   requestedOffset,
+		ConsumerGroup: consumerGroup,
+		ConsumerID:    consumerID,
+		Ctx:           subscriberCtx,
+		Cancel:        subscriberCancel,
+	}
+
+	bc.subscribers[key] = newSession
+	bc.subscribersLock.Unlock()
+
 	glog.V(1).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
+
 	// Read from fresh subscriber
 	return bc.ReadRecords(ctx, newSession, maxRecords)
 }