package integration

import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// createSubscribeInitMessage creates a subscribe init message with the given parameters
func createSubscribeInitMessage(topic string, actualPartition *schema_pb.Partition, startOffset int64, offsetType schema_pb.OffsetType, consumerGroup string, consumerID string) *mq_pb.SubscribeMessageRequest {
	return &mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Init{
			Init: &mq_pb.SubscribeMessageRequest_InitMessage{
				ConsumerGroup: consumerGroup,
				ConsumerId:    consumerID,
				ClientId:      "kafka-gateway",
				Topic: &schema_pb.Topic{
					Namespace: "kafka",
					Name:      topic,
				},
				PartitionOffset: &schema_pb.PartitionOffset{
					Partition:   actualPartition,
					StartTsNs:   0,
					StartOffset: startOffset,
				},
				OffsetType:        offsetType,
				SlidingWindowSize: 10,
			},
		},
	}
}

// CreateFreshSubscriber creates a new subscriber session without caching
// This ensures each fetch gets fresh data from the requested offset
// consumerGroup and consumerID are passed from the Kafka client for proper tracking in SMQ
func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
	// Use BrokerClient's context so the subscriber is cancelled when the connection closes
	subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)

	stream, err := bc.client.SubscribeMessage(subscriberCtx)
	if err != nil {
		subscriberCancel()
		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
	}

	// Get the actual partition assignment from the broker
	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
	if err != nil {
		subscriberCancel()
		return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
	}

	// Use EXACT_OFFSET to read from the specific offset
	offsetType := schema_pb.OffsetType_EXACT_OFFSET

	// Send init message to start subscription with the Kafka client's consumer group and ID
	initReq := createSubscribeInitMessage(topic, actualPartition, startOffset, offsetType, consumerGroup, consumerID)

	glog.V(4).Infof("[SUBSCRIBE-INIT] CreateFreshSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
		topic, partition, startOffset, offsetType, consumerGroup, consumerID)

	if err := stream.Send(initReq); err != nil {
		subscriberCancel()
		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
	}

	// IMPORTANT: Don't wait for an init response here!
	// The broker may send the first data record as the "init response".
	// If we call Recv() here, we'll consume that first record and ReadRecords will block
	// waiting for the second record, causing a 30-second timeout.
	// Instead, let ReadRecords handle all Recv() calls.

	session := &BrokerSubscriberSession{
		Stream:        stream,
		Topic:         topic,
		Partition:     partition,
		StartOffset:   startOffset,
		ConsumerGroup: consumerGroup,
		ConsumerID:    consumerID,
		Ctx:           subscriberCtx,
		Cancel:        subscriberCancel,
	}

	return session, nil
}

// GetOrCreateSubscriber gets or creates a subscriber for offset tracking
func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
	// Create a temporary session to generate the key
	tempSession := &BrokerSubscriberSession{
		Topic:         topic,
		Partition:     partition,
		ConsumerGroup: consumerGroup,
		ConsumerID:    consumerID,
	}
	key := tempSession.Key()

	bc.subscribersLock.RLock()
	if session, exists := bc.subscribers[key]; exists {
		// Check if we can reuse the existing session
		session.mu.Lock()
		currentOffset := session.StartOffset

		// Check cache to see what offsets are available
		canUseCache := false
		if len(session.consumedRecords) > 0 {
			cacheStartOffset := session.consumedRecords[0].Offset
			cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
			if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset {
				canUseCache = true
			}
		}
		session.mu.Unlock()

		// With a seekable broker, always reuse the existing session.
		// Any offset mismatch will be handled by FetchRecords via SeekMessage.
		// This covers:
		// 1. Forward read: natural continuation
		// 2. Backward read with cache hit: serve from cache
		// 3. Backward read without cache: send a seek message to the broker
		// No stream recreation is needed - the broker repositions internally.

		bc.subscribersLock.RUnlock()

		if canUseCache {
			glog.V(4).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (cached)",
				key, currentOffset, startOffset)
		} else if startOffset >= currentOffset {
			glog.V(4).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (forward read)",
				key, currentOffset, startOffset)
		} else {
			glog.V(4).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will seek backward)",
				key, currentOffset, startOffset)
		}
		return session, nil
	}

	// Session doesn't exist - need to create one
	bc.subscribersLock.RUnlock()

	// Create a new subscriber stream; acquire the write lock since the read lock was released above
	bc.subscribersLock.Lock()
	defer bc.subscribersLock.Unlock()

	// Double-check if the session was created by another goroutine while we were acquiring the lock
	if session, exists := bc.subscribers[key]; exists {
		// With a seekable broker, always reuse the existing session
		// FetchRecords will handle any offset mismatch via seek
		session.mu.Lock()
		existingOffset := session.StartOffset
		session.mu.Unlock()

		glog.V(3).Infof("[FETCH] Session created concurrently at offset %d (requested %d), reusing", existingOffset, startOffset)
		return session, nil
	}

	// Use BrokerClient's context so subscribers are automatically cancelled when the connection closes
	// This ensures proper cleanup without artificial timeouts
	subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)

	stream, err := bc.client.SubscribeMessage(subscriberCtx)
	if err != nil {
		subscriberCancel()
		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
	}

	// Get the actual partition assignment from the broker instead of using Kafka partition mapping
	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
	if err != nil {
		subscriberCancel()
		return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
	}

	// Convert Kafka offset to appropriate SeaweedMQ OffsetType
	var offsetType schema_pb.OffsetType
	var offsetValue int64

	if startOffset == -1 {
		// Kafka offset -1 typically means "latest"
		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
		offsetValue = 0 // Not used with RESET_TO_LATEST
		glog.V(2).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
	} else {
		// Use EXACT_OFFSET to position the subscriber at the exact Kafka offset
		// This allows the subscriber to read from both buffer and disk at the correct position
		offsetType = schema_pb.OffsetType_EXACT_OFFSET
		offsetValue = startOffset // Use the exact Kafka offset
		glog.V(2).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
	}

	glog.V(2).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s",
		topic, partition, startOffset, offsetType)

	glog.V(4).Infof("[SUBSCRIBE-INIT] GetOrCreateSubscriber sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
		topic, partition, offsetValue, offsetType, consumerGroup, consumerID)

	// Send init message using the actual partition structure that the broker allocated
	initReq := createSubscribeInitMessage(topic, actualPartition, offsetValue, offsetType, consumerGroup, consumerID)
	if err := stream.Send(initReq); err != nil {
		subscriberCancel()
		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
	}

	session := &BrokerSubscriberSession{
		Topic:         topic,
		Partition:     partition,
		Stream:        stream,
		StartOffset:   startOffset,
		ConsumerGroup: consumerGroup,
		ConsumerID:    consumerID,
		Ctx:           subscriberCtx,
		Cancel:        subscriberCancel,
	}

	bc.subscribers[key] = session
	glog.V(2).Infof("Created subscriber session for %s with context cancellation support", key)
	return session, nil
}

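// Illustrative usage sketch (not called anywhere in this file): a caller that wants a
// cached, reusable session and then a batch of records could combine GetOrCreateSubscriber
// with ReadRecords roughly as follows; the topic, group, and consumer names are invented.
//
//	session, err := bc.GetOrCreateSubscriber("example-topic", 0, 0, "example-group", "example-consumer")
//	if err != nil {
//		return err
//	}
//	records, err := bc.ReadRecords(ctx, session, 100)
//
// ReadRecords continues from wherever the session's stream currently is, so repeated calls
// with the same session amount to a simple forward scan of the partition.
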
// createTemporarySubscriber creates a fresh subscriber for a single fetch operation
// This is used by the stateless fetch approach to eliminate concurrent access issues
// The subscriber is NOT stored in bc.subscribers and must be cleaned up by the caller
func (bc *BrokerClient) createTemporarySubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
	glog.V(4).Infof("[STATELESS] Creating temporary subscriber for %s-%d at offset %d", topic, partition, startOffset)

	// Create context for this temporary subscriber
	ctx, cancel := context.WithCancel(bc.ctx)

	// Create gRPC stream
	stream, err := bc.client.SubscribeMessage(ctx)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
	}

	// Get the actual partition assignment from the broker
	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to get actual partition assignment: %v", err)
	}

	// Convert Kafka offset to appropriate SeaweedMQ OffsetType
	var offsetType schema_pb.OffsetType
	var offsetValue int64

	if startOffset == -1 {
		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
		offsetValue = 0
		glog.V(4).Infof("[STATELESS] Using RESET_TO_LATEST for Kafka offset -1")
	} else {
		offsetType = schema_pb.OffsetType_EXACT_OFFSET
		offsetValue = startOffset
		glog.V(4).Infof("[STATELESS] Using EXACT_OFFSET for Kafka offset %d", startOffset)
	}

	// Send init message
	initReq := createSubscribeInitMessage(topic, actualPartition, offsetValue, offsetType, consumerGroup, consumerID)
	if err := stream.Send(initReq); err != nil {
		cancel()
		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
	}

	// Create temporary session (not stored in bc.subscribers)
	session := &BrokerSubscriberSession{
		Topic:         topic,
		Partition:     partition,
		Stream:        stream,
		StartOffset:   startOffset,
		ConsumerGroup: consumerGroup,
		ConsumerID:    consumerID,
		Ctx:           ctx,
		Cancel:        cancel,
	}

	glog.V(4).Infof("[STATELESS] Created temporary subscriber for %s-%d starting at offset %d", topic, partition, startOffset)
	return session, nil
}

// createSubscriberSession creates a new subscriber session with proper initialization
// This is used by the hybrid approach for initial connections and backward seeks
func (bc *BrokerClient) createSubscriberSession(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
	glog.V(4).Infof("[HYBRID-SESSION] Creating subscriber session for %s-%d at offset %d", topic, partition, startOffset)

	// Create context for this subscriber
	ctx, cancel := context.WithCancel(bc.ctx)

	// Create gRPC stream
	stream, err := bc.client.SubscribeMessage(ctx)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
	}

	// Get the actual partition assignment from the broker
	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to get actual partition assignment: %v", err)
	}

	// Convert Kafka offset to appropriate SeaweedMQ OffsetType
	var offsetType schema_pb.OffsetType
	var offsetValue int64

	if startOffset == -1 {
		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
		offsetValue = 0
		glog.V(4).Infof("[HYBRID-SESSION] Using RESET_TO_LATEST for Kafka offset -1")
	} else {
		offsetType = schema_pb.OffsetType_EXACT_OFFSET
		offsetValue = startOffset
		glog.V(4).Infof("[HYBRID-SESSION] Using EXACT_OFFSET for Kafka offset %d", startOffset)
	}

	// Send init message
	initReq := createSubscribeInitMessage(topic, actualPartition, offsetValue, offsetType, consumerGroup, consumerID)
	if err := stream.Send(initReq); err != nil {
		cancel()
		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
	}

	// Create session with proper initialization
	session := &BrokerSubscriberSession{
		Topic:            topic,
		Partition:        partition,
		Stream:           stream,
		StartOffset:      startOffset,
		ConsumerGroup:    consumerGroup,
		ConsumerID:       consumerID,
		Ctx:              ctx,
		Cancel:           cancel,
		consumedRecords:  nil,
		nextOffsetToRead: startOffset,
		lastReadOffset:   startOffset - 1, // Will be updated after first read
		initialized:      false,
	}

	glog.V(4).Infof("[HYBRID-SESSION] Created subscriber session for %s-%d starting at offset %d", topic, partition, startOffset)
	return session, nil
}

// serveFromCache serves records from the session's cache
func (bc *BrokerClient) serveFromCache(session *BrokerSubscriberSession, requestedOffset int64, maxRecords int) []*SeaweedRecord {
	// Find the start index in cache
	startIdx := -1
	for i, record := range session.consumedRecords {
		if record.Offset == requestedOffset {
			startIdx = i
			break
		}
	}

	if startIdx == -1 {
		// Offset not found in cache (shouldn't happen if caller checked properly)
		return nil
	}

	// Calculate end index
	endIdx := startIdx + maxRecords
	if endIdx > len(session.consumedRecords) {
		endIdx = len(session.consumedRecords)
	}

	// Return slice from cache
	result := session.consumedRecords[startIdx:endIdx]
	glog.V(4).Infof("[HYBRID-CACHE] Served %d records from cache (requested %d, offset %d)",
		len(result), maxRecords, requestedOffset)
	return result
}

// readRecordsFromSession reads records from the session's stream
func (bc *BrokerClient) readRecordsFromSession(ctx context.Context, session *BrokerSubscriberSession, startOffset int64, maxRecords int) ([]*SeaweedRecord, error) {
	glog.V(4).Infof("[HYBRID-READ] Reading from stream: offset=%d maxRecords=%d", startOffset, maxRecords)

	records := make([]*SeaweedRecord, 0, maxRecords)
	currentOffset := startOffset

	// Read until we have enough records or timeout
	for len(records) < maxRecords {
		// Check context timeout
		select {
		case <-ctx.Done():
			// Timeout or cancellation - return what we have
			glog.V(4).Infof("[HYBRID-READ] Context done, returning %d records", len(records))
			return records, nil
		default:
		}

		// Read from the stream
		resp, err := session.Stream.Recv()
		if err != nil {
			if err == io.EOF {
				glog.V(4).Infof("[HYBRID-READ] Stream closed (EOF), returning %d records", len(records))
				return records, nil
			}
			return nil, fmt.Errorf("failed to receive from stream: %v", err)
		}

		// Handle data message
		if dataMsg := resp.GetData(); dataMsg != nil {
			record := &SeaweedRecord{
				Key:       dataMsg.Key,
				Value:     dataMsg.Value,
				Timestamp: dataMsg.TsNs,
				Offset:    currentOffset,
			}
			records = append(records, record)
			currentOffset++

			// Auto-acknowledge to prevent throttling
			ackReq := &mq_pb.SubscribeMessageRequest{
				Message: &mq_pb.SubscribeMessageRequest_Ack{
					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
						Key:  dataMsg.Key,
						TsNs: dataMsg.TsNs,
					},
				},
			}
			if err := session.Stream.Send(ackReq); err != nil {
				if err != io.EOF {
					glog.Warningf("[HYBRID-READ] Failed to send ack (non-critical): %v", err)
				}
			}
		}

		// Handle control messages
		if ctrlMsg := resp.GetCtrl(); ctrlMsg != nil {
			if ctrlMsg.Error != "" {
				// Error message from broker
				return nil, fmt.Errorf("broker error: %s", ctrlMsg.Error)
			}
			if ctrlMsg.IsEndOfStream {
				glog.V(4).Infof("[HYBRID-READ] End of stream, returning %d records", len(records))
				return records, nil
			}
			if ctrlMsg.IsEndOfTopic {
				glog.V(4).Infof("[HYBRID-READ] End of topic, returning %d records", len(records))
				return records, nil
			}
			// Empty control message (e.g., seek ack) - continue reading
			glog.V(4).Infof("[HYBRID-READ] Received control message (seek ack?), continuing")
			continue
		}
	}

	glog.V(4).Infof("[HYBRID-READ] Read %d records successfully", len(records))

	// Update cache
	session.consumedRecords = append(session.consumedRecords, records...)
	// Limit cache size to prevent unbounded growth
	const maxCacheSize = 10000
	if len(session.consumedRecords) > maxCacheSize {
		// Keep only the most recent records
		session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-maxCacheSize:]
	}

	return records, nil
}

// FetchRecordsHybrid uses a hybrid approach: session reuse + proper offset tracking
// - Fast path (95%): Reuse session for sequential reads
// - Slow path (5%): Create new subscriber for backward seeks
// This combines performance (connection reuse) with correctness (proper tracking)
func (bc *BrokerClient) FetchRecordsHybrid(ctx context.Context, topic string, partition int32, requestedOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) {
	glog.V(4).Infof("[FETCH-HYBRID] topic=%s partition=%d requestedOffset=%d maxRecords=%d",
		topic, partition, requestedOffset, maxRecords)

	// Get or create session for this (topic, partition, consumerGroup, consumerID)
	key := fmt.Sprintf("%s-%d-%s-%s", topic, partition, consumerGroup, consumerID)

	bc.subscribersLock.Lock()
	session, exists := bc.subscribers[key]
	if !exists {
		// No session - create one (this is the initial fetch)
		glog.V(4).Infof("[FETCH-HYBRID] Creating initial session for %s at offset %d", key, requestedOffset)
		newSession, err := bc.createSubscriberSession(topic, partition, requestedOffset, consumerGroup, consumerID)
		if err != nil {
			bc.subscribersLock.Unlock()
			return nil, fmt.Errorf("failed to create initial session: %v", err)
		}
		bc.subscribers[key] = newSession
		session = newSession
	}
	bc.subscribersLock.Unlock()

	// CRITICAL: Lock the session for the entire operation to serialize requests
	// This prevents concurrent access to the same stream
	session.mu.Lock()
	defer session.mu.Unlock()

	// Check if we can serve from cache
	if len(session.consumedRecords) > 0 {
		cacheStart := session.consumedRecords[0].Offset
		cacheEnd := session.consumedRecords[len(session.consumedRecords)-1].Offset

		if requestedOffset >= cacheStart && requestedOffset <= cacheEnd {
			// Serve from cache
			glog.V(4).Infof("[FETCH-HYBRID] FAST: Serving from cache for %s offset %d (cache: %d-%d)",
				key, requestedOffset, cacheStart, cacheEnd)
			return bc.serveFromCache(session, requestedOffset, maxRecords), nil
		}
	}

	// Determine stream position
	// lastReadOffset tracks what we've actually read from the stream
	streamPosition := session.lastReadOffset + 1
	if !session.initialized {
		streamPosition = session.StartOffset
	}

	glog.V(4).Infof("[FETCH-HYBRID] requestedOffset=%d streamPosition=%d lastReadOffset=%d",
		requestedOffset, streamPosition, session.lastReadOffset)

	// Decision: Fast path or slow path?
	if requestedOffset < streamPosition {
		// SLOW PATH: Backward seek - need a new subscriber
		glog.V(4).Infof("[FETCH-HYBRID] SLOW: Backward seek from %d to %d, creating new subscriber",
			streamPosition, requestedOffset)

		// Close old session
		if session.Stream != nil {
			session.Stream.CloseSend()
		}
		if session.Cancel != nil {
			session.Cancel()
		}

		// Create new subscriber at requested offset
		newSession, err := bc.createSubscriberSession(topic, partition, requestedOffset, consumerGroup, consumerID)
		if err != nil {
			return nil, fmt.Errorf("failed to create subscriber for backward seek: %v", err)
		}

		// Replace session in map
		bc.subscribersLock.Lock()
		bc.subscribers[key] = newSession
		bc.subscribersLock.Unlock()

		// Copy the new stream and context into the session object we already hold locked;
		// note that the new session itself is not locked here
		session.Stream = newSession.Stream
		session.Ctx = newSession.Ctx
		session.Cancel = newSession.Cancel
		session.StartOffset = requestedOffset
		session.lastReadOffset = requestedOffset - 1 // Will be updated after read
		session.initialized = false
		session.consumedRecords = nil

		streamPosition = requestedOffset
	} else if requestedOffset > streamPosition {
		// FAST PATH: Forward seek - use server-side seek
		seekOffset := requestedOffset
		glog.V(4).Infof("[FETCH-HYBRID] FAST: Forward seek from %d to %d using server-side seek",
			streamPosition, seekOffset)

		// Send seek message to broker
		seekReq := &mq_pb.SubscribeMessageRequest{
			Message: &mq_pb.SubscribeMessageRequest_Seek{
				Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
					Offset:     seekOffset,
					OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
				},
			},
		}

		if err := session.Stream.Send(seekReq); err != nil {
			if err == io.EOF {
				glog.V(4).Infof("[FETCH-HYBRID] Stream closed during seek, ignoring")
				return nil, nil
			}
			return nil, fmt.Errorf("failed to send seek request: %v", err)
		}

		glog.V(4).Infof("[FETCH-HYBRID] Seek request sent, broker will reposition stream to offset %d", seekOffset)
		// NOTE: Don't wait for the ack - the broker will restart its Subscribe loop and send data
		// The ack will be handled inline with data messages in readRecordsFromSession

		// Clear cache since we've skipped ahead
		session.consumedRecords = nil
		streamPosition = seekOffset
	} else {
		// FAST PATH: Sequential read - continue from current position
		glog.V(4).Infof("[FETCH-HYBRID] FAST: Sequential read at offset %d", requestedOffset)
	}

	// Read records from stream
	records, err := bc.readRecordsFromSession(ctx, session, requestedOffset, maxRecords)
	if err != nil {
		return nil, err
	}

	// Update tracking
	if len(records) > 0 {
		session.lastReadOffset = records[len(records)-1].Offset
		session.initialized = true
		glog.V(4).Infof("[FETCH-HYBRID] Read %d records, lastReadOffset now %d",
			len(records), session.lastReadOffset)
	}

	return records, nil
}

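// Illustrative usage sketch (invented topic, group, and consumer names): the Kafka fetch
// handler would typically call FetchRecordsHybrid once per fetch request, with a context
// deadline derived from the request's MaxWaitTime.
//
//	fetchCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) // ~MaxWaitTime
//	defer cancel()
//	records, err := bc.FetchRecordsHybrid(fetchCtx, "example-topic", 0, nextOffset, 500, "example-group", "example-consumer")
//
// Sequential offsets reuse the cached session (fast path); an offset behind the stream
// position recreates the subscriber (slow path); an offset ahead of it is handled with a
// server-side seek on the existing stream.
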
// FetchRecordsWithDedup reads records with request deduplication to prevent duplicate concurrent fetches
// DEPRECATED: Use FetchRecordsHybrid instead for better performance
// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
func (bc *BrokerClient) FetchRecordsWithDedup(ctx context.Context, topic string, partition int32, startOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) {
	// Create key for this fetch request
	key := fmt.Sprintf("%s-%d-%d", topic, partition, startOffset)

	glog.V(4).Infof("[FETCH-DEDUP] topic=%s partition=%d offset=%d maxRecords=%d key=%s",
		topic, partition, startOffset, maxRecords, key)

	// Check if there's already a fetch in progress for this exact request
	bc.fetchRequestsLock.Lock()

	if existing, exists := bc.fetchRequests[key]; exists {
		// Another fetch is in progress for this (topic, partition, offset)
		// Create a waiter channel and add it to the list
		waiter := make(chan FetchResult, 1)
		existing.mu.Lock()
		existing.waiters = append(existing.waiters, waiter)
		existing.mu.Unlock()
		bc.fetchRequestsLock.Unlock()

		glog.V(4).Infof("[FETCH-DEDUP] Waiting for in-progress fetch: %s", key)

		// Wait for the result from the in-progress fetch
		select {
		case result := <-waiter:
			glog.V(4).Infof("[FETCH-DEDUP] Received result from in-progress fetch: %s (records=%d, err=%v)",
				key, len(result.records), result.err)
			return result.records, result.err
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	// No fetch in progress - this request will do the fetch
	fetchReq := &FetchRequest{
		topic:      topic,
		partition:  partition,
		offset:     startOffset,
		resultChan: make(chan FetchResult, 1),
		waiters:    []chan FetchResult{},
		inProgress: true,
	}
	bc.fetchRequests[key] = fetchReq
	bc.fetchRequestsLock.Unlock()

	glog.V(4).Infof("[FETCH-DEDUP] Starting new fetch: %s", key)

	// Perform the actual fetch
	records, err := bc.fetchRecordsStatelessInternal(ctx, topic, partition, startOffset, maxRecords, consumerGroup, consumerID)

	// Prepare result
	result := FetchResult{
		records: records,
		err:     err,
	}

	// Broadcast result to all waiters and clean up
	bc.fetchRequestsLock.Lock()
	fetchReq.mu.Lock()
	waiters := fetchReq.waiters
	fetchReq.mu.Unlock()
	delete(bc.fetchRequests, key)
	bc.fetchRequestsLock.Unlock()

	// Send result to all waiters
	glog.V(4).Infof("[FETCH-DEDUP] Broadcasting result to %d waiters: %s (records=%d, err=%v)",
		len(waiters), key, len(records), err)
	for _, waiter := range waiters {
		waiter <- result
		close(waiter)
	}

	return records, err
}

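// Illustrative sketch of the deduplication behavior (invented names; sync import assumed):
// two concurrent fetches for the same (topic, partition, offset) result in a single broker
// read, with the second caller receiving the broadcast result over its waiter channel.
//
//	var wg sync.WaitGroup
//	wg.Add(2)
//	go func() { defer wg.Done(); _, _ = bc.FetchRecordsWithDedup(ctx, "example-topic", 0, 100, 50, "g", "c1") }()
//	go func() { defer wg.Done(); _, _ = bc.FetchRecordsWithDedup(ctx, "example-topic", 0, 100, 50, "g", "c2") }()
//	wg.Wait()
//
// Note that the key is only topic-partition-offset, so callers with different consumer IDs
// still share one in-flight fetch for that offset.
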
// fetchRecordsStatelessInternal is the internal implementation of stateless fetch
// This is called by FetchRecordsWithDedup and should not be called directly
func (bc *BrokerClient) fetchRecordsStatelessInternal(ctx context.Context, topic string, partition int32, startOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) {
	glog.V(4).Infof("[FETCH-STATELESS] topic=%s partition=%d offset=%d maxRecords=%d",
		topic, partition, startOffset, maxRecords)

	// STATELESS APPROACH: Create a temporary subscriber just for this fetch
	// This eliminates concurrent access to shared offset state
	tempSubscriber, err := bc.createTemporarySubscriber(topic, partition, startOffset, consumerGroup, consumerID)
	if err != nil {
		return nil, fmt.Errorf("failed to create temporary subscriber: %v", err)
	}

	// Ensure cleanup even if the read fails
	defer func() {
		if tempSubscriber.Stream != nil {
			// Send close message
			tempSubscriber.Stream.CloseSend()
		}
		if tempSubscriber.Cancel != nil {
			tempSubscriber.Cancel()
		}
	}()

	// Read records from the fresh subscriber (no seeking needed, it starts at startOffset)
	return bc.readRecordsFrom(ctx, tempSubscriber, startOffset, maxRecords)
}

// FetchRecordsStateless reads records using a stateless approach (creates a fresh subscriber per fetch)
// DEPRECATED: Use FetchRecordsHybrid instead for better performance with session reuse
// This eliminates concurrent access to shared offset state
// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
func (bc *BrokerClient) FetchRecordsStateless(ctx context.Context, topic string, partition int32, startOffset int64, maxRecords int, consumerGroup string, consumerID string) ([]*SeaweedRecord, error) {
	return bc.FetchRecordsHybrid(ctx, topic, partition, startOffset, maxRecords, consumerGroup, consumerID)
}

// ReadRecordsFromOffset reads records starting from a specific offset using the STATELESS approach
// Creates a fresh subscriber for each fetch to eliminate concurrent access issues
// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
// DEPRECATED: Use FetchRecordsStateless instead for better API clarity
func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *BrokerSubscriberSession, requestedOffset int64, maxRecords int) ([]*SeaweedRecord, error) {
	if session == nil {
		return nil, fmt.Errorf("subscriber session cannot be nil")
	}

	return bc.FetchRecordsStateless(ctx, session.Topic, session.Partition, requestedOffset, maxRecords, session.ConsumerGroup, session.ConsumerID)
}

// readRecordsFrom reads records from the stream, assigning offsets starting from startOffset
// Uses a timeout-based approach to read multiple records without blocking indefinitely
// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
func (bc *BrokerClient) readRecordsFrom(ctx context.Context, session *BrokerSubscriberSession, startOffset int64, maxRecords int) ([]*SeaweedRecord, error) {
	if session == nil {
		return nil, fmt.Errorf("subscriber session cannot be nil")
	}

	if session.Stream == nil {
		return nil, fmt.Errorf("subscriber session stream cannot be nil")
	}

	glog.V(4).Infof("[FETCH] readRecordsFrom: topic=%s partition=%d startOffset=%d maxRecords=%d",
		session.Topic, session.Partition, startOffset, maxRecords)

	var records []*SeaweedRecord
	currentOffset := startOffset

	// Return immediately if maxRecords is 0 or negative
	if maxRecords <= 0 {
		return records, nil
	}

	// Note: Cache checking is done in ReadRecordsFromOffset, not here
	// This function is called only when we need to read new data from the stream

	// Read the first record with a timeout (important for empty topics).
	// For the SMQ backend with consumer groups we need an adequate timeout for disk reads:
	// when a consumer group resumes from a committed offset, the subscriber may need to
	// 1. connect to the broker (network latency),
	// 2. seek to the correct offset in the log file (disk I/O),
	// 3. read and deserialize the record (disk I/O).
	// Total latency can be 100-500ms for cold reads from disk.
	//
	// Use the context from the Kafka fetch request: its deadline is set by the caller from
	// the fetch request's MaxWaitTime, so we wait exactly as long as the client requested.
	// For in-memory reads (hot path), records arrive in <10ms; for low-volume topics
	// (like _schemas) the caller sets a longer timeout to keep the subscriber alive.
	// If no context is provided, fall back to a reasonable default timeout.
	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
	}

	// Capture the stream pointer up front to avoid a TOCTOU race: if we accessed
	// session.Stream inside the goroutine below, it could be replaced or set to nil
	// between the check and the Recv() call.
	stream := session.Stream
	if stream == nil {
		glog.V(4).Infof("[FETCH] Stream is nil, cannot read")
		return records, nil
	}

	type recvResult struct {
		resp *mq_pb.SubscribeMessageResponse
		err  error
	}
	recvChan := make(chan recvResult, 1)

	// Try to receive the first record using the captured stream pointer
	go func() {
		// Recover from panics caused by the stream being closed during Recv()
		defer func() {
			if r := recover(); r != nil {
				select {
				case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream recv panicked: %v", r)}:
				case <-ctx.Done():
				}
			}
		}()
		resp, err := stream.Recv()
		select {
		case recvChan <- recvResult{resp: resp, err: err}:
		case <-ctx.Done():
			// Context cancelled, don't send (avoid blocking)
		}
	}()

	select {
	case result := <-recvChan:
		if result.err != nil {
			glog.V(4).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
			return records, nil // Return empty - no error for empty topic
		}

		if dataMsg := result.resp.GetData(); dataMsg != nil {
			record := &SeaweedRecord{
				Key:       dataMsg.Key,
				Value:     dataMsg.Value,
				Timestamp: dataMsg.TsNs,
				Offset:    currentOffset,
			}
			records = append(records, record)
			currentOffset++
			glog.V(4).Infof("[FETCH] Received first record: offset=%d, keyLen=%d, valueLen=%d",
				record.Offset, len(record.Key), len(record.Value))

			// CRITICAL: Auto-acknowledge the first message immediately for the Kafka gateway.
			// Kafka uses offset commits (not per-message acks), so we must ack to prevent the
			// broker from blocking on in-flight messages waiting for acks that will never come.
			ackMsg := &mq_pb.SubscribeMessageRequest{
				Message: &mq_pb.SubscribeMessageRequest_Ack{
					Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
						Key:  dataMsg.Key,
						TsNs: dataMsg.TsNs,
					},
				},
			}
			if err := stream.Send(ackMsg); err != nil {
				glog.V(4).Infof("[FETCH] Failed to send ack for first record offset %d: %v (continuing)", record.Offset, err)
				// Don't fail the fetch if the ack fails - continue reading
			}
		}

	case <-ctx.Done():
		// Timeout on first record - topic is empty or no data available
		glog.V(4).Infof("[FETCH] No data available (timeout on first record)")
		return records, nil
	}

	// If we got the first record, try to get more with an adaptive timeout.
	// Schema Registry catch-up scenario: give a generous timeout for the first batch, since
	// Schema Registry needs to read multiple records quickly when catching up (e.g., offsets
	// 3-6) and the broker may be reading from disk, which introduces 10-20ms between records.
	//
	// Strategy: start with a generous timeout (1 second) for the first 5 records to allow the
	// broker to read from disk, then switch to fast mode (100ms) for streaming in-memory data.
	consecutiveReads := 0

	for len(records) < maxRecords {
		// Adaptive timeout based on how many records we've already read
		var currentTimeout time.Duration
		if consecutiveReads < 5 {
			// First 5 records: generous timeout for disk reads + network delays
			currentTimeout = 1 * time.Second
		} else {
			// After 5 records: assume we're streaming from memory, use a faster timeout
			currentTimeout = 100 * time.Millisecond
		}

		readStart := time.Now()
		// Use the parent context (ctx) to respect the client's MaxWaitTime deadline;
		// the per-record timeout is combined with the overall fetch deadline
		ctx2, cancel2 := context.WithTimeout(ctx, currentTimeout)
		recvChan2 := make(chan recvResult, 1)

		go func() {
			// Recover from panics caused by the stream being closed during Recv()
			defer func() {
				if r := recover(); r != nil {
					select {
					case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream recv panicked: %v", r)}:
					case <-ctx2.Done():
					}
				}
			}()
			// Use the captured stream pointer to prevent a TOCTOU race
			resp, err := stream.Recv()
			select {
			case recvChan2 <- recvResult{resp: resp, err: err}:
			case <-ctx2.Done():
				// Context cancelled
			}
		}()

		select {
		case result := <-recvChan2:
			cancel2()
			readDuration := time.Since(readStart)

			if result.err != nil {
				glog.V(4).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
				// Return what we have; a plain break here would only exit the select, not the loop
				return records, nil
			}

			if dataMsg := result.resp.GetData(); dataMsg != nil {
				record := &SeaweedRecord{
					Key:       dataMsg.Key,
					Value:     dataMsg.Value,
					Timestamp: dataMsg.TsNs,
					Offset:    currentOffset,
				}
				records = append(records, record)
				currentOffset++
				consecutiveReads++ // Track number of successful reads for adaptive timeout

				glog.V(4).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
					len(records), record.Offset, len(record.Key), len(record.Value), readDuration)

				// CRITICAL: Auto-acknowledge the message immediately for the Kafka gateway.
				// Kafka uses offset commits (not per-message acks), so we must ack to prevent the
				// broker from blocking on in-flight messages waiting for acks that will never come.
				ackMsg := &mq_pb.SubscribeMessageRequest{
					Message: &mq_pb.SubscribeMessageRequest_Ack{
						Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
							Key:  dataMsg.Key,
							TsNs: dataMsg.TsNs,
						},
					},
				}
				if err := stream.Send(ackMsg); err != nil {
					glog.V(4).Infof("[FETCH] Failed to send ack for offset %d: %v (continuing)", record.Offset, err)
					// Don't fail the fetch if the ack fails - continue reading
				}
			}

		case <-ctx2.Done():
			cancel2()
			// Timeout - return what we have
			glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
			return records, nil
		}
	}

	glog.V(4).Infof("[FETCH] Returning %d records (maxRecords reached)", len(records))
	return records, nil
}

// ReadRecords is a simplified version for deprecated code paths
// It reads from wherever the stream currently is
func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
	// Determine where the stream is based on the cache
	session.mu.Lock()
	var streamOffset int64
	if len(session.consumedRecords) > 0 {
		streamOffset = session.consumedRecords[len(session.consumedRecords)-1].Offset + 1
	} else {
		streamOffset = session.StartOffset
	}
	session.mu.Unlock()

	return bc.readRecordsFrom(ctx, session, streamOffset, maxRecords)
}

// CloseSubscriber closes and removes a subscriber session
func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerGroup string, consumerID string) {
	tempSession := &BrokerSubscriberSession{
		Topic:         topic,
		Partition:     partition,
		ConsumerGroup: consumerGroup,
		ConsumerID:    consumerID,
	}
	key := tempSession.Key()

	bc.subscribersLock.Lock()
	defer bc.subscribersLock.Unlock()

	if session, exists := bc.subscribers[key]; exists {
		// CRITICAL: Hold the session lock while cancelling to prevent a race with active Recv() calls
		session.mu.Lock()
		if session.Stream != nil {
			_ = session.Stream.CloseSend()
		}
		if session.Cancel != nil {
			session.Cancel()
		}
		session.mu.Unlock()
		delete(bc.subscribers, key)
		glog.V(4).Infof("[FETCH] Closed subscriber for %s", key)
	}
}

// NeedsRestart checks if the subscriber needs to restart to read from the given offset
// Returns true if:
// 1. Requested offset is before the current position AND not in cache
// 2. Stream is closed/invalid
func (bc *BrokerClient) NeedsRestart(session *BrokerSubscriberSession, requestedOffset int64) bool {
	session.mu.Lock()
	defer session.mu.Unlock()

	// Check if the stream is still valid
	if session.Stream == nil || session.Ctx == nil {
		return true
	}

	// Check if we can serve from cache
	if len(session.consumedRecords) > 0 {
		cacheStart := session.consumedRecords[0].Offset
		cacheEnd := session.consumedRecords[len(session.consumedRecords)-1].Offset
		if requestedOffset >= cacheStart && requestedOffset <= cacheEnd {
			// Can serve from cache, no restart needed
			return false
		}
	}

	// If the requested offset is behind the current position, a restart is needed
	if requestedOffset < session.StartOffset {
		return true
	}

	// Check if we're too far ahead (gap in cache)
	if requestedOffset > session.StartOffset+1000 {
		// Large gap - might be more efficient to restart
		return true
	}

	return false
}

// RestartSubscriber restarts an existing subscriber from a new offset
// This is more efficient than closing and recreating the session
func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newOffset int64, consumerGroup string, consumerID string) error {
	session.mu.Lock()
	defer session.mu.Unlock()

	glog.V(4).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
		session.Topic, session.Partition, session.StartOffset, newOffset)

	// Close existing stream
	if session.Stream != nil {
		_ = session.Stream.CloseSend()
	}
	if session.Cancel != nil {
		session.Cancel()
	}

	// Clear cache since we're seeking to a different position
	session.consumedRecords = nil
	session.nextOffsetToRead = newOffset

	// Create a new stream from the new offset
	subscriberCtx, cancel := context.WithCancel(bc.ctx)

	stream, err := bc.client.SubscribeMessage(subscriberCtx)
	if err != nil {
		cancel()
		return fmt.Errorf("failed to create subscribe stream for restart: %v", err)
	}

	// Get the actual partition assignment
	actualPartition, err := bc.getActualPartitionAssignment(session.Topic, session.Partition)
	if err != nil {
		cancel()
		_ = stream.CloseSend()
		return fmt.Errorf("failed to get actual partition assignment for restart: %v", err)
	}

	// Send init message with the new offset
	initReq := createSubscribeInitMessage(session.Topic, actualPartition, newOffset, schema_pb.OffsetType_EXACT_OFFSET, consumerGroup, consumerID)

	if err := stream.Send(initReq); err != nil {
		cancel()
		_ = stream.CloseSend()
		return fmt.Errorf("failed to send subscribe init for restart: %v", err)
	}

	// Update session with the new stream and offset
	session.Stream = stream
	session.Cancel = cancel
	session.Ctx = subscriberCtx
	session.StartOffset = newOffset

	glog.V(4).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
		session.Topic, session.Partition, newOffset)

	return nil
}

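// Illustrative sketch (not an existing call site): NeedsRestart and RestartSubscriber are
// intended to be used together when a fetch arrives for an offset the current stream
// cannot serve from its position or cache.
//
//	if bc.NeedsRestart(session, requestedOffset) {
//		if err := bc.RestartSubscriber(session, requestedOffset, session.ConsumerGroup, session.ConsumerID); err != nil {
//			return nil, err
//		}
//	}
//	records, err := bc.ReadRecords(ctx, session, maxRecords)
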
// Seek helper methods for BrokerSubscriberSession

// SeekToOffset repositions the stream to read from a specific offset
func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error {
	// Skip the seek if already at the requested offset
	session.mu.Lock()
	currentOffset := session.StartOffset
	session.mu.Unlock()

	if currentOffset == offset {
		glog.V(4).Infof("[SEEK] Already at offset %d for %s[%d], skipping seek", offset, session.Topic, session.Partition)
		return nil
	}

	seekMsg := &mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Seek{
			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
				Offset:     offset,
				OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
			},
		},
	}

	if err := session.Stream.Send(seekMsg); err != nil {
		// Handle graceful shutdown
		if err == io.EOF {
			glog.V(4).Infof("[SEEK] Stream closing during seek to offset %d for %s[%d]", offset, session.Topic, session.Partition)
			return nil // Not an error during shutdown
		}
		return fmt.Errorf("seek to offset %d failed: %v", offset, err)
	}

	session.mu.Lock()
	session.StartOffset = offset
	// Only clear the cache if seeking forward past cached data
	shouldClearCache := true
	if len(session.consumedRecords) > 0 {
		cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
		if offset <= cacheEndOffset {
			shouldClearCache = false
		}
	}
	if shouldClearCache {
		session.consumedRecords = nil
	}
	session.mu.Unlock()

	glog.V(4).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition)
	return nil
}

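// Illustrative sketch (invented variable name): a caller resuming a consumer group at a
// committed offset could reposition an existing session rather than recreating it.
//
//	if err := session.SeekToOffset(committedOffset); err != nil {
//		return err
//	}
//
// SeekToTimestamp, SeekToEarliest, and SeekToLatest below follow the same pattern,
// differing only in the OffsetType sent to the broker.
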
// SeekToTimestamp repositions the stream to read from messages at or after a specific timestamp
// timestampNs is in nanoseconds since the Unix epoch
// Note: We don't skip this operation even if we think we're at the right position because
// we can't easily determine the offset corresponding to a timestamp without querying the broker
func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error {
	seekMsg := &mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Seek{
			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
				Offset:     timestampNs,
				OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
			},
		},
	}

	if err := session.Stream.Send(seekMsg); err != nil {
		// Handle graceful shutdown
		if err == io.EOF {
			glog.V(4).Infof("[SEEK] Stream closing during seek to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition)
			return nil // Not an error during shutdown
		}
		return fmt.Errorf("seek to timestamp %d failed: %v", timestampNs, err)
	}

	session.mu.Lock()
	// Note: We don't know the exact offset at this timestamp yet
	// It will be updated when we read the first message
	session.consumedRecords = nil
	session.mu.Unlock()

	glog.V(4).Infof("[SEEK] Seeked to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition)
	return nil
}

// SeekToEarliest repositions the stream to the beginning of the partition
// Note: We don't skip this operation even if StartOffset == 0 because the broker
// may have a different notion of "earliest" (e.g., after compaction or retention)
func (session *BrokerSubscriberSession) SeekToEarliest() error {
	seekMsg := &mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Seek{
			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
				Offset:     0,
				OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
			},
		},
	}

	if err := session.Stream.Send(seekMsg); err != nil {
		// Handle graceful shutdown
		if err == io.EOF {
			glog.V(4).Infof("[SEEK] Stream closing during seek to earliest for %s[%d]", session.Topic, session.Partition)
			return nil // Not an error during shutdown
		}
		return fmt.Errorf("seek to earliest failed: %v", err)
	}

	session.mu.Lock()
	session.StartOffset = 0
	session.consumedRecords = nil
	session.mu.Unlock()

	glog.V(4).Infof("[SEEK] Seeked to earliest for %s[%d]", session.Topic, session.Partition)
	return nil
}

// SeekToLatest repositions the stream to the end of the partition (next new message)
// Note: We don't skip this operation because "latest" is a moving target and we can't
// reliably determine if we're already at the latest position without querying the broker
func (session *BrokerSubscriberSession) SeekToLatest() error {
	seekMsg := &mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Seek{
			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
				Offset:     0,
				OffsetType: schema_pb.OffsetType_RESET_TO_LATEST,
			},
		},
	}

	if err := session.Stream.Send(seekMsg); err != nil {
		// Handle graceful shutdown
		if err == io.EOF {
			glog.V(4).Infof("[SEEK] Stream closing during seek to latest for %s[%d]", session.Topic, session.Partition)
			return nil // Not an error during shutdown
		}
		return fmt.Errorf("seek to latest failed: %v", err)
	}

	session.mu.Lock()
	// Offset will be set when we read the first new message
	session.consumedRecords = nil
	session.mu.Unlock()

	glog.V(4).Infof("[SEEK] Seeked to latest for %s[%d]", session.Topic, session.Partition)
	return nil
}