
seekable subscribe messages

chrislu, 7 days ago (pull/7329/head)
commit 5222ddaf2f
  1. weed/mq/broker/broker_grpc_sub.go (148)
  2. weed/mq/broker/broker_grpc_sub_seek_test.go (872)
  3. weed/mq/kafka/integration/broker_client_subscribe.go (337)
  4. weed/pb/mq_broker.proto (5)
  5. weed/pb/mq_pb/mq_broker.pb.go (307)

weed/mq/broker/broker_grpc_sub.go (148)

@@ -107,6 +107,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
glog.V(0).Infof("follower %s connected", follower)
}
// Channel to handle seek requests - signals Subscribe loop to restart from new offset
seekChan := make(chan *mq_pb.SubscribeMessageRequest_SeekMessage, 1)
go func() {
defer cancel() // CRITICAL: Cancel context when Recv goroutine exits (client disconnect)
@@ -128,6 +131,27 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
break
}
// Handle seek messages
if seekMsg := ack.GetSeek(); seekMsg != nil {
glog.V(0).Infof("Subscriber %s received seek request to offset %d (type %v)",
clientName, seekMsg.Offset, seekMsg.OffsetType)
// Send seek request to Subscribe loop
select {
case seekChan <- seekMsg:
glog.V(0).Infof("Subscriber %s seek request queued", clientName)
default:
glog.V(0).Infof("Subscriber %s seek request dropped (already pending)", clientName)
// Send error response if seek is already in progress
stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
Error: "Seek already in progress",
},
}})
}
continue
}
if ack.GetAck().Key == nil {
// skip ack for control messages
continue
@@ -182,20 +206,30 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
localTopicPartition.ListenersLock.Unlock()
}()
err = localTopicPartition.Subscribe(clientName, startPosition, func() bool {
// Check cancellation before waiting
if ctx.Err() != nil || !isConnected {
return false
}
// Subscribe loop - can be restarted when seek is requested
currentPosition := startPosition
subscribeLoop:
for {
// Context for this iteration of Subscribe (can be cancelled by seek)
subscribeCtx, subscribeCancel := context.WithCancel(ctx)
// Start Subscribe in a goroutine so we can interrupt it with seek
subscribeDone := make(chan error, 1)
go func() {
subscribeErr := localTopicPartition.Subscribe(clientName, currentPosition, func() bool {
// Check cancellation before waiting
if subscribeCtx.Err() != nil || !isConnected {
return false
}
// Wait for new data using condition variable (blocking, not polling)
localTopicPartition.ListenersLock.Lock()
localTopicPartition.ListenersCond.Wait()
localTopicPartition.ListenersLock.Unlock()
// Wait for new data using condition variable (blocking, not polling)
localTopicPartition.ListenersLock.Lock()
localTopicPartition.ListenersCond.Wait()
localTopicPartition.ListenersLock.Unlock()
// After waking up, check if we should stop
return ctx.Err() == nil && isConnected
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
// After waking up, check if we should stop
return subscribeCtx.Err() == nil && isConnected
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
// Wait for the message to be acknowledged with a timeout to prevent infinite loops
const maxWaitTime = 30 * time.Second
const checkInterval = 137 * time.Millisecond
@@ -215,10 +249,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():
err := ctx.Err()
case <-subscribeCtx.Done():
err := subscribeCtx.Err()
if err == context.Canceled {
// Client disconnected
// Subscribe cancelled (seek or disconnect)
return false, nil
}
glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
@@ -250,7 +284,46 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
counter++
return false, nil
})
})
subscribeDone <- subscribeErr
}()
// Wait for either Subscribe to complete or a seek request
select {
case err = <-subscribeDone:
subscribeCancel()
if err != nil || ctx.Err() != nil {
// Subscribe finished with error or main context cancelled - exit loop
break subscribeLoop
}
// Subscribe completed normally (shouldn't happen in streaming mode)
break subscribeLoop
case seekMsg := <-seekChan:
// Seek requested - cancel current Subscribe and restart from new offset
glog.V(0).Infof("Subscriber %s seeking from offset %d to offset %d (type %v)",
clientName, currentPosition.GetOffset(), seekMsg.Offset, seekMsg.OffsetType)
// Cancel current Subscribe iteration
subscribeCancel()
// Wait for Subscribe to finish cancelling
<-subscribeDone
// Update position for next iteration
currentPosition = b.getRequestPositionFromSeek(seekMsg)
glog.V(0).Infof("Subscriber %s restarting Subscribe from new offset %d", clientName, seekMsg.Offset)
// Send acknowledgment that seek completed
stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
Error: "", // Empty error means success
},
}})
// Loop will restart with new position
}
}
return err
}
@@ -304,3 +377,46 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
}
return
}
// getRequestPositionFromSeek converts a seek request to a MessagePosition
// This is used when implementing full seek support in Subscribe loop
func (b *MessageQueueBroker) getRequestPositionFromSeek(seekMsg *mq_pb.SubscribeMessageRequest_SeekMessage) (startPosition log_buffer.MessagePosition) {
if seekMsg == nil {
return
}
offsetType := seekMsg.OffsetType
offset := seekMsg.Offset
// reset to earliest or latest
if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
startPosition = log_buffer.NewMessagePosition(1, -3)
return
}
if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
return
}
// use the exact timestamp
if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
startPosition = log_buffer.NewMessagePosition(offset, -2)
return
}
// use exact offset (native offset-based positioning)
if offsetType == schema_pb.OffsetType_EXACT_OFFSET {
startPosition = log_buffer.NewMessagePositionFromOffset(offset)
return
}
// reset to specific offset
if offsetType == schema_pb.OffsetType_RESET_TO_OFFSET {
startPosition = log_buffer.NewMessagePositionFromOffset(offset)
return
}
// default to exact offset
startPosition = log_buffer.NewMessagePositionFromOffset(offset)
return
}
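For context, a minimal client-side sketch of the new seek flow (illustrative only, not part of this commit): it assumes an already-initialized SubscribeMessage stream and the usual protoc-gen-go-grpc client stream type name, and it ignores the fact that data messages may arrive interleaved with the control acknowledgment on the same stream.

package example // illustrative sketch, not part of this commit

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// seekTo asks the broker to restart its Subscribe loop from an exact offset.
// The stream is assumed to have completed the init handshake already.
func seekTo(stream mq_pb.SeaweedMessaging_SubscribeMessageClient, offset int64) error {
	if err := stream.Send(&mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Seek{
			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
				Offset:     offset,
				OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
			},
		},
	}); err != nil {
		return err
	}
	// The broker answers with a SubscribeCtrlMessage: an empty Error field means
	// the seek was accepted, "Seek already in progress" means one is still pending.
	// A real client would demultiplex this from data messages arriving in between.
	resp, err := stream.Recv()
	if err != nil {
		return err
	}
	if ctrl := resp.GetCtrl(); ctrl != nil && ctrl.Error != "" {
		return fmt.Errorf("seek rejected: %s", ctrl.Error)
	}
	return nil
}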

weed/mq/broker/broker_grpc_sub_seek_test.go (872)

@@ -0,0 +1,872 @@
package broker
import (
"context"
"fmt"
"io"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/grpc/metadata"
)
// TestGetRequestPositionFromSeek tests the helper function that converts seek requests to message positions
func TestGetRequestPositionFromSeek(t *testing.T) {
broker := &MessageQueueBroker{}
tests := []struct {
name string
offsetType schema_pb.OffsetType
offset int64
expectedBatch int64
expectZeroTime bool
}{
{
name: "reset to earliest",
offsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
offset: 0,
expectedBatch: -3,
expectZeroTime: false,
},
{
name: "reset to latest",
offsetType: schema_pb.OffsetType_RESET_TO_LATEST,
offset: 0,
expectedBatch: -4,
expectZeroTime: false,
},
{
name: "exact offset zero",
offsetType: schema_pb.OffsetType_EXACT_OFFSET,
offset: 0,
expectedBatch: 0,
expectZeroTime: true,
},
{
name: "exact offset 100",
offsetType: schema_pb.OffsetType_EXACT_OFFSET,
offset: 100,
expectedBatch: 100,
expectZeroTime: true,
},
{
name: "exact offset 1000",
offsetType: schema_pb.OffsetType_EXACT_OFFSET,
offset: 1000,
expectedBatch: 1000,
expectZeroTime: true,
},
{
name: "exact timestamp",
offsetType: schema_pb.OffsetType_EXACT_TS_NS,
offset: 1234567890123456789,
expectedBatch: -2,
expectZeroTime: false,
},
{
name: "reset to offset",
offsetType: schema_pb.OffsetType_RESET_TO_OFFSET,
offset: 42,
expectedBatch: 42,
expectZeroTime: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: tt.offset,
OffsetType: tt.offsetType,
}
position := broker.getRequestPositionFromSeek(seekMsg)
if position.Offset != tt.expectedBatch {
t.Errorf("Expected batch index %d, got %d", tt.expectedBatch, position.Offset)
}
// Verify time handling
if tt.expectZeroTime && !position.Time.IsZero() {
t.Errorf("Expected zero time for offset-based seek, got %v", position.Time)
}
if !tt.expectZeroTime && position.Time.IsZero() && tt.offsetType != schema_pb.OffsetType_RESET_TO_EARLIEST {
t.Errorf("Expected non-zero time, got zero time")
}
})
}
}
// TestGetRequestPositionFromSeek_NilSafety tests that the function handles nil input gracefully
func TestGetRequestPositionFromSeek_NilSafety(t *testing.T) {
broker := &MessageQueueBroker{}
position := broker.getRequestPositionFromSeek(nil)
// Should return zero-value position without panicking
if position.Offset != 0 {
t.Errorf("Expected zero offset for nil input, got %d", position.Offset)
}
}
// TestGetRequestPositionFromSeek_ConsistentResults verifies that multiple calls with same input produce same output
func TestGetRequestPositionFromSeek_ConsistentResults(t *testing.T) {
broker := &MessageQueueBroker{}
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 42,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
}
// Call multiple times
positions := make([]log_buffer.MessagePosition, 5)
for i := 0; i < 5; i++ {
positions[i] = broker.getRequestPositionFromSeek(seekMsg)
time.Sleep(1 * time.Millisecond) // Small delay
}
// All positions should be identical
for i := 1; i < len(positions); i++ {
if positions[i].Offset != positions[0].Offset {
t.Errorf("Inconsistent Offset: %d vs %d", positions[0].Offset, positions[i].Offset)
}
if !positions[i].Time.Equal(positions[0].Time) {
t.Errorf("Inconsistent Time: %v vs %v", positions[0].Time, positions[i].Time)
}
if positions[i].IsOffsetBased != positions[0].IsOffsetBased {
t.Errorf("Inconsistent IsOffsetBased: %v vs %v", positions[0].IsOffsetBased, positions[i].IsOffsetBased)
}
}
}
// TestGetRequestPositionFromSeek_OffsetExtraction verifies offset can be correctly extracted
func TestGetRequestPositionFromSeek_OffsetExtraction(t *testing.T) {
broker := &MessageQueueBroker{}
testOffsets := []int64{0, 1, 10, 100, 1000, 9999}
for _, offset := range testOffsets {
t.Run(fmt.Sprintf("offset_%d", offset), func(t *testing.T) {
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: offset,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
}
position := broker.getRequestPositionFromSeek(seekMsg)
if !position.IsOffsetBased {
t.Error("Position should be detected as offset-based")
}
if extractedOffset := position.GetOffset(); extractedOffset != offset {
t.Errorf("Expected extracted offset %d, got %d", offset, extractedOffset)
}
})
}
}
// MockSubscribeMessageStream is a mock implementation of the gRPC stream for testing
type MockSubscribeMessageStream struct {
ctx context.Context
recvChan chan *mq_pb.SubscribeMessageRequest
sentMessages []*mq_pb.SubscribeMessageResponse
mu sync.Mutex
recvErr error
}
func NewMockSubscribeMessageStream(ctx context.Context) *MockSubscribeMessageStream {
return &MockSubscribeMessageStream{
ctx: ctx,
recvChan: make(chan *mq_pb.SubscribeMessageRequest, 10),
sentMessages: make([]*mq_pb.SubscribeMessageResponse, 0),
}
}
func (m *MockSubscribeMessageStream) Send(msg *mq_pb.SubscribeMessageResponse) error {
m.mu.Lock()
defer m.mu.Unlock()
m.sentMessages = append(m.sentMessages, msg)
return nil
}
func (m *MockSubscribeMessageStream) Recv() (*mq_pb.SubscribeMessageRequest, error) {
if m.recvErr != nil {
return nil, m.recvErr
}
select {
case msg := <-m.recvChan:
return msg, nil
case <-m.ctx.Done():
return nil, io.EOF
}
}
func (m *MockSubscribeMessageStream) SetHeader(metadata.MD) error {
return nil
}
func (m *MockSubscribeMessageStream) SendHeader(metadata.MD) error {
return nil
}
func (m *MockSubscribeMessageStream) SetTrailer(metadata.MD) {}
func (m *MockSubscribeMessageStream) Context() context.Context {
return m.ctx
}
func (m *MockSubscribeMessageStream) SendMsg(interface{}) error {
return nil
}
func (m *MockSubscribeMessageStream) RecvMsg(interface{}) error {
return nil
}
func (m *MockSubscribeMessageStream) QueueMessage(msg *mq_pb.SubscribeMessageRequest) {
m.recvChan <- msg
}
func (m *MockSubscribeMessageStream) SetRecvError(err error) {
m.recvErr = err
}
func (m *MockSubscribeMessageStream) GetSentMessages() []*mq_pb.SubscribeMessageResponse {
m.mu.Lock()
defer m.mu.Unlock()
return append([]*mq_pb.SubscribeMessageResponse{}, m.sentMessages...)
}
// TestSeekMessageHandling_BasicSeek tests that seek messages are properly received and acknowledged
func TestSeekMessageHandling_BasicSeek(t *testing.T) {
// Create seek message
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 100,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
},
}
// Verify message structure
if seekReq := seekMsg.GetSeek(); seekReq == nil {
t.Fatal("Failed to create seek message")
} else {
if seekReq.Offset != 100 {
t.Errorf("Expected offset 100, got %d", seekReq.Offset)
}
if seekReq.OffsetType != schema_pb.OffsetType_EXACT_OFFSET {
t.Errorf("Expected EXACT_OFFSET, got %v", seekReq.OffsetType)
}
}
}
// TestSeekMessageHandling_MultipleSeekTypes tests different seek offset types
func TestSeekMessageHandling_MultipleSeekTypes(t *testing.T) {
testCases := []struct {
name string
offset int64
offsetType schema_pb.OffsetType
}{
{
name: "seek to earliest",
offset: 0,
offsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
},
{
name: "seek to latest",
offset: 0,
offsetType: schema_pb.OffsetType_RESET_TO_LATEST,
},
{
name: "seek to exact offset",
offset: 42,
offsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
{
name: "seek to timestamp",
offset: time.Now().UnixNano(),
offsetType: schema_pb.OffsetType_EXACT_TS_NS,
},
{
name: "reset to offset",
offset: 1000,
offsetType: schema_pb.OffsetType_RESET_TO_OFFSET,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: tc.offset,
OffsetType: tc.offsetType,
},
},
}
seekReq := seekMsg.GetSeek()
if seekReq == nil {
t.Fatal("Failed to get seek message")
}
if seekReq.Offset != tc.offset {
t.Errorf("Expected offset %d, got %d", tc.offset, seekReq.Offset)
}
if seekReq.OffsetType != tc.offsetType {
t.Errorf("Expected offset type %v, got %v", tc.offsetType, seekReq.OffsetType)
}
})
}
}
// TestSeekMessageHandling_AckVsSeekDistinction tests that we can distinguish between ack and seek messages
func TestSeekMessageHandling_AckVsSeekDistinction(t *testing.T) {
// Create ack message
ackMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
Key: []byte("test-key"),
TsNs: time.Now().UnixNano(),
},
},
}
// Create seek message
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 100,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
},
}
// Verify ack message doesn't match seek
if ackMsg.GetSeek() != nil {
t.Error("Ack message should not be detected as seek")
}
if ackMsg.GetAck() == nil {
t.Error("Ack message should be detected as ack")
}
// Verify seek message doesn't match ack
if seekMsg.GetAck() != nil {
t.Error("Seek message should not be detected as ack")
}
if seekMsg.GetSeek() == nil {
t.Error("Seek message should be detected as seek")
}
}
// TestSeekMessageResponse_SuccessFormat tests the response format for successful seek
func TestSeekMessageResponse_SuccessFormat(t *testing.T) {
// Create success response (empty error string = success)
successResponse := &mq_pb.SubscribeMessageResponse{
Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
Error: "", // Empty error means success
},
},
}
ctrlMsg := successResponse.GetCtrl()
if ctrlMsg == nil {
t.Fatal("Failed to get control message")
}
// Empty error string indicates success
if ctrlMsg.Error != "" {
t.Errorf("Expected empty error for success, got: %s", ctrlMsg.Error)
}
}
// TestSeekMessageResponse_ErrorFormat tests the response format for failed seek
func TestSeekMessageResponse_ErrorFormat(t *testing.T) {
// Create error response
errorResponse := &mq_pb.SubscribeMessageResponse{
Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
Error: "Seek not implemented",
},
},
}
ctrlMsg := errorResponse.GetCtrl()
if ctrlMsg == nil {
t.Fatal("Failed to get control message")
}
// Non-empty error string indicates failure
if ctrlMsg.Error == "" {
t.Error("Expected non-empty error for failure")
}
if ctrlMsg.Error != "Seek not implemented" {
t.Errorf("Expected specific error message, got: %s", ctrlMsg.Error)
}
}
// TestSeekMessageHandling_BackwardSeek tests backward seeking scenarios
func TestSeekMessageHandling_BackwardSeek(t *testing.T) {
testCases := []struct {
name string
currentPos int64
seekOffset int64
expectedGap int64
}{
{
name: "small backward gap",
currentPos: 100,
seekOffset: 90,
expectedGap: 10,
},
{
name: "medium backward gap",
currentPos: 1000,
seekOffset: 500,
expectedGap: 500,
},
{
name: "large backward gap",
currentPos: 1000000,
seekOffset: 1,
expectedGap: 999999,
},
{
name: "seek to zero",
currentPos: 100,
seekOffset: 0,
expectedGap: 100,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Verify gap calculation
gap := tc.currentPos - tc.seekOffset
if gap != tc.expectedGap {
t.Errorf("Expected gap %d, got %d", tc.expectedGap, gap)
}
// Create seek message for backward seek
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: tc.seekOffset,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
},
}
seekReq := seekMsg.GetSeek()
if seekReq == nil {
t.Fatal("Failed to create seek message")
}
if seekReq.Offset != tc.seekOffset {
t.Errorf("Expected offset %d, got %d", tc.seekOffset, seekReq.Offset)
}
})
}
}
// TestSeekMessageHandling_ForwardSeek tests forward seeking scenarios
func TestSeekMessageHandling_ForwardSeek(t *testing.T) {
testCases := []struct {
name string
currentPos int64
seekOffset int64
shouldSeek bool
}{
{
name: "small forward gap",
currentPos: 100,
seekOffset: 110,
shouldSeek: false, // Forward seeks don't need special handling
},
{
name: "same position",
currentPos: 100,
seekOffset: 100,
shouldSeek: false,
},
{
name: "large forward gap",
currentPos: 100,
seekOffset: 10000,
shouldSeek: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// For forward seeks, gateway typically just continues reading
// No special seek message needed
isBackward := tc.seekOffset < tc.currentPos
if isBackward && !tc.shouldSeek {
t.Error("Backward seek should require seek message")
}
})
}
}
// TestSeekIntegration_PositionConversion tests the complete flow from seek message to position
func TestSeekIntegration_PositionConversion(t *testing.T) {
broker := &MessageQueueBroker{}
testCases := []struct {
name string
offset int64
offsetType schema_pb.OffsetType
verifyFunc func(t *testing.T, pos log_buffer.MessagePosition)
}{
{
name: "exact offset conversion",
offset: 42,
offsetType: schema_pb.OffsetType_EXACT_OFFSET,
verifyFunc: func(t *testing.T, pos log_buffer.MessagePosition) {
if !pos.IsOffsetBased {
t.Error("Expected offset-based position")
}
if pos.GetOffset() != 42 {
t.Errorf("Expected offset 42, got %d", pos.GetOffset())
}
},
},
{
name: "earliest offset conversion",
offset: 0,
offsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
verifyFunc: func(t *testing.T, pos log_buffer.MessagePosition) {
if pos.Offset != -3 {
t.Errorf("Expected batch -3 for earliest, got %d", pos.Offset)
}
},
},
{
name: "latest offset conversion",
offset: 0,
offsetType: schema_pb.OffsetType_RESET_TO_LATEST,
verifyFunc: func(t *testing.T, pos log_buffer.MessagePosition) {
if pos.Offset != -4 {
t.Errorf("Expected batch -4 for latest, got %d", pos.Offset)
}
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create seek message
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: tc.offset,
OffsetType: tc.offsetType,
}
// Convert to position
position := broker.getRequestPositionFromSeek(seekMsg)
// Verify result
tc.verifyFunc(t, position)
})
}
}
// TestSeekMessageHandling_ConcurrentSeeks tests handling multiple seek requests
func TestSeekMessageHandling_ConcurrentSeeks(t *testing.T) {
broker := &MessageQueueBroker{}
// Simulate multiple concurrent seek requests
seekOffsets := []int64{10, 20, 30, 40, 50}
var wg sync.WaitGroup
results := make([]log_buffer.MessagePosition, len(seekOffsets))
for i, offset := range seekOffsets {
wg.Add(1)
go func(idx int, off int64) {
defer wg.Done()
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: off,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
}
results[idx] = broker.getRequestPositionFromSeek(seekMsg)
}(i, offset)
}
wg.Wait()
// Verify all results are correct
for i, offset := range seekOffsets {
if results[i].GetOffset() != offset {
t.Errorf("Expected offset %d at index %d, got %d", offset, i, results[i].GetOffset())
}
}
}
// TestSeekMessageProtocol_WireFormat verifies the protobuf message structure
func TestSeekMessageProtocol_WireFormat(t *testing.T) {
// Test that SeekMessage is properly defined in the oneof
req := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 100,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
},
}
// Verify oneof is set correctly
switch msg := req.Message.(type) {
case *mq_pb.SubscribeMessageRequest_Seek:
if msg.Seek.Offset != 100 {
t.Errorf("Expected offset 100, got %d", msg.Seek.Offset)
}
default:
t.Errorf("Expected Seek message, got %T", msg)
}
// Verify other message types are nil
if req.GetAck() != nil {
t.Error("Seek message should not have Ack")
}
if req.GetInit() != nil {
t.Error("Seek message should not have Init")
}
}
// TestSeekByTimestamp tests timestamp-based seek operations
func TestSeekByTimestamp(t *testing.T) {
broker := &MessageQueueBroker{}
testCases := []struct {
name string
timestampNs int64
offsetType schema_pb.OffsetType
}{
{
name: "seek to specific timestamp",
timestampNs: 1234567890123456789,
offsetType: schema_pb.OffsetType_EXACT_TS_NS,
},
{
name: "seek to current timestamp",
timestampNs: time.Now().UnixNano(),
offsetType: schema_pb.OffsetType_EXACT_TS_NS,
},
{
name: "seek to past timestamp",
timestampNs: time.Now().Add(-24 * time.Hour).UnixNano(),
offsetType: schema_pb.OffsetType_EXACT_TS_NS,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: tc.timestampNs,
OffsetType: tc.offsetType,
}
position := broker.getRequestPositionFromSeek(seekMsg)
// For timestamp-based seeks, Time should be set to the timestamp
expectedTime := time.Unix(0, tc.timestampNs)
if !position.Time.Equal(expectedTime) {
t.Errorf("Expected time %v, got %v", expectedTime, position.Time)
}
// Batch should be -2 for EXACT_TS_NS
if position.Offset != -2 {
t.Errorf("Expected batch -2 for timestamp seek, got %d", position.Offset)
}
})
}
}
// TestSeekByTimestamp_Ordering tests that timestamp seeks preserve ordering
func TestSeekByTimestamp_Ordering(t *testing.T) {
broker := &MessageQueueBroker{}
// Create timestamps in chronological order
baseTime := time.Now().Add(-1 * time.Hour)
timestamps := []int64{
baseTime.UnixNano(),
baseTime.Add(10 * time.Minute).UnixNano(),
baseTime.Add(20 * time.Minute).UnixNano(),
baseTime.Add(30 * time.Minute).UnixNano(),
}
positions := make([]log_buffer.MessagePosition, len(timestamps))
for i, ts := range timestamps {
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: ts,
OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
}
positions[i] = broker.getRequestPositionFromSeek(seekMsg)
}
// Verify positions are in chronological order
for i := 1; i < len(positions); i++ {
if !positions[i].Time.After(positions[i-1].Time) {
t.Errorf("Timestamp ordering violated: position[%d].Time (%v) should be after position[%d].Time (%v)",
i, positions[i].Time, i-1, positions[i-1].Time)
}
}
}
// TestSeekByTimestamp_EdgeCases tests edge cases for timestamp seeks
func TestSeekByTimestamp_EdgeCases(t *testing.T) {
broker := &MessageQueueBroker{}
testCases := []struct {
name string
timestampNs int64
expectValid bool
}{
{
name: "zero timestamp",
timestampNs: 0,
expectValid: true, // Valid - means Unix epoch
},
{
name: "negative timestamp",
timestampNs: -1,
expectValid: true, // Valid in Go (before Unix epoch)
},
{
name: "far future timestamp",
timestampNs: time.Now().Add(100 * 365 * 24 * time.Hour).UnixNano(),
expectValid: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: tc.timestampNs,
OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
}
position := broker.getRequestPositionFromSeek(seekMsg)
if tc.expectValid {
expectedTime := time.Unix(0, tc.timestampNs)
if !position.Time.Equal(expectedTime) {
t.Errorf("Expected time %v, got %v", expectedTime, position.Time)
}
}
})
}
}
// TestSeekByTimestamp_VsOffset tests that timestamp and offset seeks are independent
func TestSeekByTimestamp_VsOffset(t *testing.T) {
broker := &MessageQueueBroker{}
timestampSeek := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: time.Now().UnixNano(),
OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
}
offsetSeek := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 100,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
}
timestampPos := broker.getRequestPositionFromSeek(timestampSeek)
offsetPos := broker.getRequestPositionFromSeek(offsetSeek)
// Timestamp-based position should have batch -2
if timestampPos.Offset != -2 {
t.Errorf("Timestamp seek should have batch -2, got %d", timestampPos.Offset)
}
// Offset-based position should have the exact offset in Offset field
if offsetPos.GetOffset() != 100 {
t.Errorf("Offset seek should have offset 100, got %d", offsetPos.GetOffset())
}
// They should use different positioning mechanisms
if timestampPos.IsOffsetBased {
t.Error("Timestamp seek should not be offset-based")
}
if !offsetPos.IsOffsetBased {
t.Error("Offset seek should be offset-based")
}
}
// TestSeekOptimization_SkipRedundantSeek tests that seeking to the same offset is optimized
func TestSeekOptimization_SkipRedundantSeek(t *testing.T) {
broker := &MessageQueueBroker{}
// Test that seeking to the same offset multiple times produces the same result
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 100,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
}
// First seek
pos1 := broker.getRequestPositionFromSeek(seekMsg)
// Second seek to same offset
pos2 := broker.getRequestPositionFromSeek(seekMsg)
// Third seek to same offset
pos3 := broker.getRequestPositionFromSeek(seekMsg)
// All positions should be identical
if pos1.GetOffset() != pos2.GetOffset() || pos2.GetOffset() != pos3.GetOffset() {
t.Errorf("Multiple seeks to same offset should produce identical results: %d, %d, %d",
pos1.GetOffset(), pos2.GetOffset(), pos3.GetOffset())
}
// Verify the offset is correct
if pos1.GetOffset() != 100 {
t.Errorf("Expected offset 100, got %d", pos1.GetOffset())
}
}
// TestSeekOptimization_DifferentOffsets tests that different offsets produce different positions
func TestSeekOptimization_DifferentOffsets(t *testing.T) {
broker := &MessageQueueBroker{}
offsets := []int64{0, 50, 100, 150, 200}
positions := make([]log_buffer.MessagePosition, len(offsets))
for i, offset := range offsets {
seekMsg := &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: offset,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
}
positions[i] = broker.getRequestPositionFromSeek(seekMsg)
}
// Verify each position has the correct offset
for i, offset := range offsets {
if positions[i].GetOffset() != offset {
t.Errorf("Position %d: expected offset %d, got %d", i, offset, positions[i].GetOffset())
}
}
// Verify all positions are different
for i := 1; i < len(positions); i++ {
if positions[i].GetOffset() == positions[i-1].GetOffset() {
t.Errorf("Positions %d and %d should be different", i-1, i)
}
}
}
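As a quick reference for the sentinel values asserted above, the following sketch mirrors the mapping implemented by getRequestPositionFromSeek (illustrative only, not part of this commit; it uses only fmt, time, and schema_pb, which this test file already imports).

// describeSeekPosition summarizes how each offset type is translated into a
// log_buffer position; illustrative helper, not part of this commit.
func describeSeekPosition(offsetType schema_pb.OffsetType, offset int64) string {
	switch offsetType {
	case schema_pb.OffsetType_RESET_TO_EARLIEST:
		return "batch -3: replay from the start of the partition"
	case schema_pb.OffsetType_RESET_TO_LATEST:
		return "batch -4, time = now: deliver only new messages"
	case schema_pb.OffsetType_EXACT_TS_NS:
		return fmt.Sprintf("batch -2, time = %v: start at the given timestamp", time.Unix(0, offset))
	default: // EXACT_OFFSET, RESET_TO_OFFSET, and any other value
		return fmt.Sprintf("offset-based position at offset %d", offset)
	}
}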

weed/mq/kafka/integration/broker_client_subscribe.go (337)

@@ -113,25 +113,25 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
}
session.mu.Unlock()
// Decision logic:
// 1. Forward read (startOffset >= currentOffset): Always reuse - ReadRecordsFromOffset will handle it
// 2. Backward read with cache hit: Reuse - ReadRecordsFromOffset will serve from cache
// 3. Backward read without cache: Reuse if gap is small, let ReadRecordsFromOffset handle recreation
// This prevents GetOrCreateSubscriber from constantly recreating sessions for small offsets
if startOffset >= currentOffset || canUseCache {
// Can read forward OR offset is in cache - reuse session
bc.subscribersLock.RUnlock()
glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (forward or cached)",
key, currentOffset, startOffset)
return session, nil
}
// With seekable broker: Always reuse existing session
// Any offset mismatch will be handled by FetchRecords via SeekMessage
// This includes:
// 1. Forward read: Natural continuation
// 2. Backward read with cache hit: Serve from cache
// 3. Backward read without cache: Send seek message to broker
// No need for stream recreation - broker repositions internally
// Backward seek, not in cache
// Let ReadRecordsFromOffset handle the recreation decision based on the actual read context
bc.subscribersLock.RUnlock()
glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will handle in ReadRecordsFromOffset)",
key, currentOffset, startOffset)
if canUseCache {
glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (cached)",
key, currentOffset, startOffset)
} else if startOffset >= currentOffset {
glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (forward read)",
key, currentOffset, startOffset)
} else {
glog.V(2).Infof("[FETCH] Reusing session for %s: session at %d, requested %d (will seek backward)",
key, currentOffset, startOffset)
}
return session, nil
}
@@ -143,32 +143,16 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
bc.subscribersLock.Lock()
defer bc.subscribersLock.Unlock()
// CRITICAL FIX: Double-check if session exists AND verify it's at the right offset
// This can happen if another thread created a session while we were acquiring the lock
// (only possible in the non-recreation path where we released the read lock)
// Double-check if session was created by another thread while we were acquiring the lock
if session, exists := bc.subscribers[key]; exists {
// With seekable broker, always reuse existing session
// FetchRecords will handle any offset mismatch via seek
session.mu.Lock()
existingOffset := session.StartOffset
session.mu.Unlock()
// Only reuse if the session is at or before the requested offset
if existingOffset <= startOffset {
glog.V(1).Infof("[FETCH] Session already exists at offset %d (requested %d), reusing", existingOffset, startOffset)
return session, nil
}
// Session is at wrong offset - must recreate
glog.V(2).Infof("[FETCH] Session exists at wrong offset %d (requested %d), recreating", existingOffset, startOffset)
// CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls
session.mu.Lock()
if session.Stream != nil {
_ = session.Stream.CloseSend()
}
if session.Cancel != nil {
session.Cancel()
}
session.mu.Unlock()
delete(bc.subscribers, key)
glog.V(1).Infof("[FETCH] Session created concurrently at offset %d (requested %d), reusing", existingOffset, startOffset)
return session, nil
}
// Use BrokerClient's context so subscribers are automatically cancelled when connection closes
@@ -273,148 +257,55 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
}
}
// CRITICAL: Get the current offset atomically before making recreation decision
// We need to unlock first (lock acquired at line 257) then re-acquire for atomic read
// Get the current offset atomically for comparison
currentStartOffset := session.StartOffset
session.mu.Unlock()
// CRITICAL FIX for Schema Registry: Keep subscriber alive across multiple fetch requests
// Schema Registry expects to make multiple poll() calls on the same consumer connection
// With seekable broker: Keep subscriber alive across all requests
// Schema Registry and other clients expect persistent consumer connections
//
// Three scenarios:
// 1. requestedOffset < session.StartOffset: Need to seek backward (recreate)
// 2. requestedOffset == session.StartOffset: Continue reading (use existing)
// 3. requestedOffset > session.StartOffset: Continue reading forward (use existing)
// Three scenarios, all handled via seek:
// 1. requestedOffset < session.StartOffset: Send seek message (backward)
// 2. requestedOffset == session.StartOffset: Continue reading (no seek needed)
// 3. requestedOffset > session.StartOffset: Send seek message (forward)
//
// The session will naturally advance as records are consumed, so we should NOT
// recreate it just because requestedOffset != session.StartOffset
// OPTIMIZATION: Only recreate for EXTREMELY LARGE backward seeks (>1000000 offsets back)
// Most backward seeks should be served from cache or tolerated as forward reads
// This prevents creating zombie streams that never get cleaned up on the broker
// gRPC's stream.Recv() NEVER unblocks when streams are cancelled, leaving goroutines
// orphaned forever. Each recreation leaves 2 goroutines (first record + loop) blocked.
// With 14K recreations, that's 28K leaked goroutines. Solution: almost never recreate.
const maxBackwardGap = 1000000
offsetGap := currentStartOffset - requestedOffset
if requestedOffset < currentStartOffset && offsetGap > maxBackwardGap {
// Need to seek backward significantly - close old session and create a fresh subscriber
// Restarting an existing stream doesn't work reliably because the broker may still
// have old data buffered in the stream pipeline
glog.V(2).Infof("[FETCH] Seeking backward significantly: requested=%d < session=%d (gap=%d), creating fresh subscriber",
requestedOffset, currentStartOffset, offsetGap)
// Extract session details (note: session.mu was already unlocked at line 294)
topic := session.Topic
partition := session.Partition
consumerGroup := session.ConsumerGroup
consumerID := session.ConsumerID
key := session.Key()
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
// This prevents multiple threads from all deciding to recreate based on stale data
glog.V(2).Infof("[FETCH] 🔒 Thread acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
bc.subscribersLock.Lock()
glog.V(2).Infof("[FETCH] 🔓 Thread acquired global lock for session %s: requested=%d", key, requestedOffset)
// Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= requestedOffset {
bc.subscribersLock.Unlock()
glog.V(2).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
// Re-acquire the existing session and continue
return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
}
glog.V(2).Infof("[FETCH] Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
// Session still needs recreation - close it
// CRITICAL: Hold session lock while cancelling to prevent race with active Recv() calls
existingSession.mu.Lock()
if existingSession.Stream != nil {
_ = existingSession.Stream.CloseSend()
}
if existingSession.Cancel != nil {
existingSession.Cancel()
}
existingSession.mu.Unlock()
delete(bc.subscribers, key)
glog.V(2).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset)
}
// CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session
// to prevent other threads from interfering. We'll create the session inline.
// bc.subscribersLock.Unlock() - REMOVED to fix race condition
// Create a completely fresh subscriber at the requested offset
// INLINE SESSION CREATION to hold the lock continuously
glog.V(1).Infof("[FETCH] Creating inline subscriber session while holding lock: %s at offset %d", key, requestedOffset)
subscriberCtx, subscriberCancel := context.WithCancel(bc.ctx)
stream, err := bc.client.SubscribeMessage(subscriberCtx)
if err != nil {
bc.subscribersLock.Unlock()
return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
}
// Get the actual partition assignment from the broker
actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
if err != nil {
bc.subscribersLock.Unlock()
_ = stream.CloseSend()
return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
// The stream persists for the entire consumer session - no recreation needed
if requestedOffset != currentStartOffset {
offsetDiff := requestedOffset - currentStartOffset
seekDirection := "forward"
if offsetDiff < 0 {
seekDirection = "backward"
}
// Use EXACT_OFFSET to position subscriber at the exact Kafka offset
offsetType := schema_pb.OffsetType_EXACT_OFFSET
glog.V(2).Infof("[FETCH] Offset mismatch: %s seek from %d to %d (diff=%d)",
seekDirection, currentStartOffset, requestedOffset, offsetDiff)
glog.V(2).Infof("[FETCH] Creating inline subscriber for backward seek: topic=%s partition=%d offset=%d",
topic, partition, requestedOffset)
glog.V(2).Infof("[SUBSCRIBE-INIT] ReadRecordsFromOffset (backward seek) sending init: topic=%s partition=%d startOffset=%d offsetType=%v consumerGroup=%s consumerID=%s",
topic, partition, requestedOffset, offsetType, consumerGroup, consumerID)
// Send init message using the actual partition structure
initReq := createSubscribeInitMessage(topic, actualPartition, requestedOffset, offsetType, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
bc.subscribersLock.Unlock()
_ = stream.CloseSend()
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
// Send seek message to reposition stream
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: requestedOffset,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
},
}
newSession := &BrokerSubscriberSession{
Topic: topic,
Partition: partition,
Stream: stream,
StartOffset: requestedOffset,
ConsumerGroup: consumerGroup,
ConsumerID: consumerID,
Ctx: subscriberCtx,
Cancel: subscriberCancel,
if err := session.Stream.Send(seekMsg); err != nil {
return nil, fmt.Errorf("seek to offset %d failed: %v", requestedOffset, err)
}
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(2).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
// Update session state after successful seek
session.mu.Lock()
session.StartOffset = requestedOffset
session.consumedRecords = nil // Clear cache after seek
session.mu.Unlock()
// Read from fresh subscriber
glog.V(2).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
return bc.ReadRecords(ctx, newSession, maxRecords)
glog.V(2).Infof("[FETCH] Seek to offset %d successful", requestedOffset)
} else {
glog.V(2).Infof("[FETCH] Offset match: continuing from offset %d", requestedOffset)
}
// requestedOffset >= session.StartOffset: Keep reading forward from existing session
// This handles:
// - Exact match (requestedOffset == session.StartOffset)
// - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache)
glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
requestedOffset, currentStartOffset)
// Note: session.mu was already unlocked at line 294 after reading currentStartOffset
// Read records from current position
return bc.ReadRecords(ctx, session, maxRecords)
}
@@ -782,3 +673,119 @@ func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newO
return nil
}
// Seek helper methods for BrokerSubscriberSession
// SeekToOffset repositions the stream to read from a specific offset
func (session *BrokerSubscriberSession) SeekToOffset(offset int64) error {
// Skip seek if already at the requested offset
session.mu.Lock()
currentOffset := session.StartOffset
session.mu.Unlock()
if currentOffset == offset {
glog.V(2).Infof("[SEEK] Already at offset %d for %s[%d], skipping seek", offset, session.Topic, session.Partition)
return nil
}
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: offset,
OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
},
},
}
if err := session.Stream.Send(seekMsg); err != nil {
return fmt.Errorf("seek to offset %d failed: %v", offset, err)
}
session.mu.Lock()
session.StartOffset = offset
session.consumedRecords = nil
session.mu.Unlock()
glog.V(2).Infof("[SEEK] Seeked to offset %d for %s[%d]", offset, session.Topic, session.Partition)
return nil
}
// SeekToTimestamp repositions the stream to read from messages at or after a specific timestamp
// timestamp is in nanoseconds since Unix epoch
// Note: We don't skip this operation even if we think we're at the right position because
// we can't easily determine the offset corresponding to a timestamp without querying the broker
func (session *BrokerSubscriberSession) SeekToTimestamp(timestampNs int64) error {
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: timestampNs,
OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
},
},
}
if err := session.Stream.Send(seekMsg); err != nil {
return fmt.Errorf("seek to timestamp %d failed: %v", timestampNs, err)
}
session.mu.Lock()
// Note: We don't know the exact offset at this timestamp yet
// It will be updated when we read the first message
session.consumedRecords = nil
session.mu.Unlock()
glog.V(2).Infof("[SEEK] Seeked to timestamp %d for %s[%d]", timestampNs, session.Topic, session.Partition)
return nil
}
// SeekToEarliest repositions the stream to the beginning of the partition
// Note: We don't skip this operation even if StartOffset == 0 because the broker
// may have a different notion of "earliest" (e.g., after compaction or retention)
func (session *BrokerSubscriberSession) SeekToEarliest() error {
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 0,
OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
},
},
}
if err := session.Stream.Send(seekMsg); err != nil {
return fmt.Errorf("seek to earliest failed: %v", err)
}
session.mu.Lock()
session.StartOffset = 0
session.consumedRecords = nil
session.mu.Unlock()
glog.V(2).Infof("[SEEK] Seeked to earliest for %s[%d]", session.Topic, session.Partition)
return nil
}
// SeekToLatest repositions the stream to the end of the partition (next new message)
// Note: We don't skip this operation because "latest" is a moving target and we can't
// reliably determine if we're already at the latest position without querying the broker
func (session *BrokerSubscriberSession) SeekToLatest() error {
seekMsg := &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Seek{
Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
Offset: 0,
OffsetType: schema_pb.OffsetType_RESET_TO_LATEST,
},
},
}
if err := session.Stream.Send(seekMsg); err != nil {
return fmt.Errorf("seek to latest failed: %v", err)
}
session.mu.Lock()
// Offset will be set when we read the first new message
session.consumedRecords = nil
session.mu.Unlock()
glog.V(2).Infof("[SEEK] Seeked to latest for %s[%d]", session.Topic, session.Partition)
return nil
}
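A short usage sketch for the new session-level seek helpers (illustrative only, assuming it lives in the same integration package and that the session came from GetOrCreateSubscriber):

// rewindTo repositions an existing subscriber session without recreating the
// gRPC stream; illustrative helper, not part of this commit.
func rewindTo(session *BrokerSubscriberSession, offset int64) error {
	// SeekToOffset is a no-op when the session already sits at the offset;
	// otherwise it sends a SeekMessage and clears the consumed-records cache.
	if err := session.SeekToOffset(offset); err != nil {
		return err
	}
	// SeekToTimestamp, SeekToEarliest, and SeekToLatest follow the same pattern,
	// but always send the seek because the matching offset is only known broker-side.
	return nil
}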

weed/pb/mq_broker.proto (5)

@@ -329,9 +329,14 @@ message SubscribeMessageRequest {
int64 ts_ns = 1; // Timestamp in nanoseconds for acknowledgment tracking
bytes key = 2;
}
message SeekMessage {
int64 offset = 1; // New offset to seek to
schema_pb.OffsetType offset_type = 2; // EXACT_OFFSET, RESET_TO_LATEST, etc.
}
oneof message {
InitMessage init = 1;
AckMessage ack = 2;
SeekMessage seek = 3;
}
}
message SubscribeMessageResponse {

weed/pb/mq_pb/mq_broker.pb.go (307)

@@ -2250,6 +2250,7 @@ type SubscribeMessageRequest struct {
//
// *SubscribeMessageRequest_Init
// *SubscribeMessageRequest_Ack
// *SubscribeMessageRequest_Seek
Message isSubscribeMessageRequest_Message `protobuf_oneof:"message"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
@@ -2310,6 +2311,15 @@ func (x *SubscribeMessageRequest) GetAck() *SubscribeMessageRequest_AckMessage {
return nil
}
func (x *SubscribeMessageRequest) GetSeek() *SubscribeMessageRequest_SeekMessage {
if x != nil {
if x, ok := x.Message.(*SubscribeMessageRequest_Seek); ok {
return x.Seek
}
}
return nil
}
type isSubscribeMessageRequest_Message interface {
isSubscribeMessageRequest_Message()
}
@@ -2322,10 +2332,16 @@ type SubscribeMessageRequest_Ack struct {
Ack *SubscribeMessageRequest_AckMessage `protobuf:"bytes,2,opt,name=ack,proto3,oneof"`
}
type SubscribeMessageRequest_Seek struct {
Seek *SubscribeMessageRequest_SeekMessage `protobuf:"bytes,3,opt,name=seek,proto3,oneof"`
}
func (*SubscribeMessageRequest_Init) isSubscribeMessageRequest_Message() {}
func (*SubscribeMessageRequest_Ack) isSubscribeMessageRequest_Message() {}
func (*SubscribeMessageRequest_Seek) isSubscribeMessageRequest_Message() {}
type SubscribeMessageResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Types that are valid to be assigned to Message:
@@ -3761,6 +3777,58 @@ func (x *SubscribeMessageRequest_AckMessage) GetKey() []byte {
return nil
}
type SubscribeMessageRequest_SeekMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` // New offset to seek to
OffsetType schema_pb.OffsetType `protobuf:"varint,2,opt,name=offset_type,json=offsetType,proto3,enum=schema_pb.OffsetType" json:"offset_type,omitempty"` // EXACT_OFFSET, RESET_TO_LATEST, etc.
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SubscribeMessageRequest_SeekMessage) Reset() {
*x = SubscribeMessageRequest_SeekMessage{}
mi := &file_mq_broker_proto_msgTypes[63]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SubscribeMessageRequest_SeekMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SubscribeMessageRequest_SeekMessage) ProtoMessage() {}
func (x *SubscribeMessageRequest_SeekMessage) ProtoReflect() protoreflect.Message {
mi := &file_mq_broker_proto_msgTypes[63]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SubscribeMessageRequest_SeekMessage.ProtoReflect.Descriptor instead.
func (*SubscribeMessageRequest_SeekMessage) Descriptor() ([]byte, []int) {
return file_mq_broker_proto_rawDescGZIP(), []int{36, 2}
}
func (x *SubscribeMessageRequest_SeekMessage) GetOffset() int64 {
if x != nil {
return x.Offset
}
return 0
}
func (x *SubscribeMessageRequest_SeekMessage) GetOffsetType() schema_pb.OffsetType {
if x != nil {
return x.OffsetType
}
return schema_pb.OffsetType(0)
}
type SubscribeMessageResponse_SubscribeCtrlMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
@@ -3772,7 +3840,7 @@ type SubscribeMessageResponse_SubscribeCtrlMessage struct {
func (x *SubscribeMessageResponse_SubscribeCtrlMessage) Reset() {
*x = SubscribeMessageResponse_SubscribeCtrlMessage{}
mi := &file_mq_broker_proto_msgTypes[63]
mi := &file_mq_broker_proto_msgTypes[64]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -3784,7 +3852,7 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) String() string {
func (*SubscribeMessageResponse_SubscribeCtrlMessage) ProtoMessage() {}
func (x *SubscribeMessageResponse_SubscribeCtrlMessage) ProtoReflect() protoreflect.Message {
mi := &file_mq_broker_proto_msgTypes[63]
mi := &file_mq_broker_proto_msgTypes[64]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -3832,7 +3900,7 @@ type SubscribeFollowMeRequest_InitMessage struct {
func (x *SubscribeFollowMeRequest_InitMessage) Reset() {
*x = SubscribeFollowMeRequest_InitMessage{}
mi := &file_mq_broker_proto_msgTypes[64]
mi := &file_mq_broker_proto_msgTypes[65]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -3844,7 +3912,7 @@ func (x *SubscribeFollowMeRequest_InitMessage) String() string {
func (*SubscribeFollowMeRequest_InitMessage) ProtoMessage() {}
func (x *SubscribeFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message {
mi := &file_mq_broker_proto_msgTypes[64]
mi := &file_mq_broker_proto_msgTypes[65]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -3890,7 +3958,7 @@ type SubscribeFollowMeRequest_AckMessage struct {
func (x *SubscribeFollowMeRequest_AckMessage) Reset() {
*x = SubscribeFollowMeRequest_AckMessage{}
mi := &file_mq_broker_proto_msgTypes[65]
mi := &file_mq_broker_proto_msgTypes[66]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -3902,7 +3970,7 @@ func (x *SubscribeFollowMeRequest_AckMessage) String() string {
func (*SubscribeFollowMeRequest_AckMessage) ProtoMessage() {}
func (x *SubscribeFollowMeRequest_AckMessage) ProtoReflect() protoreflect.Message {
mi := &file_mq_broker_proto_msgTypes[65]
mi := &file_mq_broker_proto_msgTypes[66]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -3933,7 +4001,7 @@ type SubscribeFollowMeRequest_CloseMessage struct {
func (x *SubscribeFollowMeRequest_CloseMessage) Reset() {
*x = SubscribeFollowMeRequest_CloseMessage{}
mi := &file_mq_broker_proto_msgTypes[66]
mi := &file_mq_broker_proto_msgTypes[67]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -3945,7 +4013,7 @@ func (x *SubscribeFollowMeRequest_CloseMessage) String() string {
func (*SubscribeFollowMeRequest_CloseMessage) ProtoMessage() {}
func (x *SubscribeFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message {
mi := &file_mq_broker_proto_msgTypes[66]
mi := &file_mq_broker_proto_msgTypes[67]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -4144,10 +4212,11 @@ const file_mq_broker_proto_rawDesc = "" +
"\fCloseMessageB\t\n" +
"\amessage\"5\n" +
"\x17PublishFollowMeResponse\x12\x1a\n" +
"\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\"\xf5\x04\n" +
"\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\"\x9d\x06\n" +
"\x17SubscribeMessageRequest\x12G\n" +
"\x04init\x18\x01 \x01(\v21.messaging_pb.SubscribeMessageRequest.InitMessageH\x00R\x04init\x12D\n" +
"\x03ack\x18\x02 \x01(\v20.messaging_pb.SubscribeMessageRequest.AckMessageH\x00R\x03ack\x1a\x8a\x03\n" +
"\x03ack\x18\x02 \x01(\v20.messaging_pb.SubscribeMessageRequest.AckMessageH\x00R\x03ack\x12G\n" +
"\x04seek\x18\x03 \x01(\v21.messaging_pb.SubscribeMessageRequest.SeekMessageH\x00R\x04seek\x1a\x8a\x03\n" +
"\vInitMessage\x12%\n" +
"\x0econsumer_group\x18\x01 \x01(\tR\rconsumerGroup\x12\x1f\n" +
"\vconsumer_id\x18\x02 \x01(\tR\n" +
@@ -4164,7 +4233,11 @@ const file_mq_broker_proto_rawDesc = "" +
"\n" +
"AckMessage\x12\x13\n" +
"\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x10\n" +
"\x03key\x18\x02 \x01(\fR\x03keyB\t\n" +
"\x03key\x18\x02 \x01(\fR\x03key\x1a]\n" +
"\vSeekMessage\x12\x16\n" +
"\x06offset\x18\x01 \x01(\x03R\x06offset\x126\n" +
"\voffset_type\x18\x02 \x01(\x0e2\x15.schema_pb.OffsetTypeR\n" +
"offsetTypeB\t\n" +
"\amessage\"\xa7\x02\n" +
"\x18SubscribeMessageResponse\x12Q\n" +
"\x04ctrl\x18\x01 \x01(\v2;.messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessageH\x00R\x04ctrl\x12/\n" +
@@ -4260,7 +4333,7 @@ func file_mq_broker_proto_rawDescGZIP() []byte {
return file_mq_broker_proto_rawDescData
}
var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 67)
var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 68)
var file_mq_broker_proto_goTypes = []any{
(*FindBrokerLeaderRequest)(nil), // 0: messaging_pb.FindBrokerLeaderRequest
(*FindBrokerLeaderResponse)(nil), // 1: messaging_pb.FindBrokerLeaderResponse
@@ -4325,47 +4398,48 @@ var file_mq_broker_proto_goTypes = []any{
(*PublishFollowMeRequest_CloseMessage)(nil), // 60: messaging_pb.PublishFollowMeRequest.CloseMessage
(*SubscribeMessageRequest_InitMessage)(nil), // 61: messaging_pb.SubscribeMessageRequest.InitMessage
(*SubscribeMessageRequest_AckMessage)(nil), // 62: messaging_pb.SubscribeMessageRequest.AckMessage
(*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 63: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage
(*SubscribeFollowMeRequest_InitMessage)(nil), // 64: messaging_pb.SubscribeFollowMeRequest.InitMessage
(*SubscribeFollowMeRequest_AckMessage)(nil), // 65: messaging_pb.SubscribeFollowMeRequest.AckMessage
(*SubscribeFollowMeRequest_CloseMessage)(nil), // 66: messaging_pb.SubscribeFollowMeRequest.CloseMessage
(*schema_pb.Topic)(nil), // 67: schema_pb.Topic
(*schema_pb.Partition)(nil), // 68: schema_pb.Partition
(*schema_pb.RecordType)(nil), // 69: schema_pb.RecordType
(*filer_pb.LogEntry)(nil), // 70: filer_pb.LogEntry
(*schema_pb.PartitionOffset)(nil), // 71: schema_pb.PartitionOffset
(schema_pb.OffsetType)(0), // 72: schema_pb.OffsetType
(*SubscribeMessageRequest_SeekMessage)(nil), // 63: messaging_pb.SubscribeMessageRequest.SeekMessage
(*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 64: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage
(*SubscribeFollowMeRequest_InitMessage)(nil), // 65: messaging_pb.SubscribeFollowMeRequest.InitMessage
(*SubscribeFollowMeRequest_AckMessage)(nil), // 66: messaging_pb.SubscribeFollowMeRequest.AckMessage
(*SubscribeFollowMeRequest_CloseMessage)(nil), // 67: messaging_pb.SubscribeFollowMeRequest.CloseMessage
(*schema_pb.Topic)(nil), // 68: schema_pb.Topic
(*schema_pb.Partition)(nil), // 69: schema_pb.Partition
(*schema_pb.RecordType)(nil), // 70: schema_pb.RecordType
(*filer_pb.LogEntry)(nil), // 71: filer_pb.LogEntry
(*schema_pb.PartitionOffset)(nil), // 72: schema_pb.PartitionOffset
(schema_pb.OffsetType)(0), // 73: schema_pb.OffsetType
}
var file_mq_broker_proto_depIdxs = []int32{
50, // 0: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry
67, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic
68, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition
68, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic
69, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition
51, // 3: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage
2, // 4: messaging_pb.PublisherToPubBalancerRequest.stats:type_name -> messaging_pb.BrokerStats
67, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic
68, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic
8, // 6: messaging_pb.ConfigureTopicRequest.retention:type_name -> messaging_pb.TopicRetention
69, // 7: messaging_pb.ConfigureTopicRequest.message_record_type:type_name -> schema_pb.RecordType
70, // 7: messaging_pb.ConfigureTopicRequest.message_record_type:type_name -> schema_pb.RecordType
17, // 8: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
8, // 9: messaging_pb.ConfigureTopicResponse.retention:type_name -> messaging_pb.TopicRetention
69, // 10: messaging_pb.ConfigureTopicResponse.message_record_type:type_name -> schema_pb.RecordType
67, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic
67, // 12: messaging_pb.TopicExistsRequest.topic:type_name -> schema_pb.Topic
67, // 13: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic
67, // 14: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic
70, // 10: messaging_pb.ConfigureTopicResponse.message_record_type:type_name -> schema_pb.RecordType
68, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic
68, // 12: messaging_pb.TopicExistsRequest.topic:type_name -> schema_pb.Topic
68, // 13: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic
68, // 14: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic
17, // 15: messaging_pb.LookupTopicBrokersResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
68, // 16: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition
67, // 17: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic
67, // 18: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic
69, // 16: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition
68, // 17: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic
68, // 18: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic
17, // 19: messaging_pb.GetTopicConfigurationResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
8, // 20: messaging_pb.GetTopicConfigurationResponse.retention:type_name -> messaging_pb.TopicRetention
69, // 21: messaging_pb.GetTopicConfigurationResponse.message_record_type:type_name -> schema_pb.RecordType
67, // 22: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic
70, // 21: messaging_pb.GetTopicConfigurationResponse.message_record_type:type_name -> schema_pb.RecordType
68, // 22: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic
24, // 23: messaging_pb.GetTopicPublishersResponse.publishers:type_name -> messaging_pb.TopicPublisher
67, // 24: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic
68, // 24: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic
25, // 25: messaging_pb.GetTopicSubscribersResponse.subscribers:type_name -> messaging_pb.TopicSubscriber
68, // 26: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition
68, // 27: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition
67, // 28: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic
69, // 26: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition
69, // 27: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition
68, // 28: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic
17, // 29: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
52, // 30: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage
54, // 31: messaging_pb.SubscriberToSubCoordinatorRequest.ack_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage
@ -4381,80 +4455,82 @@ var file_mq_broker_proto_depIdxs = []int32{
60, // 41: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage
61, // 42: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage
62, // 43: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage
63, // 44: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage
31, // 45: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage
64, // 46: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage
65, // 47: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage
66, // 48: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage
67, // 49: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic
67, // 50: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic
67, // 51: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic
68, // 52: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition
70, // 53: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> filer_pb.LogEntry
67, // 54: messaging_pb.GetPartitionRangeInfoRequest.topic:type_name -> schema_pb.Topic
68, // 55: messaging_pb.GetPartitionRangeInfoRequest.partition:type_name -> schema_pb.Partition
48, // 56: messaging_pb.GetPartitionRangeInfoResponse.offset_range:type_name -> messaging_pb.OffsetRangeInfo
49, // 57: messaging_pb.GetPartitionRangeInfoResponse.timestamp_range:type_name -> messaging_pb.TimestampRangeInfo
3, // 58: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats
67, // 59: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic
68, // 60: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition
68, // 61: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition
17, // 62: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment
68, // 63: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition
67, // 64: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic
68, // 65: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition
67, // 66: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic
68, // 67: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition
67, // 68: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic
71, // 69: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset
72, // 70: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType
67, // 71: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic
68, // 72: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition
0, // 73: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
4, // 74: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest
6, // 75: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest
11, // 76: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest
13, // 77: messaging_pb.SeaweedMessaging.TopicExists:input_type -> messaging_pb.TopicExistsRequest
9, // 78: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest
15, // 79: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
18, // 80: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest
20, // 81: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest
22, // 82: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest
26, // 83: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
40, // 84: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest
42, // 85: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest
28, // 86: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest
32, // 87: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest
36, // 88: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest
34, // 89: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest
38, // 90: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest
44, // 91: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest
46, // 92: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:input_type -> messaging_pb.GetPartitionRangeInfoRequest
1, // 93: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
5, // 94: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse
7, // 95: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse
12, // 96: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse
14, // 97: messaging_pb.SeaweedMessaging.TopicExists:output_type -> messaging_pb.TopicExistsResponse
10, // 98: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse
16, // 99: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
19, // 100: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse
21, // 101: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse
23, // 102: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse
27, // 103: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
41, // 104: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse
43, // 105: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse
29, // 106: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse
33, // 107: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse
37, // 108: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse
35, // 109: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse
39, // 110: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse
45, // 111: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse
47, // 112: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:output_type -> messaging_pb.GetPartitionRangeInfoResponse
93, // [93:113] is the sub-list for method output_type
73, // [73:93] is the sub-list for method input_type
73, // [73:73] is the sub-list for extension type_name
73, // [73:73] is the sub-list for extension extendee
0, // [0:73] is the sub-list for field type_name
63, // 44: messaging_pb.SubscribeMessageRequest.seek:type_name -> messaging_pb.SubscribeMessageRequest.SeekMessage
64, // 45: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage
31, // 46: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage
65, // 47: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage
66, // 48: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage
67, // 49: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage
68, // 50: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic
68, // 51: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic
68, // 52: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic
69, // 53: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition
71, // 54: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> filer_pb.LogEntry
68, // 55: messaging_pb.GetPartitionRangeInfoRequest.topic:type_name -> schema_pb.Topic
69, // 56: messaging_pb.GetPartitionRangeInfoRequest.partition:type_name -> schema_pb.Partition
48, // 57: messaging_pb.GetPartitionRangeInfoResponse.offset_range:type_name -> messaging_pb.OffsetRangeInfo
49, // 58: messaging_pb.GetPartitionRangeInfoResponse.timestamp_range:type_name -> messaging_pb.TimestampRangeInfo
3, // 59: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats
68, // 60: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic
69, // 61: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition
69, // 62: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition
17, // 63: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment
69, // 64: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition
68, // 65: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic
69, // 66: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition
68, // 67: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic
69, // 68: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition
68, // 69: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic
72, // 70: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset
73, // 71: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType
73, // 72: messaging_pb.SubscribeMessageRequest.SeekMessage.offset_type:type_name -> schema_pb.OffsetType
68, // 73: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic
69, // 74: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition
0, // 75: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
4, // 76: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest
6, // 77: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest
11, // 78: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest
13, // 79: messaging_pb.SeaweedMessaging.TopicExists:input_type -> messaging_pb.TopicExistsRequest
9, // 80: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest
15, // 81: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
18, // 82: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest
20, // 83: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest
22, // 84: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest
26, // 85: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
40, // 86: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest
42, // 87: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest
28, // 88: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest
32, // 89: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest
36, // 90: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest
34, // 91: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest
38, // 92: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest
44, // 93: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest
46, // 94: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:input_type -> messaging_pb.GetPartitionRangeInfoRequest
1, // 95: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
5, // 96: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse
7, // 97: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse
12, // 98: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse
14, // 99: messaging_pb.SeaweedMessaging.TopicExists:output_type -> messaging_pb.TopicExistsResponse
10, // 100: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse
16, // 101: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
19, // 102: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse
21, // 103: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse
23, // 104: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse
27, // 105: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
41, // 106: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse
43, // 107: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse
29, // 108: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse
33, // 109: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse
37, // 110: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse
35, // 111: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse
39, // 112: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse
45, // 113: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse
47, // 114: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:output_type -> messaging_pb.GetPartitionRangeInfoResponse
95, // [95:115] is the sub-list for method output_type
75, // [75:95] is the sub-list for method input_type
75, // [75:75] is the sub-list for extension type_name
75, // [75:75] is the sub-list for extension extendee
0, // [0:75] is the sub-list for field type_name
}
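The depIdxs entry for messaging_pb.SubscribeMessageRequest.seek and the SeekMessage type registered in goTypes give subscribers a third request variant on the existing bidirectional stream. Below is a minimal client-side sketch of sending a seek, assuming the generated oneof field on SubscribeMessageRequest is named Message and that SeekMessage carries Offset and OffsetType fields; the import paths and the EXACT_OFFSET enum value are likewise assumptions for illustration, not taken from this diff.

package example

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// requestSeek asks the broker to restart delivery on an already-open
// SubscribeMessage stream from the given offset, using the new Seek oneof case.
func requestSeek(stream mq_pb.SeaweedMessaging_SubscribeMessageClient, offset int64) error {
	return stream.Send(&mq_pb.SubscribeMessageRequest{
		Message: &mq_pb.SubscribeMessageRequest_Seek{ // oneof wrapper registered in this file
			Seek: &mq_pb.SubscribeMessageRequest_SeekMessage{
				Offset:     offset,                            // assumed field name
				OffsetType: schema_pb.OffsetType_EXACT_OFFSET, // assumed enum value
			},
		},
	})
}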
func init() { file_mq_broker_proto_init() }
@ -4488,6 +4564,7 @@ func file_mq_broker_proto_init() {
file_mq_broker_proto_msgTypes[36].OneofWrappers = []any{
(*SubscribeMessageRequest_Init)(nil),
(*SubscribeMessageRequest_Ack)(nil),
(*SubscribeMessageRequest_Seek)(nil),
}
file_mq_broker_proto_msgTypes[37].OneofWrappers = []any{
(*SubscribeMessageResponse_Ctrl)(nil),
@ -4504,7 +4581,7 @@ func file_mq_broker_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_broker_proto_rawDesc), len(file_mq_broker_proto_rawDesc)),
NumEnums: 0,
NumMessages: 67,
NumMessages: 68,
NumExtensions: 0,
NumServices: 1,
},
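On the response side, the Ctrl and Data oneof wrappers registered above let a subscriber tell a rejected seek apart from resumed data. A sketch under the same assumptions (the GetCtrl/GetData getters and an Error string field on the ctrl message are inferred from the descriptor entries in this diff, not verified against the full generated code):

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// readAfterSeek drains replies after a seek request: a ctrl frame with a
// non-empty Error reports a rejected seek, while data frames resume delivery
// from the newly requested position.
func readAfterSeek(stream mq_pb.SeaweedMessaging_SubscribeMessageClient) error {
	for {
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		if ctrl := resp.GetCtrl(); ctrl != nil && ctrl.Error != "" {
			return fmt.Errorf("seek rejected: %s", ctrl.Error)
		}
		if data := resp.GetData(); data != nil {
			// process the resumed DataMessage here
			_ = data
		}
	}
}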
