You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							253 lines
						
					
					
						
							8.3 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							253 lines
						
					
					
						
							8.3 KiB
						
					
					
				
								package broker
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/offset"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/topic"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// SubscribeWithOffset handles subscription requests with offset-based positioning
							 | 
						|
								// TODO: This extends the broker with offset-aware subscription support
							 | 
						|
								// ASSUMPTION: This will eventually be integrated into the main SubscribeMessage method
							 | 
						|
								func (b *MessageQueueBroker) SubscribeWithOffset(
							 | 
						|
									ctx context.Context,
							 | 
						|
									req *mq_pb.SubscribeMessageRequest,
							 | 
						|
									stream mq_pb.SeaweedMessaging_SubscribeMessageServer,
							 | 
						|
									offsetType schema_pb.OffsetType,
							 | 
						|
									startOffset int64,
							 | 
						|
								) error {
							 | 
						|
								
							 | 
						|
									initMessage := req.GetInit()
							 | 
						|
									if initMessage == nil {
							 | 
						|
										return fmt.Errorf("missing init message")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Extract partition information from the request
							 | 
						|
									t := topic.FromPbTopic(initMessage.Topic)
							 | 
						|
								
							 | 
						|
									// Get partition from the request's partition_offset field
							 | 
						|
									if initMessage.PartitionOffset == nil || initMessage.PartitionOffset.Partition == nil {
							 | 
						|
										return fmt.Errorf("missing partition information in request")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Use the partition information from the request
							 | 
						|
									p := topic.Partition{
							 | 
						|
										RingSize:   initMessage.PartitionOffset.Partition.RingSize,
							 | 
						|
										RangeStart: initMessage.PartitionOffset.Partition.RangeStart,
							 | 
						|
										RangeStop:  initMessage.PartitionOffset.Partition.RangeStop,
							 | 
						|
										UnixTimeNs: initMessage.PartitionOffset.Partition.UnixTimeNs,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create offset-based subscription
							 | 
						|
									subscriptionID := fmt.Sprintf("%s-%s-%d", initMessage.ConsumerGroup, initMessage.ConsumerId, startOffset)
							 | 
						|
									subscription, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, offsetType, startOffset)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create offset subscription: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									defer func() {
							 | 
						|
										if closeErr := b.offsetManager.CloseSubscription(subscriptionID); closeErr != nil {
							 | 
						|
											glog.V(0).Infof("Failed to close subscription %s: %v", subscriptionID, closeErr)
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									// Get local partition for reading
							 | 
						|
									localTopicPartition, err := b.GetOrGenerateLocalPartition(t, p)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("topic %v partition %v not found: %v", t, p, err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Subscribe to messages using offset-based positioning
							 | 
						|
									return b.subscribeWithOffsetSubscription(ctx, localTopicPartition, subscription, stream, initMessage)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// subscribeWithOffsetSubscription handles the actual message consumption with offset tracking
							 | 
						|
								func (b *MessageQueueBroker) subscribeWithOffsetSubscription(
							 | 
						|
									ctx context.Context,
							 | 
						|
									localPartition *topic.LocalPartition,
							 | 
						|
									subscription *offset.OffsetSubscription,
							 | 
						|
									stream mq_pb.SeaweedMessaging_SubscribeMessageServer,
							 | 
						|
									initMessage *mq_pb.SubscribeMessageRequest_InitMessage,
							 | 
						|
								) error {
							 | 
						|
								
							 | 
						|
									clientName := fmt.Sprintf("%s-%s", initMessage.ConsumerGroup, initMessage.ConsumerId)
							 | 
						|
								
							 | 
						|
									// TODO: Implement offset-based message reading
							 | 
						|
									// ASSUMPTION: For now, we'll use the existing subscription mechanism and track offsets separately
							 | 
						|
									// This should be replaced with proper offset-based reading from storage
							 | 
						|
								
							 | 
						|
									// Convert the subscription's current offset to a proper MessagePosition
							 | 
						|
									startPosition, err := b.convertOffsetToMessagePosition(subscription)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to convert offset to message position: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(0).Infof("[%s] Starting Subscribe for topic %s partition %d-%d at offset %d",
							 | 
						|
										clientName, subscription.TopicName, subscription.Partition.RangeStart, subscription.Partition.RangeStop, subscription.CurrentOffset)
							 | 
						|
								
							 | 
						|
									return localPartition.Subscribe(clientName,
							 | 
						|
										startPosition,
							 | 
						|
										func() bool {
							 | 
						|
											// Check if context is cancelled (client disconnected)
							 | 
						|
											select {
							 | 
						|
											case <-ctx.Done():
							 | 
						|
												glog.V(0).Infof("[%s] Context cancelled, stopping", clientName)
							 | 
						|
												return false
							 | 
						|
											default:
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Check if subscription is still active and not at end
							 | 
						|
											if !subscription.IsActive {
							 | 
						|
												glog.V(0).Infof("[%s] Subscription not active, stopping", clientName)
							 | 
						|
												return false
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											atEnd, err := subscription.IsAtEnd()
							 | 
						|
											if err != nil {
							 | 
						|
												glog.V(0).Infof("[%s] Error checking if subscription at end: %v", clientName, err)
							 | 
						|
												return false
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											if atEnd {
							 | 
						|
												glog.V(4).Infof("[%s] At end of subscription, stopping", clientName)
							 | 
						|
												return false
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Add a small sleep to avoid CPU busy-wait when checking for new data
							 | 
						|
											time.Sleep(10 * time.Millisecond)
							 | 
						|
											return true
							 | 
						|
										},
							 | 
						|
										func(logEntry *filer_pb.LogEntry) (bool, error) {
							 | 
						|
											// Check if this message matches our offset requirements
							 | 
						|
											currentOffset := subscription.GetNextOffset()
							 | 
						|
								
							 | 
						|
											if logEntry.Offset < currentOffset {
							 | 
						|
												// Skip messages before our current offset
							 | 
						|
												return false, nil
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Send message to client
							 | 
						|
											if err := stream.Send(&mq_pb.SubscribeMessageResponse{
							 | 
						|
												Message: &mq_pb.SubscribeMessageResponse_Data{
							 | 
						|
													Data: &mq_pb.DataMessage{
							 | 
						|
														Key:   logEntry.Key,
							 | 
						|
														Value: logEntry.Data,
							 | 
						|
														TsNs:  logEntry.TsNs,
							 | 
						|
													},
							 | 
						|
												},
							 | 
						|
											}); err != nil {
							 | 
						|
												glog.Errorf("Error sending data to %s: %v", clientName, err)
							 | 
						|
												return false, err
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Advance subscription offset
							 | 
						|
											subscription.AdvanceOffset()
							 | 
						|
								
							 | 
						|
											// Check context for cancellation
							 | 
						|
											select {
							 | 
						|
											case <-ctx.Done():
							 | 
						|
												return true, ctx.Err()
							 | 
						|
											default:
							 | 
						|
												return false, nil
							 | 
						|
											}
							 | 
						|
										})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetSubscriptionInfo returns information about an active subscription
							 | 
						|
								func (b *MessageQueueBroker) GetSubscriptionInfo(subscriptionID string) (map[string]interface{}, error) {
							 | 
						|
									subscription, err := b.offsetManager.GetSubscription(subscriptionID)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									lag, err := subscription.GetLag()
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									atEnd, err := subscription.IsAtEnd()
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return map[string]interface{}{
							 | 
						|
										"subscription_id": subscription.ID,
							 | 
						|
										"start_offset":    subscription.StartOffset,
							 | 
						|
										"current_offset":  subscription.CurrentOffset,
							 | 
						|
										"offset_type":     subscription.OffsetType.String(),
							 | 
						|
										"is_active":       subscription.IsActive,
							 | 
						|
										"lag":             lag,
							 | 
						|
										"at_end":          atEnd,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ListActiveSubscriptions returns information about all active subscriptions
							 | 
						|
								func (b *MessageQueueBroker) ListActiveSubscriptions() ([]map[string]interface{}, error) {
							 | 
						|
									subscriptions, err := b.offsetManager.ListActiveSubscriptions()
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									result := make([]map[string]interface{}, len(subscriptions))
							 | 
						|
									for i, subscription := range subscriptions {
							 | 
						|
										lag, _ := subscription.GetLag()
							 | 
						|
										atEnd, _ := subscription.IsAtEnd()
							 | 
						|
								
							 | 
						|
										result[i] = map[string]interface{}{
							 | 
						|
											"subscription_id": subscription.ID,
							 | 
						|
											"start_offset":    subscription.StartOffset,
							 | 
						|
											"current_offset":  subscription.CurrentOffset,
							 | 
						|
											"offset_type":     subscription.OffsetType.String(),
							 | 
						|
											"is_active":       subscription.IsActive,
							 | 
						|
											"lag":             lag,
							 | 
						|
											"at_end":          atEnd,
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return result, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SeekSubscription seeks an existing subscription to a specific offset
							 | 
						|
								func (b *MessageQueueBroker) SeekSubscription(subscriptionID string, offset int64) error {
							 | 
						|
									subscription, err := b.offsetManager.GetSubscription(subscriptionID)
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return subscription.SeekToOffset(offset)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// convertOffsetToMessagePosition converts a subscription's current offset to a MessagePosition for log_buffer
							 | 
						|
								func (b *MessageQueueBroker) convertOffsetToMessagePosition(subscription *offset.OffsetSubscription) (log_buffer.MessagePosition, error) {
							 | 
						|
									currentOffset := subscription.GetNextOffset()
							 | 
						|
								
							 | 
						|
									// Handle special offset cases
							 | 
						|
									switch subscription.OffsetType {
							 | 
						|
									case schema_pb.OffsetType_RESET_TO_EARLIEST:
							 | 
						|
										return log_buffer.NewMessagePosition(1, -3), nil
							 | 
						|
								
							 | 
						|
									case schema_pb.OffsetType_RESET_TO_LATEST:
							 | 
						|
										return log_buffer.NewMessagePosition(time.Now().UnixNano(), -4), nil
							 | 
						|
								
							 | 
						|
									case schema_pb.OffsetType_EXACT_OFFSET:
							 | 
						|
										// Use proper offset-based positioning that provides consistent results
							 | 
						|
										// This uses the same approach as the main subscription handler in broker_grpc_sub.go
							 | 
						|
										return log_buffer.NewMessagePositionFromOffset(currentOffset), nil
							 | 
						|
								
							 | 
						|
									case schema_pb.OffsetType_EXACT_TS_NS:
							 | 
						|
										// For exact timestamps, use the timestamp directly
							 | 
						|
										return log_buffer.NewMessagePosition(currentOffset, -2), nil
							 | 
						|
								
							 | 
						|
									default:
							 | 
						|
										// Default to starting from current time for unknown offset types
							 | 
						|
										return log_buffer.NewMessagePosition(time.Now().UnixNano(), -2), nil
							 | 
						|
									}
							 | 
						|
								}
							 |