From 5222ddaf2f1d4216be89f6120916ab4b4e56ec22 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 00:40:27 -0700 Subject: [PATCH] seekable subscribe messages --- weed/mq/broker/broker_grpc_sub.go | 148 ++- weed/mq/broker/broker_grpc_sub_seek_test.go | 872 ++++++++++++++++++ .../integration/broker_client_subscribe.go | 337 +++---- weed/pb/mq_broker.proto | 5 + weed/pb/mq_pb/mq_broker.pb.go | 307 +++--- 5 files changed, 1373 insertions(+), 296 deletions(-) create mode 100644 weed/mq/broker/broker_grpc_sub_seek_test.go diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 2c2326cbd..1d607999e 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -107,6 +107,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs glog.V(0).Infof("follower %s connected", follower) } + // Channel to handle seek requests - signals Subscribe loop to restart from new offset + seekChan := make(chan *mq_pb.SubscribeMessageRequest_SeekMessage, 1) + go func() { defer cancel() // CRITICAL: Cancel context when Recv goroutine exits (client disconnect) @@ -128,6 +131,27 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) break } + // Handle seek messages + if seekMsg := ack.GetSeek(); seekMsg != nil { + glog.V(0).Infof("Subscriber %s received seek request to offset %d (type %v)", + clientName, seekMsg.Offset, seekMsg.OffsetType) + + // Send seek request to Subscribe loop + select { + case seekChan <- seekMsg: + glog.V(0).Infof("Subscriber %s seek request queued", clientName) + default: + glog.V(0).Infof("Subscriber %s seek request dropped (already pending)", clientName) + // Send error response if seek is already in progress + stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{ + Error: "Seek already in progress", + }, + }}) + } + continue + } + if ack.GetAck().Key == nil { // skip ack for control messages continue @@ -182,20 +206,30 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs localTopicPartition.ListenersLock.Unlock() }() - err = localTopicPartition.Subscribe(clientName, startPosition, func() bool { - // Check cancellation before waiting - if ctx.Err() != nil || !isConnected { - return false - } + // Subscribe loop - can be restarted when seek is requested + currentPosition := startPosition +subscribeLoop: + for { + // Context for this iteration of Subscribe (can be cancelled by seek) + subscribeCtx, subscribeCancel := context.WithCancel(ctx) + + // Start Subscribe in a goroutine so we can interrupt it with seek + subscribeDone := make(chan error, 1) + go func() { + subscribeErr := localTopicPartition.Subscribe(clientName, currentPosition, func() bool { + // Check cancellation before waiting + if subscribeCtx.Err() != nil || !isConnected { + return false + } - // Wait for new data using condition variable (blocking, not polling) - localTopicPartition.ListenersLock.Lock() - localTopicPartition.ListenersCond.Wait() - localTopicPartition.ListenersLock.Unlock() + // Wait for new data using condition variable (blocking, not polling) + localTopicPartition.ListenersLock.Lock() + localTopicPartition.ListenersCond.Wait() + localTopicPartition.ListenersLock.Unlock() - // After waking up, check if we should stop - return ctx.Err() == nil && isConnected - }, func(logEntry *filer_pb.LogEntry) (bool, error) { + // After waking up, check if we should stop + return subscribeCtx.Err() == nil && isConnected + }, func(logEntry *filer_pb.LogEntry) (bool, error) { // Wait for the message to be acknowledged with a timeout to prevent infinite loops const maxWaitTime = 30 * time.Second const checkInterval = 137 * time.Millisecond @@ -215,10 +249,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Check if the client has disconnected by monitoring the context select { - case <-ctx.Done(): - err := ctx.Err() + case <-subscribeCtx.Done(): + err := subscribeCtx.Err() if err == context.Canceled { - // Client disconnected + // Subscribe cancelled (seek or disconnect) return false, nil } glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) @@ -250,7 +284,46 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs counter++ return false, nil - }) + }) + subscribeDone <- subscribeErr + }() + + // Wait for either Subscribe to complete or a seek request + select { + case err = <-subscribeDone: + subscribeCancel() + if err != nil || ctx.Err() != nil { + // Subscribe finished with error or main context cancelled - exit loop + break subscribeLoop + } + // Subscribe completed normally (shouldn't happen in streaming mode) + break subscribeLoop + + case seekMsg := <-seekChan: + // Seek requested - cancel current Subscribe and restart from new offset + glog.V(0).Infof("Subscriber %s seeking from offset %d to offset %d (type %v)", + clientName, currentPosition.GetOffset(), seekMsg.Offset, seekMsg.OffsetType) + + // Cancel current Subscribe iteration + subscribeCancel() + + // Wait for Subscribe to finish cancelling + <-subscribeDone + + // Update position for next iteration + currentPosition = b.getRequestPositionFromSeek(seekMsg) + glog.V(0).Infof("Subscriber %s restarting Subscribe from new offset %d", clientName, seekMsg.Offset) + + // Send acknowledgment that seek completed + stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{ + Error: "", // Empty error means success + }, + }}) + + // Loop will restart with new position + } + } return err } @@ -304,3 +377,46 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess } return } + +// getRequestPositionFromSeek converts a seek request to a MessagePosition +// This is used when implementing full seek support in Subscribe loop +func (b *MessageQueueBroker) getRequestPositionFromSeek(seekMsg *mq_pb.SubscribeMessageRequest_SeekMessage) (startPosition log_buffer.MessagePosition) { + if seekMsg == nil { + return + } + + offsetType := seekMsg.OffsetType + offset := seekMsg.Offset + + // reset to earliest or latest + if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST { + startPosition = log_buffer.NewMessagePosition(1, -3) + return + } + if offsetType == schema_pb.OffsetType_RESET_TO_LATEST { + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) + return + } + + // use the exact timestamp + if offsetType == schema_pb.OffsetType_EXACT_TS_NS { + startPosition = log_buffer.NewMessagePosition(offset, -2) + return + } + + // use exact offset (native offset-based positioning) + if offsetType == schema_pb.OffsetType_EXACT_OFFSET { + startPosition = log_buffer.NewMessagePositionFromOffset(offset) + return + } + + // reset to specific offset + if offsetType == schema_pb.OffsetType_RESET_TO_OFFSET { + startPosition = log_buffer.NewMessagePositionFromOffset(offset) + return + } + + // default to exact offset + startPosition = log_buffer.NewMessagePositionFromOffset(offset) + return +} diff --git a/weed/mq/broker/broker_grpc_sub_seek_test.go b/weed/mq/broker/broker_grpc_sub_seek_test.go new file mode 100644 index 000000000..adf3df68c --- /dev/null +++ b/weed/mq/broker/broker_grpc_sub_seek_test.go @@ -0,0 +1,872 @@ +package broker + +import ( + "context" + "fmt" + "io" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "google.golang.org/grpc/metadata" +) + +// TestGetRequestPositionFromSeek tests the helper function that converts seek requests to message positions +func TestGetRequestPositionFromSeek(t *testing.T) { + broker := &MessageQueueBroker{} + + tests := []struct { + name string + offsetType schema_pb.OffsetType + offset int64 + expectedBatch int64 + expectZeroTime bool + }{ + { + name: "reset to earliest", + offsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + offset: 0, + expectedBatch: -3, + expectZeroTime: false, + }, + { + name: "reset to latest", + offsetType: schema_pb.OffsetType_RESET_TO_LATEST, + offset: 0, + expectedBatch: -4, + expectZeroTime: false, + }, + { + name: "exact offset zero", + offsetType: schema_pb.OffsetType_EXACT_OFFSET, + offset: 0, + expectedBatch: 0, + expectZeroTime: true, + }, + { + name: "exact offset 100", + offsetType: schema_pb.OffsetType_EXACT_OFFSET, + offset: 100, + expectedBatch: 100, + expectZeroTime: true, + }, + { + name: "exact offset 1000", + offsetType: schema_pb.OffsetType_EXACT_OFFSET, + offset: 1000, + expectedBatch: 1000, + expectZeroTime: true, + }, + { + name: "exact timestamp", + offsetType: schema_pb.OffsetType_EXACT_TS_NS, + offset: 1234567890123456789, + expectedBatch: -2, + expectZeroTime: false, + }, + { + name: "reset to offset", + offsetType: schema_pb.OffsetType_RESET_TO_OFFSET, + offset: 42, + expectedBatch: 42, + expectZeroTime: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: tt.offset, + OffsetType: tt.offsetType, + } + + position := broker.getRequestPositionFromSeek(seekMsg) + + if position.Offset != tt.expectedBatch { + t.Errorf("Expected batch index %d, got %d", tt.expectedBatch, position.Offset) + } + + // Verify time handling + if tt.expectZeroTime && !position.Time.IsZero() { + t.Errorf("Expected zero time for offset-based seek, got %v", position.Time) + } + + if !tt.expectZeroTime && position.Time.IsZero() && tt.offsetType != schema_pb.OffsetType_RESET_TO_EARLIEST { + t.Errorf("Expected non-zero time, got zero time") + } + }) + } +} + +// TestGetRequestPositionFromSeek_NilSafety tests that the function handles nil input gracefully +func TestGetRequestPositionFromSeek_NilSafety(t *testing.T) { + broker := &MessageQueueBroker{} + + position := broker.getRequestPositionFromSeek(nil) + + // Should return zero-value position without panicking + if position.Offset != 0 { + t.Errorf("Expected zero offset for nil input, got %d", position.Offset) + } +} + +// TestGetRequestPositionFromSeek_ConsistentResults verifies that multiple calls with same input produce same output +func TestGetRequestPositionFromSeek_ConsistentResults(t *testing.T) { + broker := &MessageQueueBroker{} + + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 42, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + } + + // Call multiple times + positions := make([]log_buffer.MessagePosition, 5) + for i := 0; i < 5; i++ { + positions[i] = broker.getRequestPositionFromSeek(seekMsg) + time.Sleep(1 * time.Millisecond) // Small delay + } + + // All positions should be identical + for i := 1; i < len(positions); i++ { + if positions[i].Offset != positions[0].Offset { + t.Errorf("Inconsistent Offset: %d vs %d", positions[0].Offset, positions[i].Offset) + } + if !positions[i].Time.Equal(positions[0].Time) { + t.Errorf("Inconsistent Time: %v vs %v", positions[0].Time, positions[i].Time) + } + if positions[i].IsOffsetBased != positions[0].IsOffsetBased { + t.Errorf("Inconsistent IsOffsetBased: %v vs %v", positions[0].IsOffsetBased, positions[i].IsOffsetBased) + } + } +} + +// TestGetRequestPositionFromSeek_OffsetExtraction verifies offset can be correctly extracted +func TestGetRequestPositionFromSeek_OffsetExtraction(t *testing.T) { + broker := &MessageQueueBroker{} + + testOffsets := []int64{0, 1, 10, 100, 1000, 9999} + + for _, offset := range testOffsets { + t.Run(fmt.Sprintf("offset_%d", offset), func(t *testing.T) { + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: offset, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + } + + position := broker.getRequestPositionFromSeek(seekMsg) + + if !position.IsOffsetBased { + t.Error("Position should be detected as offset-based") + } + + if extractedOffset := position.GetOffset(); extractedOffset != offset { + t.Errorf("Expected extracted offset %d, got %d", offset, extractedOffset) + } + }) + } +} + +// MockSubscribeMessageStream is a mock implementation of the gRPC stream for testing +type MockSubscribeMessageStream struct { + ctx context.Context + recvChan chan *mq_pb.SubscribeMessageRequest + sentMessages []*mq_pb.SubscribeMessageResponse + mu sync.Mutex + recvErr error +} + +func NewMockSubscribeMessageStream(ctx context.Context) *MockSubscribeMessageStream { + return &MockSubscribeMessageStream{ + ctx: ctx, + recvChan: make(chan *mq_pb.SubscribeMessageRequest, 10), + sentMessages: make([]*mq_pb.SubscribeMessageResponse, 0), + } +} + +func (m *MockSubscribeMessageStream) Send(msg *mq_pb.SubscribeMessageResponse) error { + m.mu.Lock() + defer m.mu.Unlock() + m.sentMessages = append(m.sentMessages, msg) + return nil +} + +func (m *MockSubscribeMessageStream) Recv() (*mq_pb.SubscribeMessageRequest, error) { + if m.recvErr != nil { + return nil, m.recvErr + } + + select { + case msg := <-m.recvChan: + return msg, nil + case <-m.ctx.Done(): + return nil, io.EOF + } +} + +func (m *MockSubscribeMessageStream) SetHeader(metadata.MD) error { + return nil +} + +func (m *MockSubscribeMessageStream) SendHeader(metadata.MD) error { + return nil +} + +func (m *MockSubscribeMessageStream) SetTrailer(metadata.MD) {} + +func (m *MockSubscribeMessageStream) Context() context.Context { + return m.ctx +} + +func (m *MockSubscribeMessageStream) SendMsg(interface{}) error { + return nil +} + +func (m *MockSubscribeMessageStream) RecvMsg(interface{}) error { + return nil +} + +func (m *MockSubscribeMessageStream) QueueMessage(msg *mq_pb.SubscribeMessageRequest) { + m.recvChan <- msg +} + +func (m *MockSubscribeMessageStream) SetRecvError(err error) { + m.recvErr = err +} + +func (m *MockSubscribeMessageStream) GetSentMessages() []*mq_pb.SubscribeMessageResponse { + m.mu.Lock() + defer m.mu.Unlock() + return append([]*mq_pb.SubscribeMessageResponse{}, m.sentMessages...) +} + +// TestSeekMessageHandling_BasicSeek tests that seek messages are properly received and acknowledged +func TestSeekMessageHandling_BasicSeek(t *testing.T) { + // Create seek message + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 100, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + }, + } + + // Verify message structure + if seekReq := seekMsg.GetSeek(); seekReq == nil { + t.Fatal("Failed to create seek message") + } else { + if seekReq.Offset != 100 { + t.Errorf("Expected offset 100, got %d", seekReq.Offset) + } + if seekReq.OffsetType != schema_pb.OffsetType_EXACT_OFFSET { + t.Errorf("Expected EXACT_OFFSET, got %v", seekReq.OffsetType) + } + } +} + +// TestSeekMessageHandling_MultipleSeekTypes tests different seek offset types +func TestSeekMessageHandling_MultipleSeekTypes(t *testing.T) { + testCases := []struct { + name string + offset int64 + offsetType schema_pb.OffsetType + }{ + { + name: "seek to earliest", + offset: 0, + offsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + }, + { + name: "seek to latest", + offset: 0, + offsetType: schema_pb.OffsetType_RESET_TO_LATEST, + }, + { + name: "seek to exact offset", + offset: 42, + offsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + { + name: "seek to timestamp", + offset: time.Now().UnixNano(), + offsetType: schema_pb.OffsetType_EXACT_TS_NS, + }, + { + name: "reset to offset", + offset: 1000, + offsetType: schema_pb.OffsetType_RESET_TO_OFFSET, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: tc.offset, + OffsetType: tc.offsetType, + }, + }, + } + + seekReq := seekMsg.GetSeek() + if seekReq == nil { + t.Fatal("Failed to get seek message") + } + + if seekReq.Offset != tc.offset { + t.Errorf("Expected offset %d, got %d", tc.offset, seekReq.Offset) + } + + if seekReq.OffsetType != tc.offsetType { + t.Errorf("Expected offset type %v, got %v", tc.offsetType, seekReq.OffsetType) + } + }) + } +} + +// TestSeekMessageHandling_AckVsSeekDistinction tests that we can distinguish between ack and seek messages +func TestSeekMessageHandling_AckVsSeekDistinction(t *testing.T) { + // Create ack message + ackMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Ack{ + Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ + Key: []byte("test-key"), + TsNs: time.Now().UnixNano(), + }, + }, + } + + // Create seek message + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 100, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + }, + } + + // Verify ack message doesn't match seek + if ackMsg.GetSeek() != nil { + t.Error("Ack message should not be detected as seek") + } + if ackMsg.GetAck() == nil { + t.Error("Ack message should be detected as ack") + } + + // Verify seek message doesn't match ack + if seekMsg.GetAck() != nil { + t.Error("Seek message should not be detected as ack") + } + if seekMsg.GetSeek() == nil { + t.Error("Seek message should be detected as seek") + } +} + +// TestSeekMessageResponse_SuccessFormat tests the response format for successful seek +func TestSeekMessageResponse_SuccessFormat(t *testing.T) { + // Create success response (empty error string = success) + successResponse := &mq_pb.SubscribeMessageResponse{ + Message: &mq_pb.SubscribeMessageResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{ + Error: "", // Empty error means success + }, + }, + } + + ctrlMsg := successResponse.GetCtrl() + if ctrlMsg == nil { + t.Fatal("Failed to get control message") + } + + // Empty error string indicates success + if ctrlMsg.Error != "" { + t.Errorf("Expected empty error for success, got: %s", ctrlMsg.Error) + } +} + +// TestSeekMessageResponse_ErrorFormat tests the response format for failed seek +func TestSeekMessageResponse_ErrorFormat(t *testing.T) { + // Create error response + errorResponse := &mq_pb.SubscribeMessageResponse{ + Message: &mq_pb.SubscribeMessageResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{ + Error: "Seek not implemented", + }, + }, + } + + ctrlMsg := errorResponse.GetCtrl() + if ctrlMsg == nil { + t.Fatal("Failed to get control message") + } + + // Non-empty error string indicates failure + if ctrlMsg.Error == "" { + t.Error("Expected non-empty error for failure") + } + + if ctrlMsg.Error != "Seek not implemented" { + t.Errorf("Expected specific error message, got: %s", ctrlMsg.Error) + } +} + +// TestSeekMessageHandling_BackwardSeek tests backward seeking scenarios +func TestSeekMessageHandling_BackwardSeek(t *testing.T) { + testCases := []struct { + name string + currentPos int64 + seekOffset int64 + expectedGap int64 + }{ + { + name: "small backward gap", + currentPos: 100, + seekOffset: 90, + expectedGap: 10, + }, + { + name: "medium backward gap", + currentPos: 1000, + seekOffset: 500, + expectedGap: 500, + }, + { + name: "large backward gap", + currentPos: 1000000, + seekOffset: 1, + expectedGap: 999999, + }, + { + name: "seek to zero", + currentPos: 100, + seekOffset: 0, + expectedGap: 100, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Verify gap calculation + gap := tc.currentPos - tc.seekOffset + if gap != tc.expectedGap { + t.Errorf("Expected gap %d, got %d", tc.expectedGap, gap) + } + + // Create seek message for backward seek + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: tc.seekOffset, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + }, + } + + seekReq := seekMsg.GetSeek() + if seekReq == nil { + t.Fatal("Failed to create seek message") + } + + if seekReq.Offset != tc.seekOffset { + t.Errorf("Expected offset %d, got %d", tc.seekOffset, seekReq.Offset) + } + }) + } +} + +// TestSeekMessageHandling_ForwardSeek tests forward seeking scenarios +func TestSeekMessageHandling_ForwardSeek(t *testing.T) { + testCases := []struct { + name string + currentPos int64 + seekOffset int64 + shouldSeek bool + }{ + { + name: "small forward gap", + currentPos: 100, + seekOffset: 110, + shouldSeek: false, // Forward seeks don't need special handling + }, + { + name: "same position", + currentPos: 100, + seekOffset: 100, + shouldSeek: false, + }, + { + name: "large forward gap", + currentPos: 100, + seekOffset: 10000, + shouldSeek: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // For forward seeks, gateway typically just continues reading + // No special seek message needed + isBackward := tc.seekOffset < tc.currentPos + + if isBackward && !tc.shouldSeek { + t.Error("Backward seek should require seek message") + } + }) + } +} + +// TestSeekIntegration_PositionConversion tests the complete flow from seek message to position +func TestSeekIntegration_PositionConversion(t *testing.T) { + broker := &MessageQueueBroker{} + + testCases := []struct { + name string + offset int64 + offsetType schema_pb.OffsetType + verifyFunc func(t *testing.T, pos log_buffer.MessagePosition) + }{ + { + name: "exact offset conversion", + offset: 42, + offsetType: schema_pb.OffsetType_EXACT_OFFSET, + verifyFunc: func(t *testing.T, pos log_buffer.MessagePosition) { + if !pos.IsOffsetBased { + t.Error("Expected offset-based position") + } + if pos.GetOffset() != 42 { + t.Errorf("Expected offset 42, got %d", pos.GetOffset()) + } + }, + }, + { + name: "earliest offset conversion", + offset: 0, + offsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + verifyFunc: func(t *testing.T, pos log_buffer.MessagePosition) { + if pos.Offset != -3 { + t.Errorf("Expected batch -3 for earliest, got %d", pos.Offset) + } + }, + }, + { + name: "latest offset conversion", + offset: 0, + offsetType: schema_pb.OffsetType_RESET_TO_LATEST, + verifyFunc: func(t *testing.T, pos log_buffer.MessagePosition) { + if pos.Offset != -4 { + t.Errorf("Expected batch -4 for latest, got %d", pos.Offset) + } + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create seek message + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: tc.offset, + OffsetType: tc.offsetType, + } + + // Convert to position + position := broker.getRequestPositionFromSeek(seekMsg) + + // Verify result + tc.verifyFunc(t, position) + }) + } +} + +// TestSeekMessageHandling_ConcurrentSeeks tests handling multiple seek requests +func TestSeekMessageHandling_ConcurrentSeeks(t *testing.T) { + broker := &MessageQueueBroker{} + + // Simulate multiple concurrent seek requests + seekOffsets := []int64{10, 20, 30, 40, 50} + + var wg sync.WaitGroup + results := make([]log_buffer.MessagePosition, len(seekOffsets)) + + for i, offset := range seekOffsets { + wg.Add(1) + go func(idx int, off int64) { + defer wg.Done() + + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: off, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + } + + results[idx] = broker.getRequestPositionFromSeek(seekMsg) + }(i, offset) + } + + wg.Wait() + + // Verify all results are correct + for i, offset := range seekOffsets { + if results[i].GetOffset() != offset { + t.Errorf("Expected offset %d at index %d, got %d", offset, i, results[i].GetOffset()) + } + } +} + +// TestSeekMessageProtocol_WireFormat verifies the protobuf message structure +func TestSeekMessageProtocol_WireFormat(t *testing.T) { + // Test that SeekMessage is properly defined in the oneof + req := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 100, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + }, + } + + // Verify oneof is set correctly + switch msg := req.Message.(type) { + case *mq_pb.SubscribeMessageRequest_Seek: + if msg.Seek.Offset != 100 { + t.Errorf("Expected offset 100, got %d", msg.Seek.Offset) + } + default: + t.Errorf("Expected Seek message, got %T", msg) + } + + // Verify other message types are nil + if req.GetAck() != nil { + t.Error("Seek message should not have Ack") + } + if req.GetInit() != nil { + t.Error("Seek message should not have Init") + } +} + +// TestSeekByTimestamp tests timestamp-based seek operations +func TestSeekByTimestamp(t *testing.T) { + broker := &MessageQueueBroker{} + + testCases := []struct { + name string + timestampNs int64 + offsetType schema_pb.OffsetType + }{ + { + name: "seek to specific timestamp", + timestampNs: 1234567890123456789, + offsetType: schema_pb.OffsetType_EXACT_TS_NS, + }, + { + name: "seek to current timestamp", + timestampNs: time.Now().UnixNano(), + offsetType: schema_pb.OffsetType_EXACT_TS_NS, + }, + { + name: "seek to past timestamp", + timestampNs: time.Now().Add(-24 * time.Hour).UnixNano(), + offsetType: schema_pb.OffsetType_EXACT_TS_NS, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: tc.timestampNs, + OffsetType: tc.offsetType, + } + + position := broker.getRequestPositionFromSeek(seekMsg) + + // For timestamp-based seeks, Time should be set to the timestamp + expectedTime := time.Unix(0, tc.timestampNs) + if !position.Time.Equal(expectedTime) { + t.Errorf("Expected time %v, got %v", expectedTime, position.Time) + } + + // Batch should be -2 for EXACT_TS_NS + if position.Offset != -2 { + t.Errorf("Expected batch -2 for timestamp seek, got %d", position.Offset) + } + }) + } +} + +// TestSeekByTimestamp_Ordering tests that timestamp seeks preserve ordering +func TestSeekByTimestamp_Ordering(t *testing.T) { + broker := &MessageQueueBroker{} + + // Create timestamps in chronological order + baseTime := time.Now().Add(-1 * time.Hour) + timestamps := []int64{ + baseTime.UnixNano(), + baseTime.Add(10 * time.Minute).UnixNano(), + baseTime.Add(20 * time.Minute).UnixNano(), + baseTime.Add(30 * time.Minute).UnixNano(), + } + + positions := make([]log_buffer.MessagePosition, len(timestamps)) + + for i, ts := range timestamps { + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: ts, + OffsetType: schema_pb.OffsetType_EXACT_TS_NS, + } + positions[i] = broker.getRequestPositionFromSeek(seekMsg) + } + + // Verify positions are in chronological order + for i := 1; i < len(positions); i++ { + if !positions[i].Time.After(positions[i-1].Time) { + t.Errorf("Timestamp ordering violated: position[%d].Time (%v) should be after position[%d].Time (%v)", + i, positions[i].Time, i-1, positions[i-1].Time) + } + } +} + +// TestSeekByTimestamp_EdgeCases tests edge cases for timestamp seeks +func TestSeekByTimestamp_EdgeCases(t *testing.T) { + broker := &MessageQueueBroker{} + + testCases := []struct { + name string + timestampNs int64 + expectValid bool + }{ + { + name: "zero timestamp", + timestampNs: 0, + expectValid: true, // Valid - means Unix epoch + }, + { + name: "negative timestamp", + timestampNs: -1, + expectValid: true, // Valid in Go (before Unix epoch) + }, + { + name: "far future timestamp", + timestampNs: time.Now().Add(100 * 365 * 24 * time.Hour).UnixNano(), + expectValid: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: tc.timestampNs, + OffsetType: schema_pb.OffsetType_EXACT_TS_NS, + } + + position := broker.getRequestPositionFromSeek(seekMsg) + + if tc.expectValid { + expectedTime := time.Unix(0, tc.timestampNs) + if !position.Time.Equal(expectedTime) { + t.Errorf("Expected time %v, got %v", expectedTime, position.Time) + } + } + }) + } +} + +// TestSeekByTimestamp_VsOffset tests that timestamp and offset seeks are independent +func TestSeekByTimestamp_VsOffset(t *testing.T) { + broker := &MessageQueueBroker{} + + timestampSeek := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: time.Now().UnixNano(), + OffsetType: schema_pb.OffsetType_EXACT_TS_NS, + } + + offsetSeek := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 100, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + } + + timestampPos := broker.getRequestPositionFromSeek(timestampSeek) + offsetPos := broker.getRequestPositionFromSeek(offsetSeek) + + // Timestamp-based position should have batch -2 + if timestampPos.Offset != -2 { + t.Errorf("Timestamp seek should have batch -2, got %d", timestampPos.Offset) + } + + // Offset-based position should have the exact offset in Offset field + if offsetPos.GetOffset() != 100 { + t.Errorf("Offset seek should have offset 100, got %d", offsetPos.GetOffset()) + } + + // They should use different positioning mechanisms + if timestampPos.IsOffsetBased { + t.Error("Timestamp seek should not be offset-based") + } + + if !offsetPos.IsOffsetBased { + t.Error("Offset seek should be offset-based") + } +} + +// TestSeekOptimization_SkipRedundantSeek tests that seeking to the same offset is optimized +func TestSeekOptimization_SkipRedundantSeek(t *testing.T) { + broker := &MessageQueueBroker{} + + // Test that seeking to the same offset multiple times produces the same result + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 100, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + } + + // First seek + pos1 := broker.getRequestPositionFromSeek(seekMsg) + + // Second seek to same offset + pos2 := broker.getRequestPositionFromSeek(seekMsg) + + // Third seek to same offset + pos3 := broker.getRequestPositionFromSeek(seekMsg) + + // All positions should be identical + if pos1.GetOffset() != pos2.GetOffset() || pos2.GetOffset() != pos3.GetOffset() { + t.Errorf("Multiple seeks to same offset should produce identical results: %d, %d, %d", + pos1.GetOffset(), pos2.GetOffset(), pos3.GetOffset()) + } + + // Verify the offset is correct + if pos1.GetOffset() != 100 { + t.Errorf("Expected offset 100, got %d", pos1.GetOffset()) + } +} + +// TestSeekOptimization_DifferentOffsets tests that different offsets produce different positions +func TestSeekOptimization_DifferentOffsets(t *testing.T) { + broker := &MessageQueueBroker{} + + offsets := []int64{0, 50, 100, 150, 200} + positions := make([]log_buffer.MessagePosition, len(offsets)) + + for i, offset := range offsets { + seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: offset, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + } + positions[i] = broker.getRequestPositionFromSeek(seekMsg) + } + + // Verify each position has the correct offset + for i, offset := range offsets { + if positions[i].GetOffset() != offset { + t.Errorf("Position %d: expected offset %d, got %d", i, offset, positions[i].GetOffset()) + } + } + + // Verify all positions are different + for i := 1; i < len(positions); i++ { + if positions[i].GetOffset() == positions[i-1].GetOffset() { + t.Errorf("Positions %d and %d should be different", i-1, i) + } + } +} diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 432cdfe04..e9adcd234 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -113,25 +113,25 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta } session.mu.Unlock() - // Decision logic: - // 1. Forward read (startOffset >= currentOffset): Always reuse - ReadRecordsFromOffset will handle it - // 2. Backward read with cache hit: Reuse - ReadRecordsFromOffset will serve from cache - // 3. Backward read without cache: Reuse if gap is small, let ReadRecordsFromOffset handle recreation - // This prevents GetOrCreateSubscriber from constantly recreating sessions for small offsets - - if startOffset >= currentOffset || canUseCache { - // Can read forward OR offset is in cache - reuse session - bc.subscribersLock.RUnlock() - glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)", - key, currentOffset, startOffset) - return session, nil - } + // With seekable broker: Always reuse existing session + // Any offset mismatch will be handled by FetchRecords via SeekMessage + // This includes: + // 1. Forward read: Natural continuation + // 2. Backward read with cache hit: Serve from cache + // 3. Backward read without cache: Send seek message to broker + // No need for stream recreation - broker repositions internally - // Backward seek, not in cache - // Let ReadRecordsFromOffset handle the recreation decision based on the actual read context bc.subscribersLock.RUnlock() - glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)", - key, currentOffset, startOffset) + if canUseCache { + glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (cached)", + key, currentOffset, startOffset) + } else if startOffset >= currentOffset { + glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (forward read)", + key, currentOffset, startOffset) + } else { + glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will seek backward)", + key, currentOffset, startOffset) + } return session, nil } @@ -143,32 +143,16 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta bc.subscribersLock.Lock() defer bc.subscribersLock.Unlock() - // CRITICAL FIX: Double-check if session exists AND verify it's at the right offset - // This can happen if another thread created a session while we were acquiring the lock - // (only possible in the non-recreation path where we released the read lock) + // Double-check if session was created by another thread while we were acquiring the lock if session, exists := bc.subscribers[key]; exists { + // With seekable broker, always reuse existing session + // FetchRecords will handle any offset mismatch via seek session.mu.Lock() existingOffset := session.StartOffset session.mu.Unlock() - // Only reuse if the session is at or before the requested offset - if existingOffset <= startOffset { - glog.V(1).Infof("[FETCH] Session already exists at offset %d (requested %d), reusing", existingOffset, startOffset) - return session, nil - } - - // Session is at wrong offset - must recreate - glog.V(2).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset) - // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls - session.mu.Lock() - if session.Stream != nil { - _ = session.Stream.CloseSend() - } - if session.Cancel != nil { - session.Cancel() - } - session.mu.Unlock() - delete(bc.subscribers, key) + glog.V(1).Infof("[FETCH] Session created concurrently at offset %d (requested %d), reusing", existingOffset, startOffset) + return session, nil } // Use BrokerClient's context so subscribers are automatically cancelled when connection closes @@ -273,148 +257,55 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok } } - // CRITICAL: Get the current offset atomically before making recreation decision - // We need to unlock first (lock acquired at line 257) then re-acquire for atomic read + // Get the current offset atomically for comparison currentStartOffset := session.StartOffset session.mu.Unlock() - // CRITICAL FIX for Schema Registry: Keep subscriber alive across multiple fetch requests - // Schema Registry expects to make multiple poll() calls on the same consumer connection + // With seekable broker: Keep subscriber alive across all requests + // Schema Registry and other clients expect persistent consumer connections // - // Three scenarios: - // 1. requestedOffset < session.StartOffset: Need to seek backward (recreate) - // 2. requestedOffset == session.StartOffset: Continue reading (use existing) - // 3. requestedOffset > session.StartOffset: Continue reading forward (use existing) + // Three scenarios, all handled via seek: + // 1. requestedOffset < session.StartOffset: Send seek message (backward) + // 2. requestedOffset == session.StartOffset: Continue reading (no seek needed) + // 3. requestedOffset > session.StartOffset: Send seek message (forward) // - // The session will naturally advance as records are consumed, so we should NOT - // recreate it just because requestedOffset != session.StartOffset - - // OPTIMIZATION: Only recreate for EXTREMELY LARGE backward seeks (>1000000 offsets back) - // Most backward seeks should be served from cache or tolerated as forward reads - // This prevents creating zombie streams that never get cleaned up on the broker - // gRPC's stream.Recv() NEVER unblocks when streams are cancelled, leaving goroutines - // orphaned forever. Each recreation leaves 2 goroutines (first record + loop) blocked. - // With 14K recreations, that's 28K leaked goroutines. Solution: almost never recreate. - const maxBackwardGap = 1000000 - offsetGap := currentStartOffset - requestedOffset - - if requestedOffset < currentStartOffset && offsetGap > maxBackwardGap { - // Need to seek backward significantly - close old session and create a fresh subscriber - // Restarting an existing stream doesn't work reliably because the broker may still - // have old data buffered in the stream pipeline - glog.V(2).Infof("[FETCH] Seeking backward significantly: requested=%d < session=%d (gap=%d), creating fresh subscriber", - requestedOffset, currentStartOffset, offsetGap) - - // Extract session details (note: session.mu was already unlocked at line 294) - topic := session.Topic - partition := session.Partition - consumerGroup := session.ConsumerGroup - consumerID := session.ConsumerID - key := session.Key() - - // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset - // This prevents multiple threads from all deciding to recreate based on stale data - glog.V(2).Infof("[FETCH] 🔒 Thread acquiring global lock to recreate session %s: requested=%d", key, requestedOffset) - bc.subscribersLock.Lock() - glog.V(2).Infof("[FETCH] 🔓 Thread acquired global lock for session %s: requested=%d", key, requestedOffset) - - // Double-check if another thread already recreated the session at the desired offset - // This prevents multiple concurrent threads from all trying to recreate the same session - if existingSession, exists := bc.subscribers[key]; exists { - existingSession.mu.Lock() - existingOffset := existingSession.StartOffset - existingSession.mu.Unlock() - - // Check if the session was already recreated at (or before) the requested offset - if existingOffset <= requestedOffset { - bc.subscribersLock.Unlock() - glog.V(2).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset) - // Re-acquire the existing session and continue - return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords) - } - - glog.V(2).Infof("[FETCH] Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset) - - // Session still needs recreation - close it - // CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls - existingSession.mu.Lock() - if existingSession.Stream != nil { - _ = existingSession.Stream.CloseSend() - } - if existingSession.Cancel != nil { - existingSession.Cancel() - } - existingSession.mu.Unlock() - delete(bc.subscribers, key) - glog.V(2).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset) - } - // CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session - // to prevent other threads from interfering. We'll create the session inline. - // bc.subscribersLock.Unlock() - REMOVED to fix race condition - - // Create a completely fresh subscriber at the requested offset - // INLINE SESSION CREATION to hold the lock continuously - glog.V(1).Infof("[FETCH] Creating inline subscriber session while holding lock: %s at offset %d", key, requestedOffset) - subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx) - - stream, err := bc.client.SubscribeMessage(subscriberCtx) - if err != nil { - bc.subscribersLock.Unlock() - return nil, fmt.Errorf("failed to create subscribe stream: %v", err) - } - - // Get the actual partition assignment from the broker - actualPartition, err := bc.getActualPartitionAssignment(topic, partition) - if err != nil { - bc.subscribersLock.Unlock() - _ = stream.CloseSend() - return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err) + // The stream persists for the entire consumer session - no recreation needed + if requestedOffset != currentStartOffset { + offsetDiff := requestedOffset - currentStartOffset + seekDirection := "forward" + if offsetDiff < 0 { + seekDirection = "backward" } - // Use EXACT_OFFSET to position subscriber at the exact Kafka offset - offsetType := schema_pb.OffsetType_EXACT_OFFSET + glog.V(2).Infof("[FETCH] Offset mismatch: %s seek from %d to %d (diff=%d)", + seekDirection, currentStartOffset, requestedOffset, offsetDiff) - glog.V(2).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d", - topic, partition, requestedOffset) - - glog.V(2).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s", - topic, partition, requestedOffset, offsetType, consumerGroup, consumerID) - - // Send init message using the actual partition structure - initReq := createSubscribeInitMessage(topic, actualPartition, requestedOffset, offsetType, consumerGroup, consumerID) - if err := stream.Send(initReq); err != nil { - bc.subscribersLock.Unlock() - _ = stream.CloseSend() - return nil, fmt.Errorf("failed to send subscribe init: %v", err) + // Send seek message to reposition stream + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: requestedOffset, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + }, } - newSession := &BrokerSubscriberSession{ - Topic: topic, - Partition: partition, - Stream: stream, - StartOffset: requestedOffset, - ConsumerGroup: consumerGroup, - ConsumerID: consumerID, - Ctx: subscriberCtx, - Cancel: subscriberCancel, + if err := session.Stream.Send(seekMsg); err != nil { + return nil, fmt.Errorf("seek to offset %d failed: %v", requestedOffset, err) } - bc.subscribers[key] = newSession - bc.subscribersLock.Unlock() - glog.V(2).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset) + // Update session state after successful seek + session.mu.Lock() + session.StartOffset = requestedOffset + session.consumedRecords = nil // Clear cache after seek + session.mu.Unlock() - // Read from fresh subscriber - glog.V(2).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords) - return bc.ReadRecords(ctx, newSession, maxRecords) + glog.V(2).Infof("[FETCH] Seek to offset %d successful", requestedOffset) + } else { + glog.V(2).Infof("[FETCH] Offset match: continuing from offset %d", requestedOffset) } - // requestedOffset >= session.StartOffset: Keep reading forward from existing session - // This handles: - // - Exact match (requestedOffset == session.StartOffset) - // - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache) - glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)", - requestedOffset, currentStartOffset) - // Note: session.mu was already unlocked at line 294 after reading currentStartOffset + // Read records from current position return bc.ReadRecords(ctx, session, maxRecords) } @@ -782,3 +673,119 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO return nil } + +// Seek helper methods for BrokerSubscriberSession + +// SeekToOffset repositions the stream to read from a specific offset +func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error { + // Skip seek if already at the requested offset + session.mu.Lock() + currentOffset := session.StartOffset + session.mu.Unlock() + + if currentOffset == offset { + glog.V(2).Infof("[SEEK] Already at offset %d for %s[%d], skipping seek", offset, session.Topic, session.Partition) + return nil + } + + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: offset, + OffsetType: schema_pb.OffsetType_EXACT_OFFSET, + }, + }, + } + + if err := session.Stream.Send(seekMsg); err != nil { + return fmt.Errorf("seek to offset %d failed: %v", offset, err) + } + + session.mu.Lock() + session.StartOffset = offset + session.consumedRecords = nil + session.mu.Unlock() + + glog.V(2).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition) + return nil +} + +// SeekToTimestamp repositions the stream to read from messages at or after a specific timestamp +// timestamp is in nanoseconds since Unix epoch +// Note: We don't skip this operation even if we think we're at the right position because +// we can't easily determine the offset corresponding to a timestamp without querying the broker +func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error { + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: timestampNs, + OffsetType: schema_pb.OffsetType_EXACT_TS_NS, + }, + }, + } + + if err := session.Stream.Send(seekMsg); err != nil { + return fmt.Errorf("seek to timestamp %d failed: %v", timestampNs, err) + } + + session.mu.Lock() + // Note: We don't know the exact offset at this timestamp yet + // It will be updated when we read the first message + session.consumedRecords = nil + session.mu.Unlock() + + glog.V(2).Infof("[SEEK] Seeked to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition) + return nil +} + +// SeekToEarliest repositions the stream to the beginning of the partition +// Note: We don't skip this operation even if StartOffset == 0 because the broker +// may have a different notion of "earliest" (e.g., after compaction or retention) +func (session *BrokerSubscriberSession) SeekToEarliest() error { + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 0, + OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + }, + }, + } + + if err := session.Stream.Send(seekMsg); err != nil { + return fmt.Errorf("seek to earliest failed: %v", err) + } + + session.mu.Lock() + session.StartOffset = 0 + session.consumedRecords = nil + session.mu.Unlock() + + glog.V(2).Infof("[SEEK] Seeked to earliest for %s[%d]", session.Topic, session.Partition) + return nil +} + +// SeekToLatest repositions the stream to the end of the partition (next new message) +// Note: We don't skip this operation because "latest" is a moving target and we can't +// reliably determine if we're already at the latest position without querying the broker +func (session *BrokerSubscriberSession) SeekToLatest() error { + seekMsg := &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Seek{ + Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{ + Offset: 0, + OffsetType: schema_pb.OffsetType_RESET_TO_LATEST, + }, + }, + } + + if err := session.Stream.Send(seekMsg); err != nil { + return fmt.Errorf("seek to latest failed: %v", err) + } + + session.mu.Lock() + // Offset will be set when we read the first new message + session.consumedRecords = nil + session.mu.Unlock() + + glog.V(2).Infof("[SEEK] Seeked to latest for %s[%d]", session.Topic, session.Partition) + return nil +} diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto index ff6f95de8..205e17aba 100644 --- a/weed/pb/mq_broker.proto +++ b/weed/pb/mq_broker.proto @@ -329,9 +329,14 @@ message SubscribeMessageRequest { int64 ts_ns = 1; // Timestamp in nanoseconds for acknowledgment tracking bytes key = 2; } + message SeekMessage { + int64 offset = 1; // New offset to seek to + schema_pb.OffsetType offset_type = 2; // EXACT_OFFSET, RESET_TO_LATEST, etc. + } oneof message { InitMessage init = 1; AckMessage ack = 2; + SeekMessage seek = 3; } } message SubscribeMessageResponse { diff --git a/weed/pb/mq_pb/mq_broker.pb.go b/weed/pb/mq_pb/mq_broker.pb.go index ae174f224..49be07f50 100644 --- a/weed/pb/mq_pb/mq_broker.pb.go +++ b/weed/pb/mq_pb/mq_broker.pb.go @@ -2250,6 +2250,7 @@ type SubscribeMessageRequest struct { // // *SubscribeMessageRequest_Init // *SubscribeMessageRequest_Ack + // *SubscribeMessageRequest_Seek Message isSubscribeMessageRequest_Message `protobuf_oneof:"message"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -2310,6 +2311,15 @@ func (x *SubscribeMessageRequest) GetAck() *SubscribeMessageRequest_AckMessage { return nil } +func (x *SubscribeMessageRequest) GetSeek() *SubscribeMessageRequest_SeekMessage { + if x != nil { + if x, ok := x.Message.(*SubscribeMessageRequest_Seek); ok { + return x.Seek + } + } + return nil +} + type isSubscribeMessageRequest_Message interface { isSubscribeMessageRequest_Message() } @@ -2322,10 +2332,16 @@ type SubscribeMessageRequest_Ack struct { Ack *SubscribeMessageRequest_AckMessage `protobuf:"bytes,2,opt,name=ack,proto3,oneof"` } +type SubscribeMessageRequest_Seek struct { + Seek *SubscribeMessageRequest_SeekMessage `protobuf:"bytes,3,opt,name=seek,proto3,oneof"` +} + func (*SubscribeMessageRequest_Init) isSubscribeMessageRequest_Message() {} func (*SubscribeMessageRequest_Ack) isSubscribeMessageRequest_Message() {} +func (*SubscribeMessageRequest_Seek) isSubscribeMessageRequest_Message() {} + type SubscribeMessageResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Types that are valid to be assigned to Message: @@ -3761,6 +3777,58 @@ func (x *SubscribeMessageRequest_AckMessage) GetKey() []byte { return nil } +type SubscribeMessageRequest_SeekMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` // New offset to seek to + OffsetType schema_pb.OffsetType `protobuf:"varint,2,opt,name=offset_type,json=offsetType,proto3,enum=schema_pb.OffsetType" json:"offset_type,omitempty"` // EXACT_OFFSET, RESET_TO_LATEST, etc. + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SubscribeMessageRequest_SeekMessage) Reset() { + *x = SubscribeMessageRequest_SeekMessage{} + mi := &file_mq_broker_proto_msgTypes[63] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SubscribeMessageRequest_SeekMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeMessageRequest_SeekMessage) ProtoMessage() {} + +func (x *SubscribeMessageRequest_SeekMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[63] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeMessageRequest_SeekMessage.ProtoReflect.Descriptor instead. +func (*SubscribeMessageRequest_SeekMessage) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{36, 2} +} + +func (x *SubscribeMessageRequest_SeekMessage) GetOffset() int64 { + if x != nil { + return x.Offset + } + return 0 +} + +func (x *SubscribeMessageRequest_SeekMessage) GetOffsetType() schema_pb.OffsetType { + if x != nil { + return x.OffsetType + } + return schema_pb.OffsetType(0) +} + type SubscribeMessageResponse_SubscribeCtrlMessage struct { state protoimpl.MessageState `protogen:"open.v1"` Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` @@ -3772,7 +3840,7 @@ type SubscribeMessageResponse_SubscribeCtrlMessage struct { func (x *SubscribeMessageResponse_SubscribeCtrlMessage) Reset() { *x = SubscribeMessageResponse_SubscribeCtrlMessage{} - mi := &file_mq_broker_proto_msgTypes[63] + mi := &file_mq_broker_proto_msgTypes[64] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3784,7 +3852,7 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) String() string { func (*SubscribeMessageResponse_SubscribeCtrlMessage) ProtoMessage() {} func (x *SubscribeMessageResponse_SubscribeCtrlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[63] + mi := &file_mq_broker_proto_msgTypes[64] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3832,7 +3900,7 @@ type SubscribeFollowMeRequest_InitMessage struct { func (x *SubscribeFollowMeRequest_InitMessage) Reset() { *x = SubscribeFollowMeRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[64] + mi := &file_mq_broker_proto_msgTypes[65] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3844,7 +3912,7 @@ func (x *SubscribeFollowMeRequest_InitMessage) String() string { func (*SubscribeFollowMeRequest_InitMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[64] + mi := &file_mq_broker_proto_msgTypes[65] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3890,7 +3958,7 @@ type SubscribeFollowMeRequest_AckMessage struct { func (x *SubscribeFollowMeRequest_AckMessage) Reset() { *x = SubscribeFollowMeRequest_AckMessage{} - mi := &file_mq_broker_proto_msgTypes[65] + mi := &file_mq_broker_proto_msgTypes[66] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3902,7 +3970,7 @@ func (x *SubscribeFollowMeRequest_AckMessage) String() string { func (*SubscribeFollowMeRequest_AckMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[65] + mi := &file_mq_broker_proto_msgTypes[66] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3933,7 +4001,7 @@ type SubscribeFollowMeRequest_CloseMessage struct { func (x *SubscribeFollowMeRequest_CloseMessage) Reset() { *x = SubscribeFollowMeRequest_CloseMessage{} - mi := &file_mq_broker_proto_msgTypes[66] + mi := &file_mq_broker_proto_msgTypes[67] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3945,7 +4013,7 @@ func (x *SubscribeFollowMeRequest_CloseMessage) String() string { func (*SubscribeFollowMeRequest_CloseMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[66] + mi := &file_mq_broker_proto_msgTypes[67] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4144,10 +4212,11 @@ const file_mq_broker_proto_rawDesc = "" + "\fCloseMessageB\t\n" + "\amessage\"5\n" + "\x17PublishFollowMeResponse\x12\x1a\n" + - "\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\"\xf5\x04\n" + + "\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\"\x9d\x06\n" + "\x17SubscribeMessageRequest\x12G\n" + "\x04init\x18\x01 \x01(\v21.messaging_pb.SubscribeMessageRequest.InitMessageH\x00R\x04init\x12D\n" + - "\x03ack\x18\x02 \x01(\v20.messaging_pb.SubscribeMessageRequest.AckMessageH\x00R\x03ack\x1a\x8a\x03\n" + + "\x03ack\x18\x02 \x01(\v20.messaging_pb.SubscribeMessageRequest.AckMessageH\x00R\x03ack\x12G\n" + + "\x04seek\x18\x03 \x01(\v21.messaging_pb.SubscribeMessageRequest.SeekMessageH\x00R\x04seek\x1a\x8a\x03\n" + "\vInitMessage\x12%\n" + "\x0econsumer_group\x18\x01 \x01(\tR\rconsumerGroup\x12\x1f\n" + "\vconsumer_id\x18\x02 \x01(\tR\n" + @@ -4164,7 +4233,11 @@ const file_mq_broker_proto_rawDesc = "" + "\n" + "AckMessage\x12\x13\n" + "\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x10\n" + - "\x03key\x18\x02 \x01(\fR\x03keyB\t\n" + + "\x03key\x18\x02 \x01(\fR\x03key\x1a]\n" + + "\vSeekMessage\x12\x16\n" + + "\x06offset\x18\x01 \x01(\x03R\x06offset\x126\n" + + "\voffset_type\x18\x02 \x01(\x0e2\x15.schema_pb.OffsetTypeR\n" + + "offsetTypeB\t\n" + "\amessage\"\xa7\x02\n" + "\x18SubscribeMessageResponse\x12Q\n" + "\x04ctrl\x18\x01 \x01(\v2;.messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessageH\x00R\x04ctrl\x12/\n" + @@ -4260,7 +4333,7 @@ func file_mq_broker_proto_rawDescGZIP() []byte { return file_mq_broker_proto_rawDescData } -var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 67) +var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 68) var file_mq_broker_proto_goTypes = []any{ (*FindBrokerLeaderRequest)(nil), // 0: messaging_pb.FindBrokerLeaderRequest (*FindBrokerLeaderResponse)(nil), // 1: messaging_pb.FindBrokerLeaderResponse @@ -4325,47 +4398,48 @@ var file_mq_broker_proto_goTypes = []any{ (*PublishFollowMeRequest_CloseMessage)(nil), // 60: messaging_pb.PublishFollowMeRequest.CloseMessage (*SubscribeMessageRequest_InitMessage)(nil), // 61: messaging_pb.SubscribeMessageRequest.InitMessage (*SubscribeMessageRequest_AckMessage)(nil), // 62: messaging_pb.SubscribeMessageRequest.AckMessage - (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 63: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage - (*SubscribeFollowMeRequest_InitMessage)(nil), // 64: messaging_pb.SubscribeFollowMeRequest.InitMessage - (*SubscribeFollowMeRequest_AckMessage)(nil), // 65: messaging_pb.SubscribeFollowMeRequest.AckMessage - (*SubscribeFollowMeRequest_CloseMessage)(nil), // 66: messaging_pb.SubscribeFollowMeRequest.CloseMessage - (*schema_pb.Topic)(nil), // 67: schema_pb.Topic - (*schema_pb.Partition)(nil), // 68: schema_pb.Partition - (*schema_pb.RecordType)(nil), // 69: schema_pb.RecordType - (*filer_pb.LogEntry)(nil), // 70: filer_pb.LogEntry - (*schema_pb.PartitionOffset)(nil), // 71: schema_pb.PartitionOffset - (schema_pb.OffsetType)(0), // 72: schema_pb.OffsetType + (*SubscribeMessageRequest_SeekMessage)(nil), // 63: messaging_pb.SubscribeMessageRequest.SeekMessage + (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 64: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + (*SubscribeFollowMeRequest_InitMessage)(nil), // 65: messaging_pb.SubscribeFollowMeRequest.InitMessage + (*SubscribeFollowMeRequest_AckMessage)(nil), // 66: messaging_pb.SubscribeFollowMeRequest.AckMessage + (*SubscribeFollowMeRequest_CloseMessage)(nil), // 67: messaging_pb.SubscribeFollowMeRequest.CloseMessage + (*schema_pb.Topic)(nil), // 68: schema_pb.Topic + (*schema_pb.Partition)(nil), // 69: schema_pb.Partition + (*schema_pb.RecordType)(nil), // 70: schema_pb.RecordType + (*filer_pb.LogEntry)(nil), // 71: filer_pb.LogEntry + (*schema_pb.PartitionOffset)(nil), // 72: schema_pb.PartitionOffset + (schema_pb.OffsetType)(0), // 73: schema_pb.OffsetType } var file_mq_broker_proto_depIdxs = []int32{ 50, // 0: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry - 67, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic - 68, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition + 68, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic + 69, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition 51, // 3: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage 2, // 4: messaging_pb.PublisherToPubBalancerRequest.stats:type_name -> messaging_pb.BrokerStats - 67, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic + 68, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic 8, // 6: messaging_pb.ConfigureTopicRequest.retention:type_name -> messaging_pb.TopicRetention - 69, // 7: messaging_pb.ConfigureTopicRequest.message_record_type:type_name -> schema_pb.RecordType + 70, // 7: messaging_pb.ConfigureTopicRequest.message_record_type:type_name -> schema_pb.RecordType 17, // 8: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 8, // 9: messaging_pb.ConfigureTopicResponse.retention:type_name -> messaging_pb.TopicRetention - 69, // 10: messaging_pb.ConfigureTopicResponse.message_record_type:type_name -> schema_pb.RecordType - 67, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic - 67, // 12: messaging_pb.TopicExistsRequest.topic:type_name -> schema_pb.Topic - 67, // 13: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic - 67, // 14: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic + 70, // 10: messaging_pb.ConfigureTopicResponse.message_record_type:type_name -> schema_pb.RecordType + 68, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic + 68, // 12: messaging_pb.TopicExistsRequest.topic:type_name -> schema_pb.Topic + 68, // 13: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic + 68, // 14: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic 17, // 15: messaging_pb.LookupTopicBrokersResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 68, // 16: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition - 67, // 17: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic - 67, // 18: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic + 69, // 16: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition + 68, // 17: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic + 68, // 18: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic 17, // 19: messaging_pb.GetTopicConfigurationResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 8, // 20: messaging_pb.GetTopicConfigurationResponse.retention:type_name -> messaging_pb.TopicRetention - 69, // 21: messaging_pb.GetTopicConfigurationResponse.message_record_type:type_name -> schema_pb.RecordType - 67, // 22: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic + 70, // 21: messaging_pb.GetTopicConfigurationResponse.message_record_type:type_name -> schema_pb.RecordType + 68, // 22: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic 24, // 23: messaging_pb.GetTopicPublishersResponse.publishers:type_name -> messaging_pb.TopicPublisher - 67, // 24: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic + 68, // 24: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic 25, // 25: messaging_pb.GetTopicSubscribersResponse.subscribers:type_name -> messaging_pb.TopicSubscriber - 68, // 26: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition - 68, // 27: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition - 67, // 28: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic + 69, // 26: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition + 69, // 27: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition + 68, // 28: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic 17, // 29: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 52, // 30: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage 54, // 31: messaging_pb.SubscriberToSubCoordinatorRequest.ack_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage @@ -4381,80 +4455,82 @@ var file_mq_broker_proto_depIdxs = []int32{ 60, // 41: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage 61, // 42: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage 62, // 43: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage - 63, // 44: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage - 31, // 45: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage - 64, // 46: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage - 65, // 47: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage - 66, // 48: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage - 67, // 49: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic - 67, // 50: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic - 67, // 51: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic - 68, // 52: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition - 70, // 53: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> filer_pb.LogEntry - 67, // 54: messaging_pb.GetPartitionRangeInfoRequest.topic:type_name -> schema_pb.Topic - 68, // 55: messaging_pb.GetPartitionRangeInfoRequest.partition:type_name -> schema_pb.Partition - 48, // 56: messaging_pb.GetPartitionRangeInfoResponse.offset_range:type_name -> messaging_pb.OffsetRangeInfo - 49, // 57: messaging_pb.GetPartitionRangeInfoResponse.timestamp_range:type_name -> messaging_pb.TimestampRangeInfo - 3, // 58: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats - 67, // 59: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic - 68, // 60: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition - 68, // 61: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition - 17, // 62: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment - 68, // 63: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition - 67, // 64: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic - 68, // 65: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition - 67, // 66: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic - 68, // 67: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition - 67, // 68: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic - 71, // 69: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset - 72, // 70: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType - 67, // 71: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic - 68, // 72: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition - 0, // 73: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 4, // 74: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest - 6, // 75: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest - 11, // 76: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest - 13, // 77: messaging_pb.SeaweedMessaging.TopicExists:input_type -> messaging_pb.TopicExistsRequest - 9, // 78: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest - 15, // 79: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest - 18, // 80: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest - 20, // 81: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest - 22, // 82: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest - 26, // 83: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 40, // 84: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest - 42, // 85: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest - 28, // 86: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest - 32, // 87: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest - 36, // 88: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest - 34, // 89: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest - 38, // 90: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest - 44, // 91: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest - 46, // 92: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:input_type -> messaging_pb.GetPartitionRangeInfoRequest - 1, // 93: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 5, // 94: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse - 7, // 95: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse - 12, // 96: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse - 14, // 97: messaging_pb.SeaweedMessaging.TopicExists:output_type -> messaging_pb.TopicExistsResponse - 10, // 98: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse - 16, // 99: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse - 19, // 100: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse - 21, // 101: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse - 23, // 102: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse - 27, // 103: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 41, // 104: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse - 43, // 105: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse - 29, // 106: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse - 33, // 107: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse - 37, // 108: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse - 35, // 109: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse - 39, // 110: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse - 45, // 111: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse - 47, // 112: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:output_type -> messaging_pb.GetPartitionRangeInfoResponse - 93, // [93:113] is the sub-list for method output_type - 73, // [73:93] is the sub-list for method input_type - 73, // [73:73] is the sub-list for extension type_name - 73, // [73:73] is the sub-list for extension extendee - 0, // [0:73] is the sub-list for field type_name + 63, // 44: messaging_pb.SubscribeMessageRequest.seek:type_name -> messaging_pb.SubscribeMessageRequest.SeekMessage + 64, // 45: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + 31, // 46: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage + 65, // 47: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage + 66, // 48: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage + 67, // 49: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage + 68, // 50: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic + 68, // 51: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic + 68, // 52: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic + 69, // 53: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition + 71, // 54: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> filer_pb.LogEntry + 68, // 55: messaging_pb.GetPartitionRangeInfoRequest.topic:type_name -> schema_pb.Topic + 69, // 56: messaging_pb.GetPartitionRangeInfoRequest.partition:type_name -> schema_pb.Partition + 48, // 57: messaging_pb.GetPartitionRangeInfoResponse.offset_range:type_name -> messaging_pb.OffsetRangeInfo + 49, // 58: messaging_pb.GetPartitionRangeInfoResponse.timestamp_range:type_name -> messaging_pb.TimestampRangeInfo + 3, // 59: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats + 68, // 60: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic + 69, // 61: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition + 69, // 62: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition + 17, // 63: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment + 69, // 64: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition + 68, // 65: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic + 69, // 66: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition + 68, // 67: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic + 69, // 68: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition + 68, // 69: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic + 72, // 70: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset + 73, // 71: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType + 73, // 72: messaging_pb.SubscribeMessageRequest.SeekMessage.offset_type:type_name -> schema_pb.OffsetType + 68, // 73: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic + 69, // 74: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition + 0, // 75: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 4, // 76: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest + 6, // 77: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest + 11, // 78: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest + 13, // 79: messaging_pb.SeaweedMessaging.TopicExists:input_type -> messaging_pb.TopicExistsRequest + 9, // 80: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest + 15, // 81: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 18, // 82: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest + 20, // 83: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest + 22, // 84: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest + 26, // 85: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 40, // 86: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest + 42, // 87: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest + 28, // 88: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest + 32, // 89: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest + 36, // 90: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest + 34, // 91: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest + 38, // 92: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest + 44, // 93: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest + 46, // 94: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:input_type -> messaging_pb.GetPartitionRangeInfoRequest + 1, // 95: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 5, // 96: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse + 7, // 97: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse + 12, // 98: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse + 14, // 99: messaging_pb.SeaweedMessaging.TopicExists:output_type -> messaging_pb.TopicExistsResponse + 10, // 100: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse + 16, // 101: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 19, // 102: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse + 21, // 103: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse + 23, // 104: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse + 27, // 105: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 41, // 106: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse + 43, // 107: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse + 29, // 108: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse + 33, // 109: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse + 37, // 110: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse + 35, // 111: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse + 39, // 112: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse + 45, // 113: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse + 47, // 114: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:output_type -> messaging_pb.GetPartitionRangeInfoResponse + 95, // [95:115] is the sub-list for method output_type + 75, // [75:95] is the sub-list for method input_type + 75, // [75:75] is the sub-list for extension type_name + 75, // [75:75] is the sub-list for extension extendee + 0, // [0:75] is the sub-list for field type_name } func init() { file_mq_broker_proto_init() } @@ -4488,6 +4564,7 @@ func file_mq_broker_proto_init() { file_mq_broker_proto_msgTypes[36].OneofWrappers = []any{ (*SubscribeMessageRequest_Init)(nil), (*SubscribeMessageRequest_Ack)(nil), + (*SubscribeMessageRequest_Seek)(nil), } file_mq_broker_proto_msgTypes[37].OneofWrappers = []any{ (*SubscribeMessageResponse_Ctrl)(nil), @@ -4504,7 +4581,7 @@ func file_mq_broker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_broker_proto_rawDesc), len(file_mq_broker_proto_rawDesc)), NumEnums: 0, - NumMessages: 67, + NumMessages: 68, NumExtensions: 0, NumServices: 1, },