|
|
|
@ -24,29 +24,35 @@ func (b *MessageQueueBroker) SubscribeWithOffset( |
|
|
|
offsetType schema_pb.OffsetType, |
|
|
|
startOffset int64, |
|
|
|
) error { |
|
|
|
|
|
|
|
|
|
|
|
initMessage := req.GetInit() |
|
|
|
if initMessage == nil { |
|
|
|
return fmt.Errorf("missing init message") |
|
|
|
} |
|
|
|
|
|
|
|
// TODO: Fix partition access - SubscribeMessageRequest_InitMessage may not have Partition field
|
|
|
|
// ASSUMPTION: Using a default partition for now
|
|
|
|
// Extract partition information from the request
|
|
|
|
t := topic.FromPbTopic(initMessage.Topic) |
|
|
|
|
|
|
|
// Get partition from the request's partition_offset field
|
|
|
|
if initMessage.PartitionOffset == nil || initMessage.PartitionOffset.Partition == nil { |
|
|
|
return fmt.Errorf("missing partition information in request") |
|
|
|
} |
|
|
|
|
|
|
|
// Use the partition information from the request
|
|
|
|
p := topic.Partition{ |
|
|
|
RingSize: 1024, |
|
|
|
RangeStart: 0, |
|
|
|
RangeStop: 31, |
|
|
|
RingSize: initMessage.PartitionOffset.Partition.RingSize, |
|
|
|
RangeStart: initMessage.PartitionOffset.Partition.RangeStart, |
|
|
|
RangeStop: initMessage.PartitionOffset.Partition.RangeStop, |
|
|
|
UnixTimeNs: time.Now().UnixNano(), |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create offset-based subscription
|
|
|
|
subscriptionID := fmt.Sprintf("%s-%s-%d", initMessage.ConsumerGroup, initMessage.ConsumerId, startOffset) |
|
|
|
subscription, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, offsetType, startOffset) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to create offset subscription: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
if closeErr := b.offsetManager.CloseSubscription(subscriptionID); closeErr != nil { |
|
|
|
glog.V(0).Infof("Failed to close subscription %s: %v", subscriptionID, closeErr) |
|
|
|
@ -71,41 +77,41 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription( |
|
|
|
stream mq_pb.SeaweedMessaging_SubscribeMessageServer, |
|
|
|
initMessage *mq_pb.SubscribeMessageRequest_InitMessage, |
|
|
|
) error { |
|
|
|
|
|
|
|
|
|
|
|
clientName := fmt.Sprintf("%s-%s", initMessage.ConsumerGroup, initMessage.ConsumerId) |
|
|
|
|
|
|
|
|
|
|
|
// TODO: Implement offset-based message reading
|
|
|
|
// ASSUMPTION: For now, we'll use the existing subscription mechanism and track offsets separately
|
|
|
|
// This should be replaced with proper offset-based reading from storage
|
|
|
|
|
|
|
|
return localPartition.Subscribe(clientName, |
|
|
|
|
|
|
|
return localPartition.Subscribe(clientName, |
|
|
|
// Start position - TODO: Convert offset to MessagePosition
|
|
|
|
log_buffer.MessagePosition{}, |
|
|
|
log_buffer.MessagePosition{}, |
|
|
|
func() bool { |
|
|
|
// Check if subscription is still active and not at end
|
|
|
|
if !subscription.IsActive { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
atEnd, err := subscription.IsAtEnd() |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("Error checking if subscription at end: %v", err) |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return !atEnd |
|
|
|
}, |
|
|
|
func(logEntry *filer_pb.LogEntry) (bool, error) { |
|
|
|
// Check if this message matches our offset requirements
|
|
|
|
currentOffset := subscription.GetNextOffset() |
|
|
|
|
|
|
|
|
|
|
|
// TODO: Map LogEntry to offset - for now using timestamp as proxy
|
|
|
|
// ASSUMPTION: LogEntry.Offset field should be populated by the publish flow
|
|
|
|
if logEntry.Offset < currentOffset { |
|
|
|
// Skip messages before our current offset
|
|
|
|
return false, nil |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Send message to client
|
|
|
|
if err := stream.Send(&mq_pb.SubscribeMessageResponse{ |
|
|
|
Message: &mq_pb.SubscribeMessageResponse_Data{ |
|
|
|
@ -119,10 +125,10 @@ func (b *MessageQueueBroker) subscribeWithOffsetSubscription( |
|
|
|
glog.Errorf("Error sending data to %s: %v", clientName, err) |
|
|
|
return false, err |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Advance subscription offset
|
|
|
|
subscription.AdvanceOffset() |
|
|
|
|
|
|
|
|
|
|
|
// Check context for cancellation
|
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
@ -139,25 +145,25 @@ func (b *MessageQueueBroker) GetSubscriptionInfo(subscriptionID string) (map[str |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
lag, err := subscription.GetLag() |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
atEnd, err := subscription.IsAtEnd() |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return map[string]interface{}{ |
|
|
|
"subscription_id": subscription.ID, |
|
|
|
"start_offset": subscription.StartOffset, |
|
|
|
"current_offset": subscription.CurrentOffset, |
|
|
|
"offset_type": subscription.OffsetType.String(), |
|
|
|
"is_active": subscription.IsActive, |
|
|
|
"lag": lag, |
|
|
|
"at_end": atEnd, |
|
|
|
"subscription_id": subscription.ID, |
|
|
|
"start_offset": subscription.StartOffset, |
|
|
|
"current_offset": subscription.CurrentOffset, |
|
|
|
"offset_type": subscription.OffsetType.String(), |
|
|
|
"is_active": subscription.IsActive, |
|
|
|
"lag": lag, |
|
|
|
"at_end": atEnd, |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
@ -174,6 +180,6 @@ func (b *MessageQueueBroker) SeekSubscription(subscriptionID string, offset int6 |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return subscription.SeekToOffset(offset) |
|
|
|
} |