diff --git a/weed/mq/broker/broker_grpc_sub_offset.go b/weed/mq/broker/broker_grpc_sub_offset.go index ec9cade90..861779789 100644 --- a/weed/mq/broker/broker_grpc_sub_offset.go +++ b/weed/mq/broker/broker_grpc_sub_offset.go @@ -24,29 +24,35 @@ func (b *MessageQueueBroker) SubscribeWithOffset( offsetType schema_pb.OffsetType, startOffset int64, ) error { - + initMessage := req.GetInit() if initMessage == nil { return fmt.Errorf("missing init message") } - // TODO: Fix partition access - SubscribeMessageRequest_InitMessage may not have Partition field - // ASSUMPTION: Using a default partition for now + // Extract partition information from the request t := topic.FromPbTopic(initMessage.Topic) + + // Get partition from the request's partition_offset field + if initMessage.PartitionOffset == nil || initMessage.PartitionOffset.Partition == nil { + return fmt.Errorf("missing partition information in request") + } + + // Use the partition information from the request p := topic.Partition{ - RingSize: 1024, - RangeStart: 0, - RangeStop: 31, + RingSize: initMessage.PartitionOffset.Partition.RingSize, + RangeStart: initMessage.PartitionOffset.Partition.RangeStart, + RangeStop: initMessage.PartitionOffset.Partition.RangeStop, UnixTimeNs: time.Now().UnixNano(), } - + // Create offset-based subscription subscriptionID := fmt.Sprintf("%s-%s-%d", initMessage.ConsumerGroup, initMessage.ConsumerId, startOffset) subscription, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, offsetType, startOffset) if err != nil { return fmt.Errorf("failed to create offset subscription: %w", err) } - + defer func() { if closeErr := b.offsetManager.CloseSubscription(subscriptionID); closeErr != nil { glog.V(0).Infof("Failed to close subscription %s: %v", subscriptionID, closeErr) @@ -71,41 +77,41 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription( stream mq_pb.SeaweedMessaging_SubscribeMessageServer, initMessage *mq_pb.SubscribeMessageRequest_InitMessage, ) error { - + clientName := fmt.Sprintf("%s-%s", initMessage.ConsumerGroup, initMessage.ConsumerId) - + // TODO: Implement offset-based message reading // ASSUMPTION: For now, we'll use the existing subscription mechanism and track offsets separately // This should be replaced with proper offset-based reading from storage - - return localPartition.Subscribe(clientName, + + return localPartition.Subscribe(clientName, // Start position - TODO: Convert offset to MessagePosition - log_buffer.MessagePosition{}, + log_buffer.MessagePosition{}, func() bool { // Check if subscription is still active and not at end if !subscription.IsActive { return false } - + atEnd, err := subscription.IsAtEnd() if err != nil { glog.V(0).Infof("Error checking if subscription at end: %v", err) return false } - + return !atEnd }, func(logEntry *filer_pb.LogEntry) (bool, error) { // Check if this message matches our offset requirements currentOffset := subscription.GetNextOffset() - + // TODO: Map LogEntry to offset - for now using timestamp as proxy // ASSUMPTION: LogEntry.Offset field should be populated by the publish flow if logEntry.Offset < currentOffset { // Skip messages before our current offset return false, nil } - + // Send message to client if err := stream.Send(&mq_pb.SubscribeMessageResponse{ Message: &mq_pb.SubscribeMessageResponse_Data{ @@ -119,10 +125,10 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription( glog.Errorf("Error sending data to %s: %v", clientName, err) return false, err } - + // Advance subscription offset subscription.AdvanceOffset() - + // Check context for cancellation select { case <-ctx.Done(): @@ -139,25 +145,25 @@ func (b *MessageQueueBroker) GetSubscriptionInfo(subscriptionID string) (map[str if err != nil { return nil, err } - + lag, err := subscription.GetLag() if err != nil { return nil, err } - + atEnd, err := subscription.IsAtEnd() if err != nil { return nil, err } - + return map[string]interface{}{ - "subscription_id": subscription.ID, - "start_offset": subscription.StartOffset, - "current_offset": subscription.CurrentOffset, - "offset_type": subscription.OffsetType.String(), - "is_active": subscription.IsActive, - "lag": lag, - "at_end": atEnd, + "subscription_id": subscription.ID, + "start_offset": subscription.StartOffset, + "current_offset": subscription.CurrentOffset, + "offset_type": subscription.OffsetType.String(), + "is_active": subscription.IsActive, + "lag": lag, + "at_end": atEnd, }, nil } @@ -174,6 +180,6 @@ func (b *MessageQueueBroker) SeekSubscription(subscriptionID string, offset int6 if err != nil { return err } - + return subscription.SeekToOffset(offset) }