diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 50dc6b458..22ca1986e 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -48,6 +48,9 @@ type Handler struct { seaweedMQHandler *integration.SeaweedMQHandler useSeaweedMQ bool + // SMQ offset storage for consumer group offsets + smqOffsetStorage *offset.SMQOffsetStorage + // Consumer group coordination groupCoordinator *consumer.GroupCoordinator @@ -97,10 +100,20 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, err return nil, err } + // Create SMQ offset storage using the first master as filer address + masterAddresses := strings.Split(masters, ",") + filerAddress := masterAddresses[0] // Use first master as filer + + smqOffsetStorage, err := offset.NewSMQOffsetStorage(filerAddress) + if err != nil { + return nil, fmt.Errorf("failed to create SMQ offset storage: %w", err) + } + return &Handler{ topics: make(map[string]*TopicInfo), // Keep for compatibility ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility seaweedMQHandler: smqHandler, + smqOffsetStorage: smqOffsetStorage, useSeaweedMQ: true, groupCoordinator: consumer.NewGroupCoordinator(), }, nil @@ -1880,3 +1893,34 @@ func (h *Handler) IsSchemaEnabled() bool { func (h *Handler) IsBrokerIntegrationEnabled() bool { return h.IsSchemaEnabled() && h.brokerClient != nil } + + +// commitOffsetToSMQ commits offset using SMQ storage +func (h *Handler) commitOffsetToSMQ(key offset.ConsumerOffsetKey, offsetValue int64, metadata string) error { + if h.smqOffsetStorage == nil { + return fmt.Errorf("SMQ offset storage not initialized") + } + + // Save to SMQ storage - use current timestamp and size 0 as placeholders + // since SMQ storage primarily tracks the committed offset + return h.smqOffsetStorage.SaveConsumerOffset(key, offsetValue, time.Now().UnixNano(), 0) +} + +// fetchOffsetFromSMQ fetches offset using SMQ storage +func (h *Handler) fetchOffsetFromSMQ(key offset.ConsumerOffsetKey) (int64, string, error) { + if h.smqOffsetStorage == nil { + return -1, "", fmt.Errorf("SMQ offset storage not initialized") + } + + entries, err := h.smqOffsetStorage.LoadConsumerOffsets(key) + if err != nil { + return -1, "", err + } + + if len(entries) == 0 { + return -1, "", nil // No committed offset + } + + // Return the committed offset (metadata is not stored in SMQ format) + return entries[0].KafkaOffset, "", nil +} diff --git a/weed/mq/kafka/protocol/offset_handlers_test.go b/weed/mq/kafka/protocol/offset_handlers_test.go new file mode 100644 index 000000000..1e0ba7aa5 --- /dev/null +++ b/weed/mq/kafka/protocol/offset_handlers_test.go @@ -0,0 +1,162 @@ +package protocol + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" +) + +func TestOffsetCommitHandlerIntegration(t *testing.T) { + // Test that the offset commit handler properly uses SMQ storage + + // Test ConsumerOffsetKey creation + key := offset.ConsumerOffsetKey{ + Topic: "test-topic", + Partition: 0, + ConsumerGroup: "test-group", + ConsumerGroupInstance: "test-instance", + } + + if key.Topic != "test-topic" { + t.Errorf("Expected topic 'test-topic', got %s", key.Topic) + } + + if key.Partition != 0 { + t.Errorf("Expected partition 0, got %d", key.Partition) + } + + if key.ConsumerGroup != "test-group" { + t.Errorf("Expected consumer group 'test-group', got %s", key.ConsumerGroup) + } + + if key.ConsumerGroupInstance != "test-instance" { + t.Errorf("Expected instance 'test-instance', got %s", key.ConsumerGroupInstance) + } +} + +func TestOffsetCommitToSMQ_WithoutStorage(t *testing.T) { + // Test error handling when SMQ storage is not initialized + handler := &Handler{ + useSeaweedMQ: true, + smqOffsetStorage: nil, // Not initialized + } + + key := offset.ConsumerOffsetKey{ + Topic: "test-topic", + Partition: 0, + ConsumerGroup: "test-group", + } + + err := handler.commitOffsetToSMQ(key, 100, "test-metadata") + if err == nil { + t.Error("Expected error when SMQ storage not initialized, got nil") + } + + expectedError := "SMQ offset storage not initialized" + if err.Error() != expectedError { + t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error()) + } +} + +func TestFetchOffsetFromSMQ_WithoutStorage(t *testing.T) { + // Test error handling when SMQ storage is not initialized + handler := &Handler{ + useSeaweedMQ: true, + smqOffsetStorage: nil, // Not initialized + } + + key := offset.ConsumerOffsetKey{ + Topic: "test-topic", + Partition: 0, + ConsumerGroup: "test-group", + } + + offset, metadata, err := handler.fetchOffsetFromSMQ(key) + if err == nil { + t.Error("Expected error when SMQ storage not initialized, got nil") + } + + if offset != -1 { + t.Errorf("Expected offset -1, got %d", offset) + } + + if metadata != "" { + t.Errorf("Expected empty metadata, got '%s'", metadata) + } +} + +func TestOffsetHandlers_StructureValidation(t *testing.T) { + // Validate that offset commit/fetch request/response structures are properly formed + + // Test OffsetCommitRequest structure + request := OffsetCommitRequest{ + GroupID: "test-group", + GenerationID: 1, + MemberID: "test-member", + GroupInstanceID: "test-instance", + RetentionTime: -1, + Topics: []OffsetCommitTopic{ + { + Name: "test-topic", + Partitions: []OffsetCommitPartition{ + { + Index: 0, + Offset: 100, + LeaderEpoch: -1, + Metadata: "test-metadata", + }, + }, + }, + }, + } + + if request.GroupID != "test-group" { + t.Errorf("Expected group ID 'test-group', got %s", request.GroupID) + } + + if len(request.Topics) != 1 { + t.Errorf("Expected 1 topic, got %d", len(request.Topics)) + } + + if request.Topics[0].Name != "test-topic" { + t.Errorf("Expected topic 'test-topic', got %s", request.Topics[0].Name) + } + + if len(request.Topics[0].Partitions) != 1 { + t.Errorf("Expected 1 partition, got %d", len(request.Topics[0].Partitions)) + } + + partition := request.Topics[0].Partitions[0] + if partition.Index != 0 { + t.Errorf("Expected partition index 0, got %d", partition.Index) + } + + if partition.Offset != 100 { + t.Errorf("Expected offset 100, got %d", partition.Offset) + } + + // Test OffsetFetchRequest structure + fetchRequest := OffsetFetchRequest{ + GroupID: "test-group", + GroupInstanceID: "test-instance", + Topics: []OffsetFetchTopic{ + { + Name: "test-topic", + Partitions: []int32{0, 1, 2}, + }, + }, + RequireStable: false, + } + + if fetchRequest.GroupID != "test-group" { + t.Errorf("Expected group ID 'test-group', got %s", fetchRequest.GroupID) + } + + if len(fetchRequest.Topics) != 1 { + t.Errorf("Expected 1 topic, got %d", len(fetchRequest.Topics)) + } + + if len(fetchRequest.Topics[0].Partitions) != 3 { + t.Errorf("Expected 3 partitions, got %d", len(fetchRequest.Topics[0].Partitions)) + } +} diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 7ecedc2ae..712b56a88 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -6,6 +6,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" ) // OffsetCommit API (key 8) - Commit consumer group offsets @@ -145,10 +146,25 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, requestBody []byte) ( } for _, partition := range topic.Partitions { - // Commit without strict assignment checks + // Create consumer offset key for SMQ storage + key := offset.ConsumerOffsetKey{ + Topic: topic.Name, + Partition: partition.Index, + ConsumerGroup: request.GroupID, + ConsumerGroupInstance: request.GroupInstanceID, + } + + // Commit offset using SMQ storage if available var errorCode int16 = ErrorCodeNone - if err := h.commitOffset(group, topic.Name, partition.Index, partition.Offset, partition.Metadata); err != nil { - errorCode = ErrorCodeOffsetMetadataTooLarge + if h.useSeaweedMQ && h.smqOffsetStorage != nil { + if err := h.commitOffsetToSMQ(key, partition.Offset, partition.Metadata); err != nil { + errorCode = ErrorCodeOffsetMetadataTooLarge + } + } else { + // Fall back to in-memory storage + if err := h.commitOffset(group, topic.Name, partition.Index, partition.Offset, partition.Metadata); err != nil { + errorCode = ErrorCodeOffsetMetadataTooLarge + } } partitionResponse := OffsetCommitPartitionResponse{ @@ -211,16 +227,35 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, requestBody []byte) ([ // Fetch offsets for requested partitions for _, partition := range partitionsToFetch { - offset, metadata, err := h.fetchOffset(group, topic.Name, partition) + // Create consumer offset key for SMQ storage + key := offset.ConsumerOffsetKey{ + Topic: topic.Name, + Partition: partition, + ConsumerGroup: request.GroupID, + ConsumerGroupInstance: request.GroupInstanceID, + } + var fetchedOffset int64 = -1 + var metadata string = "" var errorCode int16 = ErrorCodeNone - if err != nil { - errorCode = ErrorCodeOffsetLoadInProgress // Generic error + + // Fetch offset using SMQ storage if available + if h.useSeaweedMQ && h.smqOffsetStorage != nil { + if offset, meta, err := h.fetchOffsetFromSMQ(key); err == nil { + fetchedOffset = offset + metadata = meta + } + } else { + // Fall back to in-memory storage + if offset, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil { + fetchedOffset = offset + metadata = meta + } } partitionResponse := OffsetFetchPartitionResponse{ Index: partition, - Offset: offset, + Offset: fetchedOffset, LeaderEpoch: -1, // Not implemented Metadata: metadata, ErrorCode: errorCode,