diff --git a/test/kafka/seaweedmq_integration_test.go b/test/kafka/seaweedmq_integration_test.go index 7b8b7ed58..73a8ba97e 100644 --- a/test/kafka/seaweedmq_integration_test.go +++ b/test/kafka/seaweedmq_integration_test.go @@ -110,7 +110,7 @@ func testSeaweedMQTopicLifecycle(t *testing.T, addr string) { // Test CreateTopics request topicName := "seaweedmq-test-topic" createReq := buildCreateTopicsRequestCustom(topicName) - + _, err = conn.Write(createReq) if err != nil { t.Fatalf("Failed to send CreateTopics: %v", err) @@ -143,7 +143,7 @@ func testSeaweedMQTopicLifecycle(t *testing.T, addr string) { func testSeaweedMQProduceConsume(t *testing.T, addr string) { // This would be a more comprehensive test in a full implementation // For now, just test that Produce requests are handled - + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) if err != nil { t.Fatalf("Failed to connect: %v", err) @@ -174,65 +174,65 @@ func testSeaweedMQProduceConsume(t *testing.T, addr string) { // TODO: Send a Produce request and verify it works with SeaweedMQ // This would require building a proper Kafka Produce request - + t.Logf("SeaweedMQ produce/consume test placeholder completed") } // buildCreateTopicsRequestCustom creates a CreateTopics request for a specific topic func buildCreateTopicsRequestCustom(topicName string) []byte { clientID := "seaweedmq-test-client" - + // Approximate message size messageSize := 2 + 2 + 4 + 2 + len(clientID) + 4 + 4 + 2 + len(topicName) + 4 + 2 + 4 + 4 - + request := make([]byte, 0, messageSize+4) - + // Message size placeholder sizePos := len(request) request = append(request, 0, 0, 0, 0) - + // API key (CreateTopics = 19) request = append(request, 0, 19) - + // API version request = append(request, 0, 4) - + // Correlation ID request = append(request, 0, 0, 0x30, 0x42) // 12354 - + // Client ID request = append(request, 0, byte(len(clientID))) request = append(request, []byte(clientID)...) - + // Timeout (5000ms) request = append(request, 0, 0, 0x13, 0x88) - + // Topics count (1) request = append(request, 0, 0, 0, 1) - + // Topic name request = append(request, 0, byte(len(topicName))) request = append(request, []byte(topicName)...) 
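	// Kafka STRING fields carry a 2-byte big-endian length prefix; writing 0 followed by
	// byte(len(topicName)) assumes the topic name (and the client ID above) is shorter
	// than 256 bytes, which holds for these test fixtures.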
- + // Num partitions (1) request = append(request, 0, 0, 0, 1) - + // Replication factor (1) request = append(request, 0, 1) - + // Configs count (0) request = append(request, 0, 0, 0, 0) - + // Topic timeout (5000ms) request = append(request, 0, 0, 0x13, 0x88) - + // Fix message size actualSize := len(request) - 4 request[sizePos] = byte(actualSize >> 24) request[sizePos+1] = byte(actualSize >> 16) request[sizePos+2] = byte(actualSize >> 8) request[sizePos+3] = byte(actualSize) - + return request } @@ -285,8 +285,8 @@ func TestSeaweedMQGateway_ModeSelection(t *testing.T) { // TestSeaweedMQGateway_ConfigValidation tests configuration validation func TestSeaweedMQGateway_ConfigValidation(t *testing.T) { testCases := []struct { - name string - options gateway.Options + name string + options gateway.Options shouldWork bool }{ { @@ -321,11 +321,11 @@ func TestSeaweedMQGateway_ConfigValidation(t *testing.T) { t.Run(tc.name, func(t *testing.T) { server := gateway.NewServer(tc.options) err := server.Start() - + if tc.shouldWork && err != nil { t.Errorf("Expected config to work, got error: %v", err) } - + if err == nil { server.Close() t.Logf("Config test passed for %s", tc.name) diff --git a/weed/command/mq_kafka_gateway.go b/weed/command/mq_kafka_gateway.go index 5de8414be..cc77f5eb7 100644 --- a/weed/command/mq_kafka_gateway.go +++ b/weed/command/mq_kafka_gateway.go @@ -1,31 +1,31 @@ package command import ( - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" ) var ( - mqKafkaGatewayOptions mqKafkaGatewayOpts + mqKafkaGatewayOptions mqKafkaGatewayOpts ) type mqKafkaGatewayOpts struct { - listen *string - agentAddress *string - seaweedMode *bool + listen *string + agentAddress *string + seaweedMode *bool } func init() { - cmdMqKafkaGateway.Run = runMqKafkaGateway - mqKafkaGatewayOptions.listen = cmdMqKafkaGateway.Flag.String("listen", ":9092", "Kafka gateway listen address") - mqKafkaGatewayOptions.agentAddress = cmdMqKafkaGateway.Flag.String("agent", "", "SeaweedMQ Agent address (e.g., localhost:17777)") - mqKafkaGatewayOptions.seaweedMode = cmdMqKafkaGateway.Flag.Bool("seaweedmq", false, "Use SeaweedMQ backend instead of in-memory stub") + cmdMqKafkaGateway.Run = runMqKafkaGateway + mqKafkaGatewayOptions.listen = cmdMqKafkaGateway.Flag.String("listen", ":9092", "Kafka gateway listen address") + mqKafkaGatewayOptions.agentAddress = cmdMqKafkaGateway.Flag.String("agent", "", "SeaweedMQ Agent address (e.g., localhost:17777)") + mqKafkaGatewayOptions.seaweedMode = cmdMqKafkaGateway.Flag.Bool("seaweedmq", false, "Use SeaweedMQ backend instead of in-memory stub") } var cmdMqKafkaGateway = &Command{ - UsageLine: "mq.kafka.gateway [-listen=:9092] [-agent=localhost:17777] [-seaweedmq]", - Short: "start a Kafka wire-protocol gateway for SeaweedMQ", - Long: `Start a Kafka wire-protocol gateway translating Kafka client requests to SeaweedMQ. + UsageLine: "mq.kafka.gateway [-listen=:9092] [-agent=localhost:17777] [-seaweedmq]", + Short: "start a Kafka wire-protocol gateway for SeaweedMQ", + Long: `Start a Kafka wire-protocol gateway translating Kafka client requests to SeaweedMQ. By default, uses an in-memory stub for development and testing. Use -seaweedmq -agent=
to connect to a real SeaweedMQ Agent for production. @@ -35,42 +35,40 @@ This is experimental and currently supports a minimal subset for development. } func runMqKafkaGateway(cmd *Command, args []string) bool { - // Validate options - if *mqKafkaGatewayOptions.seaweedMode && *mqKafkaGatewayOptions.agentAddress == "" { - glog.Fatalf("SeaweedMQ mode requires -agent address") - return false - } + // Validate options + if *mqKafkaGatewayOptions.seaweedMode && *mqKafkaGatewayOptions.agentAddress == "" { + glog.Fatalf("SeaweedMQ mode requires -agent address") + return false + } - srv := gateway.NewServer(gateway.Options{ - Listen: *mqKafkaGatewayOptions.listen, - AgentAddress: *mqKafkaGatewayOptions.agentAddress, - UseSeaweedMQ: *mqKafkaGatewayOptions.seaweedMode, - }) + srv := gateway.NewServer(gateway.Options{ + Listen: *mqKafkaGatewayOptions.listen, + AgentAddress: *mqKafkaGatewayOptions.agentAddress, + UseSeaweedMQ: *mqKafkaGatewayOptions.seaweedMode, + }) - mode := "in-memory" - if *mqKafkaGatewayOptions.seaweedMode { - mode = "SeaweedMQ (" + *mqKafkaGatewayOptions.agentAddress + ")" - } - glog.V(0).Infof("Starting MQ Kafka Gateway on %s with %s backend", *mqKafkaGatewayOptions.listen, mode) - if err := srv.Start(); err != nil { - glog.Fatalf("mq kafka gateway start: %v", err) - return false - } + mode := "in-memory" + if *mqKafkaGatewayOptions.seaweedMode { + mode = "SeaweedMQ (" + *mqKafkaGatewayOptions.agentAddress + ")" + } + glog.V(0).Infof("Starting MQ Kafka Gateway on %s with %s backend", *mqKafkaGatewayOptions.listen, mode) + if err := srv.Start(); err != nil { + glog.Fatalf("mq kafka gateway start: %v", err) + return false + } - // Set up graceful shutdown - defer func() { - glog.V(0).Infof("Shutting down MQ Kafka Gateway...") - if err := srv.Close(); err != nil { - glog.Errorf("mq kafka gateway close: %v", err) - } - }() + // Set up graceful shutdown + defer func() { + glog.V(0).Infof("Shutting down MQ Kafka Gateway...") + if err := srv.Close(); err != nil { + glog.Errorf("mq kafka gateway close: %v", err) + } + }() - // Serve blocks until closed - if err := srv.Wait(); err != nil { - glog.Errorf("mq kafka gateway wait: %v", err) - return false - } - return true + // Serve blocks until closed + if err := srv.Wait(); err != nil { + glog.Errorf("mq kafka gateway wait: %v", err) + return false + } + return true } - - diff --git a/weed/mq/KAFKA_PHASE3_PLAN.md b/weed/mq/KAFKA_PHASE3_PLAN.md new file mode 100644 index 000000000..3be8abd8e --- /dev/null +++ b/weed/mq/KAFKA_PHASE3_PLAN.md @@ -0,0 +1,242 @@ +# Phase 3: Consumer Groups & Advanced Kafka Features + +## Overview + +Phase 3 transforms the Kafka Gateway from a basic producer/consumer system into a full-featured, production-ready Kafka-compatible platform with consumer groups, advanced APIs, and enterprise features. + +## Goals + +- **Consumer Group Coordination**: Full distributed consumer support +- **Advanced Kafka APIs**: Offset management, group coordination, heartbeats +- **Performance & Scalability**: Connection pooling, batching, compression +- **Production Features**: Metrics, monitoring, advanced configuration +- **Enterprise Ready**: Security, observability, operational tools + +## Core Features + +### 1. 
Consumer Group Coordination + +**New Kafka APIs to Implement:** +- **JoinGroup** (API 11): Consumer joins a consumer group +- **SyncGroup** (API 14): Coordinate partition assignments +- **Heartbeat** (API 12): Keep consumer alive in group +- **LeaveGroup** (API 13): Clean consumer departure +- **OffsetCommit** (API 8): Commit consumer offsets +- **OffsetFetch** (API 9): Retrieve committed offsets +- **DescribeGroups** (API 15): Get group metadata + +**Consumer Group Manager:** +- Group membership tracking +- Partition assignment strategies (Range, RoundRobin) +- Rebalancing coordination +- Offset storage and retrieval +- Consumer liveness monitoring + +### 2. Advanced Record Processing + +**Record Batch Improvements:** +- Full Kafka record format parsing (v0, v1, v2) +- Compression support (gzip, snappy, lz4, zstd) +- Proper CRC validation +- Transaction markers handling +- Timestamp extraction and validation + +**Performance Optimizations:** +- Record batching for SeaweedMQ +- Connection pooling to Agent +- Async publishing with acknowledgment batching +- Memory pooling for large messages + +### 3. Enhanced Protocol Support + +**Additional APIs:** +- **FindCoordinator** (API 10): Locate group coordinator +- **DescribeConfigs** (API 32): Get broker/topic configs +- **AlterConfigs** (API 33): Modify configurations +- **DescribeLogDirs** (API 35): Storage information +- **CreatePartitions** (API 37): Dynamic partition scaling + +**Protocol Improvements:** +- Multiple API version support +- Better error code mapping +- Request/response correlation tracking +- Protocol version negotiation + +### 4. Operational Features + +**Metrics & Monitoring:** +- Prometheus metrics endpoint +- Consumer group lag monitoring +- Throughput and latency metrics +- Error rate tracking +- Connection pool metrics + +**Health & Diagnostics:** +- Health check endpoints +- Debug APIs for troubleshooting +- Consumer group status reporting +- Partition assignment visualization + +**Configuration Management:** +- Dynamic configuration updates +- Topic-level settings +- Consumer group policies +- Rate limiting and quotas + +## Implementation Plan + +### Step 1: Consumer Group Foundation (2-3 days) +1. Consumer group state management +2. Basic JoinGroup/SyncGroup APIs +3. Partition assignment logic +4. Group membership tracking + +### Step 2: Offset Management (1-2 days) +1. OffsetCommit/OffsetFetch APIs +2. Offset storage in SeaweedMQ +3. Consumer position tracking +4. Offset retention policies + +### Step 3: Consumer Coordination (1-2 days) +1. Heartbeat mechanism +2. Group rebalancing +3. Consumer failure detection +4. LeaveGroup handling + +### Step 4: Advanced Record Processing (2-3 days) +1. Full record batch parsing +2. Compression codec support +3. Performance optimizations +4. Memory management + +### Step 5: Enhanced APIs (1-2 days) +1. FindCoordinator implementation +2. DescribeGroups functionality +3. Configuration APIs +4. Administrative tools + +### Step 6: Production Features (2-3 days) +1. Metrics and monitoring +2. Health checks +3. Operational dashboards +4. 
Performance tuning + +## Architecture Changes + +### Consumer Group Coordinator +``` +┌─────────────────────────────────────────────────┐ +│ Gateway Server │ +├─────────────────────────────────────────────────┤ +│ Protocol Handler │ +│ ├── Consumer Group Coordinator │ +│ │ ├── Group State Machine │ +│ │ ├── Partition Assignment │ +│ │ ├── Rebalancing Logic │ +│ │ └── Offset Manager │ +│ ├── Enhanced Record Processor │ +│ └── Metrics Collector │ +├─────────────────────────────────────────────────┤ +│ SeaweedMQ Integration Layer │ +│ ├── Connection Pool │ +│ ├── Batch Publisher │ +│ └── Offset Storage │ +└─────────────────────────────────────────────────┘ +``` + +### Consumer Group State Management +``` +Consumer Group States: +- Empty: No active consumers +- PreparingRebalance: Waiting for consumers to join +- CompletingRebalance: Assigning partitions +- Stable: Normal operation +- Dead: Group marked for deletion + +Consumer States: +- Unknown: Initial state +- MemberPending: Joining group +- MemberStable: Active in group +- MemberLeaving: Graceful departure +``` + +## Success Criteria + +### Functional Requirements +- ✅ Consumer groups work with multiple consumers +- ✅ Automatic partition rebalancing +- ✅ Offset commit/fetch functionality +- ✅ Consumer failure handling +- ✅ Full Kafka record format support +- ✅ Compression support for major codecs + +### Performance Requirements +- ✅ Handle 10k+ messages/second per partition +- ✅ Support 100+ consumer groups simultaneously +- ✅ Sub-100ms consumer group rebalancing +- ✅ Memory usage < 1GB for 1000 consumers + +### Compatibility Requirements +- ✅ Compatible with kafka-go, Sarama, and other Go clients +- ✅ Support Kafka 2.8+ client protocol versions +- ✅ Backwards compatible with Phase 1&2 implementations + +## Testing Strategy + +### Unit Tests +- Consumer group state transitions +- Partition assignment algorithms +- Offset management logic +- Record parsing and validation + +### Integration Tests +- Multi-consumer group scenarios +- Consumer failures and recovery +- Rebalancing under load +- SeaweedMQ storage integration + +### End-to-End Tests +- Real Kafka client libraries (kafka-go, Sarama) +- Producer/consumer workflows +- Consumer group coordination +- Performance benchmarking + +### Load Tests +- 1000+ concurrent consumers +- High-throughput scenarios +- Memory and CPU profiling +- Failure recovery testing + +## Deliverables + +1. **Consumer Group Coordinator** - Full group management system +2. **Enhanced Protocol Handler** - 13+ Kafka APIs supported +3. **Advanced Record Processing** - Compression, batching, validation +4. **Metrics & Monitoring** - Prometheus integration, dashboards +5. **Performance Optimizations** - Connection pooling, memory management +6. **Comprehensive Testing** - Unit, integration, E2E, and load tests +7. 
**Documentation** - API docs, deployment guides, troubleshooting + +## Risk Mitigation + +### Technical Risks +- **Consumer group complexity**: Start with basic Range assignment, expand gradually +- **Performance bottlenecks**: Profile early, optimize incrementally +- **SeaweedMQ integration**: Maintain compatibility layer for fallback + +### Operational Risks +- **Breaking changes**: Maintain Phase 2 compatibility throughout +- **Resource usage**: Implement proper resource limits and monitoring +- **Data consistency**: Ensure offset storage reliability + +## Post-Phase 3 Vision + +After Phase 3, the SeaweedFS Kafka Gateway will be: + +- **Production Ready**: Handle enterprise Kafka workloads +- **Highly Compatible**: Work with major Kafka client libraries +- **Operationally Excellent**: Full observability and management tools +- **Performant**: Meet enterprise throughput requirements +- **Reliable**: Handle failures gracefully with strong consistency guarantees + +This positions SeaweedFS as a compelling alternative to traditional Kafka deployments, especially for organizations already using SeaweedFS for storage and wanting unified message queue capabilities. diff --git a/weed/mq/kafka/consumer/assignment.go b/weed/mq/kafka/consumer/assignment.go new file mode 100644 index 000000000..9f85fa3bf --- /dev/null +++ b/weed/mq/kafka/consumer/assignment.go @@ -0,0 +1,289 @@ +package consumer + +import ( + "sort" +) + +// AssignmentStrategy defines how partitions are assigned to consumers +type AssignmentStrategy interface { + Name() string + Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment +} + +// RangeAssignmentStrategy implements the Range assignment strategy +// Assigns partitions in ranges to consumers, similar to Kafka's range assignor +type RangeAssignmentStrategy struct{} + +func (r *RangeAssignmentStrategy) Name() string { + return "range" +} + +func (r *RangeAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { + if len(members) == 0 { + return make(map[string][]PartitionAssignment) + } + + assignments := make(map[string][]PartitionAssignment) + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + } + + // Sort members for consistent assignment + sortedMembers := make([]*GroupMember, len(members)) + copy(sortedMembers, members) + sort.Slice(sortedMembers, func(i, j int) bool { + return sortedMembers[i].ID < sortedMembers[j].ID + }) + + // Get all subscribed topics + subscribedTopics := make(map[string]bool) + for _, member := range members { + for _, topic := range member.Subscription { + subscribedTopics[topic] = true + } + } + + // Assign partitions for each topic + for topic := range subscribedTopics { + partitions, exists := topicPartitions[topic] + if !exists { + continue + } + + // Sort partitions for consistent assignment + sort.Slice(partitions, func(i, j int) bool { + return partitions[i] < partitions[j] + }) + + // Find members subscribed to this topic + topicMembers := make([]*GroupMember, 0) + for _, member := range sortedMembers { + for _, subscribedTopic := range member.Subscription { + if subscribedTopic == topic { + topicMembers = append(topicMembers, member) + break + } + } + } + + if len(topicMembers) == 0 { + continue + } + + // Assign partitions to members using range strategy + numPartitions := len(partitions) + numMembers := len(topicMembers) + partitionsPerMember := numPartitions / numMembers + 
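	// Integer division splits the partitions evenly; the remainder computed below is
	// handled by giving the first `remainingPartitions` members one extra partition each
	// (e.g. 4 partitions across 3 members yields a 2, 1, 1 split).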
remainingPartitions := numPartitions % numMembers + + partitionIndex := 0 + for memberIndex, member := range topicMembers { + // Calculate how many partitions this member should get + memberPartitions := partitionsPerMember + if memberIndex < remainingPartitions { + memberPartitions++ + } + + // Assign partitions to this member + for i := 0; i < memberPartitions && partitionIndex < numPartitions; i++ { + assignment := PartitionAssignment{ + Topic: topic, + Partition: partitions[partitionIndex], + } + assignments[member.ID] = append(assignments[member.ID], assignment) + partitionIndex++ + } + } + } + + return assignments +} + +// RoundRobinAssignmentStrategy implements the RoundRobin assignment strategy +// Distributes partitions evenly across all consumers in round-robin fashion +type RoundRobinAssignmentStrategy struct{} + +func (rr *RoundRobinAssignmentStrategy) Name() string { + return "roundrobin" +} + +func (rr *RoundRobinAssignmentStrategy) Assign(members []*GroupMember, topicPartitions map[string][]int32) map[string][]PartitionAssignment { + if len(members) == 0 { + return make(map[string][]PartitionAssignment) + } + + assignments := make(map[string][]PartitionAssignment) + for _, member := range members { + assignments[member.ID] = make([]PartitionAssignment, 0) + } + + // Sort members for consistent assignment + sortedMembers := make([]*GroupMember, len(members)) + copy(sortedMembers, members) + sort.Slice(sortedMembers, func(i, j int) bool { + return sortedMembers[i].ID < sortedMembers[j].ID + }) + + // Collect all partition assignments across all topics + allAssignments := make([]PartitionAssignment, 0) + + // Get all subscribed topics + subscribedTopics := make(map[string]bool) + for _, member := range members { + for _, topic := range member.Subscription { + subscribedTopics[topic] = true + } + } + + // Collect all partitions from all subscribed topics + for topic := range subscribedTopics { + partitions, exists := topicPartitions[topic] + if !exists { + continue + } + + for _, partition := range partitions { + allAssignments = append(allAssignments, PartitionAssignment{ + Topic: topic, + Partition: partition, + }) + } + } + + // Sort assignments for consistent distribution + sort.Slice(allAssignments, func(i, j int) bool { + if allAssignments[i].Topic != allAssignments[j].Topic { + return allAssignments[i].Topic < allAssignments[j].Topic + } + return allAssignments[i].Partition < allAssignments[j].Partition + }) + + // Distribute partitions in round-robin fashion + memberIndex := 0 + for _, assignment := range allAssignments { + // Find a member that is subscribed to this topic + assigned := false + startIndex := memberIndex + + for !assigned { + member := sortedMembers[memberIndex] + + // Check if this member is subscribed to the topic + subscribed := false + for _, topic := range member.Subscription { + if topic == assignment.Topic { + subscribed = true + break + } + } + + if subscribed { + assignments[member.ID] = append(assignments[member.ID], assignment) + assigned = true + } + + memberIndex = (memberIndex + 1) % len(sortedMembers) + + // Prevent infinite loop if no member is subscribed to this topic + if memberIndex == startIndex && !assigned { + break + } + } + } + + return assignments +} + +// GetAssignmentStrategy returns the appropriate assignment strategy +func GetAssignmentStrategy(name string) AssignmentStrategy { + switch name { + case "range": + return &RangeAssignmentStrategy{} + case "roundrobin": + return &RoundRobinAssignmentStrategy{} + default: + // 
Default to range strategy + return &RangeAssignmentStrategy{} + } +} + +// AssignPartitions performs partition assignment for a consumer group +func (group *ConsumerGroup) AssignPartitions(topicPartitions map[string][]int32) { + if len(group.Members) == 0 { + return + } + + // Convert members map to slice + members := make([]*GroupMember, 0, len(group.Members)) + for _, member := range group.Members { + if member.State == MemberStateStable || member.State == MemberStatePending { + members = append(members, member) + } + } + + if len(members) == 0 { + return + } + + // Get assignment strategy + strategy := GetAssignmentStrategy(group.Protocol) + assignments := strategy.Assign(members, topicPartitions) + + // Apply assignments to members + for memberID, assignment := range assignments { + if member, exists := group.Members[memberID]; exists { + member.Assignment = assignment + } + } +} + +// GetMemberAssignments returns the current partition assignments for all members +func (group *ConsumerGroup) GetMemberAssignments() map[string][]PartitionAssignment { + group.Mu.RLock() + defer group.Mu.RUnlock() + + assignments := make(map[string][]PartitionAssignment) + for memberID, member := range group.Members { + assignments[memberID] = make([]PartitionAssignment, len(member.Assignment)) + copy(assignments[memberID], member.Assignment) + } + + return assignments +} + +// UpdateMemberSubscription updates a member's topic subscription +func (group *ConsumerGroup) UpdateMemberSubscription(memberID string, topics []string) { + group.Mu.Lock() + defer group.Mu.Unlock() + + member, exists := group.Members[memberID] + if !exists { + return + } + + // Update member subscription + member.Subscription = make([]string, len(topics)) + copy(member.Subscription, topics) + + // Update group's subscribed topics + group.SubscribedTopics = make(map[string]bool) + for _, m := range group.Members { + for _, topic := range m.Subscription { + group.SubscribedTopics[topic] = true + } + } +} + +// GetSubscribedTopics returns all topics subscribed by the group +func (group *ConsumerGroup) GetSubscribedTopics() []string { + group.Mu.RLock() + defer group.Mu.RUnlock() + + topics := make([]string, 0, len(group.SubscribedTopics)) + for topic := range group.SubscribedTopics { + topics = append(topics, topic) + } + + sort.Strings(topics) + return topics +} diff --git a/weed/mq/kafka/consumer/assignment_test.go b/weed/mq/kafka/consumer/assignment_test.go new file mode 100644 index 000000000..520200ed3 --- /dev/null +++ b/weed/mq/kafka/consumer/assignment_test.go @@ -0,0 +1,359 @@ +package consumer + +import ( + "reflect" + "sort" + "testing" +) + +func TestRangeAssignmentStrategy(t *testing.T) { + strategy := &RangeAssignmentStrategy{} + + if strategy.Name() != "range" { + t.Errorf("Expected strategy name 'range', got '%s'", strategy.Name()) + } + + // Test with 2 members, 4 partitions on one topic + members := []*GroupMember{ + { + ID: "member1", + Subscription: []string{"topic1"}, + }, + { + ID: "member2", + Subscription: []string{"topic1"}, + }, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify all members have assignments + if len(assignments) != 2 { + t.Fatalf("Expected assignments for 2 members, got %d", len(assignments)) + } + + // Verify total partitions assigned + totalAssigned := 0 + for _, assignment := range assignments { + totalAssigned += len(assignment) + } + + if totalAssigned != 4 { + t.Errorf("Expected 4 total 
partitions assigned, got %d", totalAssigned) + } + + // Range assignment should distribute evenly: 2 partitions each + for memberID, assignment := range assignments { + if len(assignment) != 2 { + t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment)) + } + + // Verify all assignments are for the subscribed topic + for _, pa := range assignment { + if pa.Topic != "topic1" { + t.Errorf("Expected topic 'topic1', got '%s'", pa.Topic) + } + } + } +} + +func TestRangeAssignmentStrategy_UnevenPartitions(t *testing.T) { + strategy := &RangeAssignmentStrategy{} + + // Test with 3 members, 4 partitions - should distribute 2,1,1 + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1"}}, + {ID: "member2", Subscription: []string{"topic1"}}, + {ID: "member3", Subscription: []string{"topic1"}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Get assignment counts + counts := make([]int, 0, 3) + for _, assignment := range assignments { + counts = append(counts, len(assignment)) + } + sort.Ints(counts) + + // Should be distributed as [1, 1, 2] (first member gets extra partition) + expected := []int{1, 1, 2} + if !reflect.DeepEqual(counts, expected) { + t.Errorf("Expected partition distribution %v, got %v", expected, counts) + } +} + +func TestRangeAssignmentStrategy_MultipleTopics(t *testing.T) { + strategy := &RangeAssignmentStrategy{} + + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1", "topic2"}}, + {ID: "member2", Subscription: []string{"topic1"}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1}, + "topic2": {0, 1}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Member1 should get assignments from both topics + member1Assignments := assignments["member1"] + topicsAssigned := make(map[string]int) + for _, pa := range member1Assignments { + topicsAssigned[pa.Topic]++ + } + + if len(topicsAssigned) != 2 { + t.Errorf("Expected member1 to be assigned to 2 topics, got %d", len(topicsAssigned)) + } + + // Member2 should only get topic1 assignments + member2Assignments := assignments["member2"] + for _, pa := range member2Assignments { + if pa.Topic != "topic1" { + t.Errorf("Expected member2 to only get topic1, but got %s", pa.Topic) + } + } +} + +func TestRoundRobinAssignmentStrategy(t *testing.T) { + strategy := &RoundRobinAssignmentStrategy{} + + if strategy.Name() != "roundrobin" { + t.Errorf("Expected strategy name 'roundrobin', got '%s'", strategy.Name()) + } + + // Test with 2 members, 4 partitions on one topic + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1"}}, + {ID: "member2", Subscription: []string{"topic1"}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Verify all members have assignments + if len(assignments) != 2 { + t.Fatalf("Expected assignments for 2 members, got %d", len(assignments)) + } + + // Verify total partitions assigned + totalAssigned := 0 + for _, assignment := range assignments { + totalAssigned += len(assignment) + } + + if totalAssigned != 4 { + t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned) + } + + // Round robin should distribute evenly: 2 partitions each + for memberID, assignment := range assignments { + if len(assignment) != 2 { + t.Errorf("Expected 2 partitions for member %s, got %d", memberID, 
len(assignment)) + } + } +} + +func TestRoundRobinAssignmentStrategy_MultipleTopics(t *testing.T) { + strategy := &RoundRobinAssignmentStrategy{} + + members := []*GroupMember{ + {ID: "member1", Subscription: []string{"topic1", "topic2"}}, + {ID: "member2", Subscription: []string{"topic1", "topic2"}}, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1}, + "topic2": {0, 1}, + } + + assignments := strategy.Assign(members, topicPartitions) + + // Each member should get 2 partitions (round robin across topics) + for memberID, assignment := range assignments { + if len(assignment) != 2 { + t.Errorf("Expected 2 partitions for member %s, got %d", memberID, len(assignment)) + } + } + + // Verify no partition is assigned twice + assignedPartitions := make(map[string]map[int32]bool) + for _, assignment := range assignments { + for _, pa := range assignment { + if assignedPartitions[pa.Topic] == nil { + assignedPartitions[pa.Topic] = make(map[int32]bool) + } + if assignedPartitions[pa.Topic][pa.Partition] { + t.Errorf("Partition %d of topic %s assigned multiple times", pa.Partition, pa.Topic) + } + assignedPartitions[pa.Topic][pa.Partition] = true + } + } +} + +func TestGetAssignmentStrategy(t *testing.T) { + rangeStrategy := GetAssignmentStrategy("range") + if rangeStrategy.Name() != "range" { + t.Errorf("Expected range strategy, got %s", rangeStrategy.Name()) + } + + rrStrategy := GetAssignmentStrategy("roundrobin") + if rrStrategy.Name() != "roundrobin" { + t.Errorf("Expected roundrobin strategy, got %s", rrStrategy.Name()) + } + + // Unknown strategy should default to range + defaultStrategy := GetAssignmentStrategy("unknown") + if defaultStrategy.Name() != "range" { + t.Errorf("Expected default strategy to be range, got %s", defaultStrategy.Name()) + } +} + +func TestConsumerGroup_AssignPartitions(t *testing.T) { + group := &ConsumerGroup{ + ID: "test-group", + Protocol: "range", + Members: map[string]*GroupMember{ + "member1": { + ID: "member1", + Subscription: []string{"topic1"}, + State: MemberStateStable, + }, + "member2": { + ID: "member2", + Subscription: []string{"topic1"}, + State: MemberStateStable, + }, + }, + } + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + group.AssignPartitions(topicPartitions) + + // Verify assignments were created + for memberID, member := range group.Members { + if len(member.Assignment) == 0 { + t.Errorf("Expected member %s to have partition assignments", memberID) + } + + // Verify all assignments are valid + for _, pa := range member.Assignment { + if pa.Topic != "topic1" { + t.Errorf("Unexpected topic assignment: %s", pa.Topic) + } + if pa.Partition < 0 || pa.Partition >= 4 { + t.Errorf("Unexpected partition assignment: %d", pa.Partition) + } + } + } +} + +func TestConsumerGroup_GetMemberAssignments(t *testing.T) { + group := &ConsumerGroup{ + Members: map[string]*GroupMember{ + "member1": { + ID: "member1", + Assignment: []PartitionAssignment{ + {Topic: "topic1", Partition: 0}, + {Topic: "topic1", Partition: 1}, + }, + }, + }, + } + + assignments := group.GetMemberAssignments() + + if len(assignments) != 1 { + t.Fatalf("Expected 1 member assignment, got %d", len(assignments)) + } + + member1Assignments := assignments["member1"] + if len(member1Assignments) != 2 { + t.Errorf("Expected 2 partition assignments for member1, got %d", len(member1Assignments)) + } + + // Verify assignment content + expectedAssignments := []PartitionAssignment{ + {Topic: "topic1", Partition: 0}, + {Topic: "topic1", Partition: 1}, + } + + 
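	// GetMemberAssignments returns a copy of each member's assignment slice, so this
	// compares values rather than shared backing arrays.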
if !reflect.DeepEqual(member1Assignments, expectedAssignments) { + t.Errorf("Expected assignments %v, got %v", expectedAssignments, member1Assignments) + } +} + +func TestConsumerGroup_UpdateMemberSubscription(t *testing.T) { + group := &ConsumerGroup{ + Members: map[string]*GroupMember{ + "member1": { + ID: "member1", + Subscription: []string{"topic1"}, + }, + "member2": { + ID: "member2", + Subscription: []string{"topic2"}, + }, + }, + SubscribedTopics: map[string]bool{ + "topic1": true, + "topic2": true, + }, + } + + // Update member1's subscription + group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"}) + + // Verify member subscription updated + member1 := group.Members["member1"] + expectedSubscription := []string{"topic1", "topic3"} + if !reflect.DeepEqual(member1.Subscription, expectedSubscription) { + t.Errorf("Expected subscription %v, got %v", expectedSubscription, member1.Subscription) + } + + // Verify group subscribed topics updated + expectedGroupTopics := []string{"topic1", "topic2", "topic3"} + actualGroupTopics := group.GetSubscribedTopics() + + if !reflect.DeepEqual(actualGroupTopics, expectedGroupTopics) { + t.Errorf("Expected group topics %v, got %v", expectedGroupTopics, actualGroupTopics) + } +} + +func TestAssignmentStrategy_EmptyMembers(t *testing.T) { + rangeStrategy := &RangeAssignmentStrategy{} + rrStrategy := &RoundRobinAssignmentStrategy{} + + topicPartitions := map[string][]int32{ + "topic1": {0, 1, 2, 3}, + } + + // Both strategies should handle empty members gracefully + rangeAssignments := rangeStrategy.Assign([]*GroupMember{}, topicPartitions) + rrAssignments := rrStrategy.Assign([]*GroupMember{}, topicPartitions) + + if len(rangeAssignments) != 0 { + t.Error("Expected empty assignments for empty members list (range)") + } + + if len(rrAssignments) != 0 { + t.Error("Expected empty assignments for empty members list (round robin)") + } +} diff --git a/weed/mq/kafka/consumer/group_coordinator.go b/weed/mq/kafka/consumer/group_coordinator.go new file mode 100644 index 000000000..dc1a94dc2 --- /dev/null +++ b/weed/mq/kafka/consumer/group_coordinator.go @@ -0,0 +1,298 @@ +package consumer + +import ( + "fmt" + "sync" + "time" +) + +// GroupState represents the state of a consumer group +type GroupState int + +const ( + GroupStateEmpty GroupState = iota + GroupStatePreparingRebalance + GroupStateCompletingRebalance + GroupStateStable + GroupStateDead +) + +func (gs GroupState) String() string { + switch gs { + case GroupStateEmpty: + return "Empty" + case GroupStatePreparingRebalance: + return "PreparingRebalance" + case GroupStateCompletingRebalance: + return "CompletingRebalance" + case GroupStateStable: + return "Stable" + case GroupStateDead: + return "Dead" + default: + return "Unknown" + } +} + +// MemberState represents the state of a group member +type MemberState int + +const ( + MemberStateUnknown MemberState = iota + MemberStatePending + MemberStateStable + MemberStateLeaving +) + +func (ms MemberState) String() string { + switch ms { + case MemberStateUnknown: + return "Unknown" + case MemberStatePending: + return "Pending" + case MemberStateStable: + return "Stable" + case MemberStateLeaving: + return "Leaving" + default: + return "Unknown" + } +} + +// GroupMember represents a consumer in a consumer group +type GroupMember struct { + ID string // Member ID (generated by gateway) + ClientID string // Client ID from consumer + ClientHost string // Client host/IP + SessionTimeout int32 // Session timeout in milliseconds + 
RebalanceTimeout int32 // Rebalance timeout in milliseconds + Subscription []string // Subscribed topics + Assignment []PartitionAssignment // Assigned partitions + Metadata []byte // Protocol-specific metadata + State MemberState // Current member state + LastHeartbeat time.Time // Last heartbeat timestamp + JoinedAt time.Time // When member joined group +} + +// PartitionAssignment represents partition assignment for a member +type PartitionAssignment struct { + Topic string + Partition int32 +} + +// ConsumerGroup represents a Kafka consumer group +type ConsumerGroup struct { + ID string // Group ID + State GroupState // Current group state + Generation int32 // Generation ID (incremented on rebalance) + Protocol string // Assignment protocol (e.g., "range", "roundrobin") + Leader string // Leader member ID + Members map[string]*GroupMember // Group members by member ID + SubscribedTopics map[string]bool // Topics subscribed by group + OffsetCommits map[string]map[int32]OffsetCommit // Topic -> Partition -> Offset + CreatedAt time.Time // Group creation time + LastActivity time.Time // Last activity (join, heartbeat, etc.) + + Mu sync.RWMutex // Protects group state +} + +// OffsetCommit represents a committed offset for a topic partition +type OffsetCommit struct { + Offset int64 // Committed offset + Metadata string // Optional metadata + Timestamp time.Time // Commit timestamp +} + +// GroupCoordinator manages consumer groups +type GroupCoordinator struct { + groups map[string]*ConsumerGroup // Group ID -> Group + groupsMu sync.RWMutex // Protects groups map + + // Configuration + sessionTimeoutMin int32 // Minimum session timeout (ms) + sessionTimeoutMax int32 // Maximum session timeout (ms) + rebalanceTimeoutMs int32 // Default rebalance timeout (ms) + + // Cleanup + cleanupTicker *time.Ticker + stopChan chan struct{} + stopOnce sync.Once +} + +// NewGroupCoordinator creates a new consumer group coordinator +func NewGroupCoordinator() *GroupCoordinator { + gc := &GroupCoordinator{ + groups: make(map[string]*ConsumerGroup), + sessionTimeoutMin: 6000, // 6 seconds + sessionTimeoutMax: 300000, // 5 minutes + rebalanceTimeoutMs: 300000, // 5 minutes + stopChan: make(chan struct{}), + } + + // Start cleanup routine + gc.cleanupTicker = time.NewTicker(30 * time.Second) + go gc.cleanupRoutine() + + return gc +} + +// GetOrCreateGroup returns an existing group or creates a new one +func (gc *GroupCoordinator) GetOrCreateGroup(groupID string) *ConsumerGroup { + gc.groupsMu.Lock() + defer gc.groupsMu.Unlock() + + group, exists := gc.groups[groupID] + if !exists { + group = &ConsumerGroup{ + ID: groupID, + State: GroupStateEmpty, + Generation: 0, + Members: make(map[string]*GroupMember), + SubscribedTopics: make(map[string]bool), + OffsetCommits: make(map[string]map[int32]OffsetCommit), + CreatedAt: time.Now(), + LastActivity: time.Now(), + } + gc.groups[groupID] = group + } + + return group +} + +// GetGroup returns an existing group or nil if not found +func (gc *GroupCoordinator) GetGroup(groupID string) *ConsumerGroup { + gc.groupsMu.RLock() + defer gc.groupsMu.RUnlock() + + return gc.groups[groupID] +} + +// RemoveGroup removes a group from the coordinator +func (gc *GroupCoordinator) RemoveGroup(groupID string) { + gc.groupsMu.Lock() + defer gc.groupsMu.Unlock() + + delete(gc.groups, groupID) +} + +// ListGroups returns all current group IDs +func (gc *GroupCoordinator) ListGroups() []string { + gc.groupsMu.RLock() + defer gc.groupsMu.RUnlock() + + groups := make([]string, 0, 
len(gc.groups)) + for groupID := range gc.groups { + groups = append(groups, groupID) + } + return groups +} + +// GenerateMemberID creates a unique member ID +func (gc *GroupCoordinator) GenerateMemberID(clientID, clientHost string) string { + // Use timestamp + client info to create unique member ID + timestamp := time.Now().UnixNano() + return fmt.Sprintf("%s-%s-%d", clientID, clientHost, timestamp) +} + +// ValidateSessionTimeout checks if session timeout is within acceptable range +func (gc *GroupCoordinator) ValidateSessionTimeout(timeout int32) bool { + return timeout >= gc.sessionTimeoutMin && timeout <= gc.sessionTimeoutMax +} + +// cleanupRoutine periodically cleans up dead groups and expired members +func (gc *GroupCoordinator) cleanupRoutine() { + for { + select { + case <-gc.cleanupTicker.C: + gc.performCleanup() + case <-gc.stopChan: + return + } + } +} + +// performCleanup removes expired members and empty groups +func (gc *GroupCoordinator) performCleanup() { + now := time.Now() + gc.groupsMu.Lock() + defer gc.groupsMu.Unlock() + + for groupID, group := range gc.groups { + group.Mu.Lock() + + // Check for expired members + expiredMembers := make([]string, 0) + for memberID, member := range group.Members { + sessionDuration := time.Duration(member.SessionTimeout) * time.Millisecond + if now.Sub(member.LastHeartbeat) > sessionDuration { + expiredMembers = append(expiredMembers, memberID) + } + } + + // Remove expired members + for _, memberID := range expiredMembers { + delete(group.Members, memberID) + if group.Leader == memberID { + group.Leader = "" + } + } + + // Update group state based on member count + if len(group.Members) == 0 { + if group.State != GroupStateEmpty { + group.State = GroupStateEmpty + group.Generation++ + } + + // Mark group for deletion if empty for too long (30 minutes) + if now.Sub(group.LastActivity) > 30*time.Minute { + group.State = GroupStateDead + } + } + + group.Mu.Unlock() + + // Remove dead groups + if group.State == GroupStateDead { + delete(gc.groups, groupID) + } + } +} + +// Close shuts down the group coordinator +func (gc *GroupCoordinator) Close() { + gc.stopOnce.Do(func() { + close(gc.stopChan) + if gc.cleanupTicker != nil { + gc.cleanupTicker.Stop() + } + }) +} + +// GetGroupStats returns statistics about the group coordinator +func (gc *GroupCoordinator) GetGroupStats() map[string]interface{} { + gc.groupsMu.RLock() + defer gc.groupsMu.RUnlock() + + stats := map[string]interface{}{ + "total_groups": len(gc.groups), + "group_states": make(map[string]int), + } + + stateCount := make(map[GroupState]int) + totalMembers := 0 + + for _, group := range gc.groups { + group.Mu.RLock() + stateCount[group.State]++ + totalMembers += len(group.Members) + group.Mu.RUnlock() + } + + stats["total_members"] = totalMembers + for state, count := range stateCount { + stats["group_states"].(map[string]int)[state.String()] = count + } + + return stats +} diff --git a/weed/mq/kafka/consumer/group_coordinator_test.go b/weed/mq/kafka/consumer/group_coordinator_test.go new file mode 100644 index 000000000..3e7c3d06c --- /dev/null +++ b/weed/mq/kafka/consumer/group_coordinator_test.go @@ -0,0 +1,219 @@ +package consumer + +import ( + "testing" + "time" +) + +func TestGroupCoordinator_CreateGroup(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + groupID := "test-group" + group := gc.GetOrCreateGroup(groupID) + + if group == nil { + t.Fatal("Expected group to be created") + } + + if group.ID != groupID { + t.Errorf("Expected group ID %s, got 
%s", groupID, group.ID) + } + + if group.State != GroupStateEmpty { + t.Errorf("Expected initial state to be Empty, got %s", group.State) + } + + if group.Generation != 0 { + t.Errorf("Expected initial generation to be 0, got %d", group.Generation) + } + + // Getting the same group should return the existing one + group2 := gc.GetOrCreateGroup(groupID) + if group2 != group { + t.Error("Expected to get the same group instance") + } +} + +func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + // Test valid timeouts + validTimeouts := []int32{6000, 30000, 300000} + for _, timeout := range validTimeouts { + if !gc.ValidateSessionTimeout(timeout) { + t.Errorf("Expected timeout %d to be valid", timeout) + } + } + + // Test invalid timeouts + invalidTimeouts := []int32{1000, 5000, 400000} + for _, timeout := range invalidTimeouts { + if gc.ValidateSessionTimeout(timeout) { + t.Errorf("Expected timeout %d to be invalid", timeout) + } + } +} + +func TestGroupCoordinator_MemberManagement(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + group := gc.GetOrCreateGroup("test-group") + + // Add members + member1 := &GroupMember{ + ID: "member1", + ClientID: "client1", + SessionTimeout: 30000, + Subscription: []string{"topic1", "topic2"}, + State: MemberStateStable, + LastHeartbeat: time.Now(), + } + + member2 := &GroupMember{ + ID: "member2", + ClientID: "client2", + SessionTimeout: 30000, + Subscription: []string{"topic1"}, + State: MemberStateStable, + LastHeartbeat: time.Now(), + } + + group.Mu.Lock() + group.Members[member1.ID] = member1 + group.Members[member2.ID] = member2 + group.Mu.Unlock() + + // Update subscriptions + group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"}) + + group.Mu.RLock() + updatedMember := group.Members["member1"] + expectedTopics := []string{"topic1", "topic3"} + if len(updatedMember.Subscription) != len(expectedTopics) { + t.Errorf("Expected %d subscribed topics, got %d", len(expectedTopics), len(updatedMember.Subscription)) + } + + // Check group subscribed topics + if len(group.SubscribedTopics) != 2 { // topic1, topic3 + t.Errorf("Expected 2 group subscribed topics, got %d", len(group.SubscribedTopics)) + } + group.Mu.RUnlock() +} + +func TestGroupCoordinator_Stats(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + // Create multiple groups in different states + group1 := gc.GetOrCreateGroup("group1") + group1.Mu.Lock() + group1.State = GroupStateStable + group1.Members["member1"] = &GroupMember{ID: "member1"} + group1.Members["member2"] = &GroupMember{ID: "member2"} + group1.Mu.Unlock() + + group2 := gc.GetOrCreateGroup("group2") + group2.Mu.Lock() + group2.State = GroupStatePreparingRebalance + group2.Members["member3"] = &GroupMember{ID: "member3"} + group2.Mu.Unlock() + + stats := gc.GetGroupStats() + + totalGroups := stats["total_groups"].(int) + if totalGroups != 2 { + t.Errorf("Expected 2 total groups, got %d", totalGroups) + } + + totalMembers := stats["total_members"].(int) + if totalMembers != 3 { + t.Errorf("Expected 3 total members, got %d", totalMembers) + } + + stateCount := stats["group_states"].(map[string]int) + if stateCount["Stable"] != 1 { + t.Errorf("Expected 1 stable group, got %d", stateCount["Stable"]) + } + + if stateCount["PreparingRebalance"] != 1 { + t.Errorf("Expected 1 preparing rebalance group, got %d", stateCount["PreparingRebalance"]) + } +} + +func TestGroupCoordinator_Cleanup(t *testing.T) { + gc := 
NewGroupCoordinator() + defer gc.Close() + + // Create a group with an expired member + group := gc.GetOrCreateGroup("test-group") + + expiredMember := &GroupMember{ + ID: "expired-member", + SessionTimeout: 1000, // 1 second + LastHeartbeat: time.Now().Add(-2 * time.Second), // 2 seconds ago + State: MemberStateStable, + } + + activeMember := &GroupMember{ + ID: "active-member", + SessionTimeout: 30000, // 30 seconds + LastHeartbeat: time.Now(), // just now + State: MemberStateStable, + } + + group.Mu.Lock() + group.Members[expiredMember.ID] = expiredMember + group.Members[activeMember.ID] = activeMember + group.Leader = expiredMember.ID // Make expired member the leader + group.Mu.Unlock() + + // Perform cleanup + gc.performCleanup() + + group.Mu.RLock() + defer group.Mu.RUnlock() + + // Expired member should be removed + if _, exists := group.Members[expiredMember.ID]; exists { + t.Error("Expected expired member to be removed") + } + + // Active member should remain + if _, exists := group.Members[activeMember.ID]; !exists { + t.Error("Expected active member to remain") + } + + // Leader should be reset since expired member was leader + if group.Leader == expiredMember.ID { + t.Error("Expected leader to be reset after expired member removal") + } +} + +func TestGroupCoordinator_GenerateMemberID(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + // Generate member IDs with small delay to ensure different timestamps + id1 := gc.GenerateMemberID("client1", "host1") + time.Sleep(1 * time.Nanosecond) // Ensure different timestamp + id2 := gc.GenerateMemberID("client1", "host1") + time.Sleep(1 * time.Nanosecond) // Ensure different timestamp + id3 := gc.GenerateMemberID("client2", "host1") + + // IDs should be unique + if id1 == id2 { + t.Errorf("Expected different member IDs for same client: %s vs %s", id1, id2) + } + + if id1 == id3 || id2 == id3 { + t.Errorf("Expected different member IDs for different clients: %s, %s, %s", id1, id2, id3) + } + + // IDs should contain client and host info + if len(id1) < 10 { // Should be longer than just timestamp + t.Errorf("Expected member ID to contain client and host info, got: %s", id1) + } +} diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index b379d0d78..91a31a005 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -10,9 +10,9 @@ import ( ) type Options struct { - Listen string - AgentAddress string // Optional: SeaweedMQ Agent address for production mode - UseSeaweedMQ bool // Use SeaweedMQ backend instead of in-memory stub + Listen string + AgentAddress string // Optional: SeaweedMQ Agent address for production mode + UseSeaweedMQ bool // Use SeaweedMQ backend instead of in-memory stub } type Server struct { @@ -26,7 +26,7 @@ type Server struct { func NewServer(opts Options) *Server { ctx, cancel := context.WithCancel(context.Background()) - + var handler *protocol.Handler if opts.UseSeaweedMQ && opts.AgentAddress != "" { // Try to create SeaweedMQ handler @@ -43,7 +43,7 @@ func NewServer(opts Options) *Server { handler = protocol.NewHandler() glog.V(1).Infof("Created Kafka gateway with in-memory backend") } - + return &Server{ opts: opts, ctx: ctx, @@ -94,14 +94,14 @@ func (s *Server) Close() error { _ = s.ln.Close() } s.wg.Wait() - + // Close the handler (important for SeaweedMQ mode) if s.handler != nil { if err := s.handler.Close(); err != nil { glog.Warningf("Error closing handler: %v", err) } } - + return nil } diff --git 
a/weed/mq/kafka/integration/agent_client.go b/weed/mq/kafka/integration/agent_client.go index 642a29310..c972d0d62 100644 --- a/weed/mq/kafka/integration/agent_client.go +++ b/weed/mq/kafka/integration/agent_client.go @@ -19,15 +19,15 @@ type AgentClient struct { agentAddress string conn *grpc.ClientConn client mq_agent_pb.SeaweedMessagingAgentClient - + // Publisher sessions: topic-partition -> session info publishersLock sync.RWMutex publishers map[string]*PublisherSession - + // Subscriber sessions for offset tracking subscribersLock sync.RWMutex subscribers map[string]*SubscriberSession - + ctx context.Context cancel context.CancelFunc } @@ -44,17 +44,17 @@ type PublisherSession struct { // SubscriberSession tracks a subscription for offset management type SubscriberSession struct { - Topic string - Partition int32 - Stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordClient + Topic string + Partition int32 + Stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordClient OffsetLedger *offset.Ledger // Still use for Kafka offset translation } // NewAgentClient creates a new SeaweedMQ Agent client func NewAgentClient(agentAddress string) (*AgentClient, error) { ctx, cancel := context.WithCancel(context.Background()) - - conn, err := grpc.DialContext(ctx, agentAddress, + + conn, err := grpc.DialContext(ctx, agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), // Don't block - fail fast for invalid addresses ) @@ -62,9 +62,9 @@ func NewAgentClient(agentAddress string) (*AgentClient, error) { cancel() return nil, fmt.Errorf("failed to connect to agent %s: %v", agentAddress, err) } - + client := mq_agent_pb.NewSeaweedMessagingAgentClient(conn) - + return &AgentClient{ agentAddress: agentAddress, conn: conn, @@ -79,7 +79,7 @@ func NewAgentClient(agentAddress string) (*AgentClient, error) { // Close shuts down the agent client and all sessions func (ac *AgentClient) Close() error { ac.cancel() - + // Close all publisher sessions ac.publishersLock.Lock() for key, session := range ac.publishers { @@ -87,7 +87,7 @@ func (ac *AgentClient) Close() error { delete(ac.publishers, key) } ac.publishersLock.Unlock() - + // Close all subscriber sessions ac.subscribersLock.Lock() for key, session := range ac.subscribers { @@ -97,14 +97,14 @@ func (ac *AgentClient) Close() error { delete(ac.subscribers, key) } ac.subscribersLock.Unlock() - + return ac.conn.Close() } // GetOrCreatePublisher gets or creates a publisher session for a topic-partition func (ac *AgentClient) GetOrCreatePublisher(topic string, partition int32) (*PublisherSession, error) { key := fmt.Sprintf("%s-%d", topic, partition) - + // Try to get existing publisher ac.publishersLock.RLock() if session, exists := ac.publishers[key]; exists { @@ -112,22 +112,22 @@ func (ac *AgentClient) GetOrCreatePublisher(topic string, partition int32) (*Pub return session, nil } ac.publishersLock.RUnlock() - + // Create new publisher session ac.publishersLock.Lock() defer ac.publishersLock.Unlock() - + // Double-check after acquiring write lock if session, exists := ac.publishers[key]; exists { return session, nil } - + // Create the session session, err := ac.createPublishSession(topic, partition) if err != nil { return nil, err } - + ac.publishers[key] = session return session, nil } @@ -166,7 +166,7 @@ func (ac *AgentClient) createPublishSession(topic string, partition int32) (*Pub }, }, } - + // Start publish session startReq := &mq_agent_pb.StartPublishSessionRequest{ Topic: &schema_pb.Topic{ @@ -177,22 +177,22 @@ func (ac 
*AgentClient) createPublishSession(topic string, partition int32) (*Pub RecordType: recordType, PublisherName: "kafka-gateway", } - + startResp, err := ac.client.StartPublishSession(ac.ctx, startReq) if err != nil { return nil, fmt.Errorf("failed to start publish session: %v", err) } - + if startResp.Error != "" { return nil, fmt.Errorf("publish session error: %s", startResp.Error) } - + // Create streaming connection stream, err := ac.client.PublishRecord(ac.ctx) if err != nil { return nil, fmt.Errorf("failed to create publish stream: %v", err) } - + session := &PublisherSession{ SessionID: startResp.SessionId, Topic: topic, @@ -200,7 +200,7 @@ func (ac *AgentClient) createPublishSession(topic string, partition int32) (*Pub Stream: stream, RecordType: recordType, } - + return session, nil } @@ -210,7 +210,7 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte, if err != nil { return 0, err } - + // Convert to SeaweedMQ record format record := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ @@ -230,28 +230,28 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte, }, }, } - + // Send publish request req := &mq_agent_pb.PublishRecordRequest{ SessionId: session.SessionID, Key: key, Value: record, } - + if err := session.Stream.Send(req); err != nil { return 0, fmt.Errorf("failed to send record: %v", err) } - + // Read acknowledgment (this is a streaming API, so we should read the response) resp, err := session.Stream.Recv() if err != nil { return 0, fmt.Errorf("failed to receive ack: %v", err) } - + if resp.Error != "" { return 0, fmt.Errorf("publish error: %s", resp.Error) } - + session.LastSequence = resp.AckSequence return resp.AckSequence, nil } @@ -259,27 +259,27 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte, // GetOrCreateSubscriber gets or creates a subscriber for offset tracking func (ac *AgentClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64) (*SubscriberSession, error) { key := fmt.Sprintf("%s-%d", topic, partition) - + ac.subscribersLock.RLock() if session, exists := ac.subscribers[key]; exists { ac.subscribersLock.RUnlock() return session, nil } ac.subscribersLock.RUnlock() - + // Create new subscriber session ac.subscribersLock.Lock() defer ac.subscribersLock.Unlock() - + if session, exists := ac.subscribers[key]; exists { return session, nil } - + session, err := ac.createSubscribeSession(topic, partition, startOffset) if err != nil { return nil, err } - + ac.subscribers[key] = session return session, nil } @@ -290,7 +290,7 @@ func (ac *AgentClient) createSubscribeSession(topic string, partition int32, sta if err != nil { return nil, fmt.Errorf("failed to create subscribe stream: %v", err) } - + // Send initial subscribe request initReq := &mq_agent_pb.SubscribeRecordRequest{ Init: &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{ @@ -310,38 +310,38 @@ func (ac *AgentClient) createSubscribeSession(topic string, partition int32, sta StartTsNs: startOffset, // Use offset as timestamp for now }, }, - OffsetType: schema_pb.OffsetType_EXACT_TS_NS, - MaxSubscribedPartitions: 1, - SlidingWindowSize: 10, + OffsetType: schema_pb.OffsetType_EXACT_TS_NS, + MaxSubscribedPartitions: 1, + SlidingWindowSize: 10, }, } - + if err := stream.Send(initReq); err != nil { return nil, fmt.Errorf("failed to send subscribe init: %v", err) } - + session := &SubscriberSession{ - Topic: topic, - Partition: partition, - Stream: stream, + Topic: topic, + 
Partition: partition, + Stream: stream, OffsetLedger: offset.NewLedger(), // Keep Kafka offset tracking } - + return session, nil } // ClosePublisher closes a specific publisher session func (ac *AgentClient) ClosePublisher(topic string, partition int32) error { key := fmt.Sprintf("%s-%d", topic, partition) - + ac.publishersLock.Lock() defer ac.publishersLock.Unlock() - + session, exists := ac.publishers[key] if !exists { return nil // Already closed or never existed } - + err := ac.closePublishSessionLocked(session.SessionID) delete(ac.publishers, key) return err @@ -352,7 +352,7 @@ func (ac *AgentClient) closePublishSessionLocked(sessionID int64) error { closeReq := &mq_agent_pb.ClosePublishSessionRequest{ SessionId: sessionID, } - + _, err := ac.client.ClosePublishSession(ac.ctx, closeReq) return err } @@ -362,7 +362,7 @@ func (ac *AgentClient) HealthCheck() error { // Create a timeout context for health check ctx, cancel := context.WithTimeout(ac.ctx, 2*time.Second) defer cancel() - + // Try to start and immediately close a dummy session req := &mq_agent_pb.StartPublishSessionRequest{ Topic: &schema_pb.Topic{ @@ -383,21 +383,21 @@ func (ac *AgentClient) HealthCheck() error { }, PublisherName: "health-check", } - + resp, err := ac.client.StartPublishSession(ctx, req) if err != nil { return fmt.Errorf("health check failed: %v", err) } - + if resp.Error != "" { return fmt.Errorf("health check error: %s", resp.Error) } - + // Close the health check session closeReq := &mq_agent_pb.ClosePublishSessionRequest{ SessionId: resp.SessionId, } _, _ = ac.client.ClosePublishSession(ctx, closeReq) - + return nil } diff --git a/weed/mq/kafka/integration/agent_client_test.go b/weed/mq/kafka/integration/agent_client_test.go index 232b374a3..06e9334a5 100644 --- a/weed/mq/kafka/integration/agent_client_test.go +++ b/weed/mq/kafka/integration/agent_client_test.go @@ -9,122 +9,122 @@ import ( func TestAgentClient_Creation(t *testing.T) { // Skip if no real agent available (would need real SeaweedMQ setup) t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + client, err := NewAgentClient("localhost:17777") // default agent port if err != nil { t.Fatalf("Failed to create agent client: %v", err) } defer client.Close() - + // Test health check err = client.HealthCheck() if err != nil { t.Fatalf("Health check failed: %v", err) } - + t.Logf("Agent client created and health check passed") } // TestAgentClient_PublishRecord tests publishing records func TestAgentClient_PublishRecord(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + client, err := NewAgentClient("localhost:17777") if err != nil { t.Fatalf("Failed to create agent client: %v", err) } defer client.Close() - + // Test publishing a record key := []byte("test-key") value := []byte("test-value") timestamp := time.Now().UnixNano() - + sequence, err := client.PublishRecord("test-topic", 0, key, value, timestamp) if err != nil { t.Fatalf("Failed to publish record: %v", err) } - + if sequence < 0 { t.Errorf("Invalid sequence: %d", sequence) } - + t.Logf("Published record with sequence: %d", sequence) } // TestAgentClient_SessionManagement tests publisher session lifecycle func TestAgentClient_SessionManagement(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + client, err := NewAgentClient("localhost:17777") if err != nil { t.Fatalf("Failed to create agent client: %v", err) } defer 
client.Close() - + // Create publisher session session, err := client.GetOrCreatePublisher("session-test-topic", 0) if err != nil { t.Fatalf("Failed to create publisher: %v", err) } - + if session.SessionID == 0 { t.Errorf("Invalid session ID: %d", session.SessionID) } - + if session.Topic != "session-test-topic" { t.Errorf("Topic mismatch: got %s, want session-test-topic", session.Topic) } - + if session.Partition != 0 { t.Errorf("Partition mismatch: got %d, want 0", session.Partition) } - + // Close the publisher err = client.ClosePublisher("session-test-topic", 0) if err != nil { t.Errorf("Failed to close publisher: %v", err) } - + t.Logf("Publisher session managed successfully") } // TestAgentClient_ConcurrentPublish tests concurrent publishing func TestAgentClient_ConcurrentPublish(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + client, err := NewAgentClient("localhost:17777") if err != nil { t.Fatalf("Failed to create agent client: %v", err) } defer client.Close() - + // Publish multiple records concurrently numRecords := 10 errors := make(chan error, numRecords) sequences := make(chan int64, numRecords) - + for i := 0; i < numRecords; i++ { go func(index int) { key := []byte("concurrent-key") value := []byte("concurrent-value-" + string(rune(index))) timestamp := time.Now().UnixNano() - + sequence, err := client.PublishRecord("concurrent-test-topic", 0, key, value, timestamp) if err != nil { errors <- err return } - + sequences <- sequence errors <- nil }(i) } - + // Collect results successCount := 0 var lastSequence int64 = -1 - + for i := 0; i < numRecords; i++ { err := <-errors if err != nil { @@ -137,11 +137,11 @@ func TestAgentClient_ConcurrentPublish(t *testing.T) { successCount++ } } - + if successCount < numRecords { t.Errorf("Only %d/%d publishes succeeded", successCount, numRecords) } - - t.Logf("Concurrent publish test: %d/%d successful, last sequence: %d", + + t.Logf("Concurrent publish test: %d/%d successful, last sequence: %d", successCount, numRecords, lastSequence) } diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index e0b13f692..80507fc94 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -13,11 +13,11 @@ import ( // SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage type SeaweedMQHandler struct { agentClient *AgentClient - + // Topic registry - still keep track of Kafka topics topicsMu sync.RWMutex topics map[string]*KafkaTopicInfo - + // Offset ledgers for Kafka offset translation ledgersMu sync.RWMutex ledgers map[TopicPartitionKey]*offset.Ledger @@ -28,7 +28,7 @@ type KafkaTopicInfo struct { Name string Partitions int32 CreatedAt int64 - + // SeaweedMQ integration SeaweedTopic *schema_pb.Topic } @@ -45,13 +45,13 @@ func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) { if err != nil { return nil, fmt.Errorf("failed to create agent client: %v", err) } - + // Test the connection if err := agentClient.HealthCheck(); err != nil { agentClient.Close() return nil, fmt.Errorf("agent health check failed: %v", err) } - + return &SeaweedMQHandler{ agentClient: agentClient, topics: make(map[string]*KafkaTopicInfo), @@ -68,18 +68,18 @@ func (h *SeaweedMQHandler) Close() error { func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { h.topicsMu.Lock() defer h.topicsMu.Unlock() - + // Check if topic already exists if 
_, exists := h.topics[name]; exists { return fmt.Errorf("topic %s already exists", name) } - + // Create SeaweedMQ topic reference seaweedTopic := &schema_pb.Topic{ Namespace: "kafka", Name: name, } - + // Create Kafka topic info topicInfo := &KafkaTopicInfo{ Name: name, @@ -87,10 +87,10 @@ func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { CreatedAt: time.Now().UnixNano(), SeaweedTopic: seaweedTopic, } - + // Store in registry h.topics[name] = topicInfo - + // Initialize offset ledgers for all partitions for partitionID := int32(0); partitionID < partitions; partitionID++ { key := TopicPartitionKey{Topic: name, Partition: partitionID} @@ -98,7 +98,7 @@ func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { h.ledgers[key] = offset.NewLedger() h.ledgersMu.Unlock() } - + return nil } @@ -106,20 +106,20 @@ func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { func (h *SeaweedMQHandler) DeleteTopic(name string) error { h.topicsMu.Lock() defer h.topicsMu.Unlock() - + topicInfo, exists := h.topics[name] if !exists { return fmt.Errorf("topic %s does not exist", name) } - + // Close all publisher sessions for this topic for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { h.agentClient.ClosePublisher(name, partitionID) } - + // Remove from registry delete(h.topics, name) - + // Clean up offset ledgers h.ledgersMu.Lock() for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { @@ -127,7 +127,7 @@ func (h *SeaweedMQHandler) DeleteTopic(name string) error { delete(h.ledgers, key) } h.ledgersMu.Unlock() - + return nil } @@ -135,7 +135,7 @@ func (h *SeaweedMQHandler) DeleteTopic(name string) error { func (h *SeaweedMQHandler) TopicExists(name string) bool { h.topicsMu.RLock() defer h.topicsMu.RUnlock() - + _, exists := h.topics[name] return exists } @@ -144,7 +144,7 @@ func (h *SeaweedMQHandler) TopicExists(name string) bool { func (h *SeaweedMQHandler) GetTopicInfo(name string) (*KafkaTopicInfo, bool) { h.topicsMu.RLock() defer h.topicsMu.RUnlock() - + info, exists := h.topics[name] return info, exists } @@ -153,7 +153,7 @@ func (h *SeaweedMQHandler) GetTopicInfo(name string) (*KafkaTopicInfo, bool) { func (h *SeaweedMQHandler) ListTopics() []string { h.topicsMu.RLock() defer h.topicsMu.RUnlock() - + topics := make([]string, 0, len(h.topics)) for name := range h.topics { topics = append(topics, name) @@ -167,51 +167,51 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by if !h.TopicExists(topic) { return 0, fmt.Errorf("topic %s does not exist", topic) } - + // Get current timestamp timestamp := time.Now().UnixNano() - + // Publish to SeaweedMQ _, err := h.agentClient.PublishRecord(topic, partition, key, value, timestamp) if err != nil { return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", err) } - + // Update Kafka offset ledger ledger := h.GetOrCreateLedger(topic, partition) kafkaOffset := ledger.AssignOffsets(1) // Assign one Kafka offset - + // Map SeaweedMQ sequence to Kafka offset if err := ledger.AppendRecord(kafkaOffset, timestamp, int32(len(value))); err != nil { // Log the error but don't fail the produce operation fmt.Printf("Warning: failed to update offset ledger: %v\n", err) } - + return kafkaOffset, nil } // GetOrCreateLedger returns the offset ledger for a topic-partition func (h *SeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { key := TopicPartitionKey{Topic: topic, Partition: partition} - + 
// Try to get existing ledger h.ledgersMu.RLock() ledger, exists := h.ledgers[key] h.ledgersMu.RUnlock() - + if exists { return ledger } - + // Create new ledger h.ledgersMu.Lock() defer h.ledgersMu.Unlock() - + // Double-check after acquiring write lock if ledger, exists := h.ledgers[key]; exists { return ledger } - + // Create and store new ledger ledger = offset.NewLedger() h.ledgers[key] = ledger @@ -221,10 +221,10 @@ func (h *SeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *off // GetLedger returns the offset ledger for a topic-partition, or nil if not found func (h *SeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger { key := TopicPartitionKey{Topic: topic, Partition: partition} - + h.ledgersMu.RLock() defer h.ledgersMu.RUnlock() - + return h.ledgers[key] } @@ -234,20 +234,20 @@ func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffs if !h.TopicExists(topic) { return nil, fmt.Errorf("topic %s does not exist", topic) } - + ledger := h.GetLedger(topic, partition) if ledger == nil { // No messages yet, return empty record batch return []byte{}, nil } - + highWaterMark := ledger.GetHighWaterMark() - + // If fetch offset is at or beyond high water mark, no records to return if fetchOffset >= highWaterMark { return []byte{}, nil } - + // For Phase 2, we'll construct a simplified record batch // In a full implementation, this would read from SeaweedMQ subscriber return h.constructKafkaRecordBatch(ledger, fetchOffset, highWaterMark, maxBytes) @@ -259,61 +259,61 @@ func (h *SeaweedMQHandler) constructKafkaRecordBatch(ledger *offset.Ledger, fetc if recordsToFetch <= 0 { return []byte{}, nil } - + // Limit records to prevent overly large batches if recordsToFetch > 100 { recordsToFetch = 100 } - + // For Phase 2, create a stub record batch with placeholder data // This represents what would come from SeaweedMQ subscriber batch := make([]byte, 0, 512) - + // Record batch header baseOffsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(baseOffsetBytes, uint64(fetchOffset)) batch = append(batch, baseOffsetBytes...) // base offset - + // Batch length (placeholder, will be filled at end) batchLengthPos := len(batch) batch = append(batch, 0, 0, 0, 0) - + batch = append(batch, 0, 0, 0, 0) // partition leader epoch batch = append(batch, 2) // magic byte (version 2) - + // CRC placeholder batch = append(batch, 0, 0, 0, 0) - + // Batch attributes batch = append(batch, 0, 0) - + // Last offset delta lastOffsetDelta := uint32(recordsToFetch - 1) lastOffsetDeltaBytes := make([]byte, 4) binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta) batch = append(batch, lastOffsetDeltaBytes...) - + // Timestamps currentTime := time.Now().UnixNano() firstTimestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(firstTimestampBytes, uint64(currentTime)) batch = append(batch, firstTimestampBytes...) - + maxTimestamp := currentTime + recordsToFetch*1000000 // 1ms apart maxTimestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp)) batch = append(batch, maxTimestampBytes...) - + // Producer info (simplified) batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // producer ID (-1) batch = append(batch, 0xFF, 0xFF) // producer epoch (-1) batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) // base sequence (-1) - + // Record count recordCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(recordCountBytes, uint32(recordsToFetch)) batch = append(batch, recordCountBytes...) 
- + // Add simple records (placeholders representing SeaweedMQ data) for i := int64(0); i < recordsToFetch; i++ { record := h.constructSingleRecord(i, fetchOffset+i) @@ -321,37 +321,37 @@ func (h *SeaweedMQHandler) constructKafkaRecordBatch(ledger *offset.Ledger, fetc batch = append(batch, recordLength) batch = append(batch, record...) } - + // Fill in the batch length batchLength := uint32(len(batch) - batchLengthPos - 4) binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength) - + return batch, nil } // constructSingleRecord creates a single Kafka record func (h *SeaweedMQHandler) constructSingleRecord(index, offset int64) []byte { record := make([]byte, 0, 64) - + // Record attributes record = append(record, 0) - + // Timestamp delta (varint - simplified) record = append(record, byte(index)) - + // Offset delta (varint - simplified) record = append(record, byte(index)) - + // Key length (-1 = null key) record = append(record, 0xFF) - + // Value (represents data that would come from SeaweedMQ) value := fmt.Sprintf("seaweedmq-message-%d", offset) record = append(record, byte(len(value))) record = append(record, []byte(value)...) - + // Headers count (0) record = append(record, 0) - + return record } diff --git a/weed/mq/kafka/integration/seaweedmq_handler_test.go b/weed/mq/kafka/integration/seaweedmq_handler_test.go index dad3582c2..64b6e9148 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler_test.go +++ b/weed/mq/kafka/integration/seaweedmq_handler_test.go @@ -9,261 +9,261 @@ import ( func TestSeaweedMQHandler_Creation(t *testing.T) { // Skip if no real agent available t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + handler, err := NewSeaweedMQHandler("localhost:17777") if err != nil { t.Fatalf("Failed to create SeaweedMQ handler: %v", err) } defer handler.Close() - + // Test basic operations topics := handler.ListTopics() if topics == nil { t.Errorf("ListTopics returned nil") } - + t.Logf("SeaweedMQ handler created successfully, found %d existing topics", len(topics)) } // TestSeaweedMQHandler_TopicLifecycle tests topic creation and deletion func TestSeaweedMQHandler_TopicLifecycle(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + handler, err := NewSeaweedMQHandler("localhost:17777") if err != nil { t.Fatalf("Failed to create SeaweedMQ handler: %v", err) } defer handler.Close() - + topicName := "lifecycle-test-topic" - + // Initially should not exist if handler.TopicExists(topicName) { t.Errorf("Topic %s should not exist initially", topicName) } - + // Create the topic err = handler.CreateTopic(topicName, 1) if err != nil { t.Fatalf("Failed to create topic: %v", err) } - + // Now should exist if !handler.TopicExists(topicName) { t.Errorf("Topic %s should exist after creation", topicName) } - + // Get topic info info, exists := handler.GetTopicInfo(topicName) if !exists { t.Errorf("Topic info should exist") } - + if info.Name != topicName { t.Errorf("Topic name mismatch: got %s, want %s", info.Name, topicName) } - + if info.Partitions != 1 { t.Errorf("Partition count mismatch: got %d, want 1", info.Partitions) } - + // Try to create again (should fail) err = handler.CreateTopic(topicName, 1) if err == nil { t.Errorf("Creating existing topic should fail") } - + // Delete the topic err = handler.DeleteTopic(topicName) if err != nil { t.Fatalf("Failed to delete topic: %v", err) } - + // Should no longer exist if handler.TopicExists(topicName) { 
t.Errorf("Topic %s should not exist after deletion", topicName) } - + t.Logf("Topic lifecycle test completed successfully") } // TestSeaweedMQHandler_ProduceRecord tests message production func TestSeaweedMQHandler_ProduceRecord(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + handler, err := NewSeaweedMQHandler("localhost:17777") if err != nil { t.Fatalf("Failed to create SeaweedMQ handler: %v", err) } defer handler.Close() - + topicName := "produce-test-topic" - + // Create topic err = handler.CreateTopic(topicName, 1) if err != nil { t.Fatalf("Failed to create topic: %v", err) } defer handler.DeleteTopic(topicName) - + // Produce a record key := []byte("produce-key") value := []byte("produce-value") - + offset, err := handler.ProduceRecord(topicName, 0, key, value) if err != nil { t.Fatalf("Failed to produce record: %v", err) } - + if offset < 0 { t.Errorf("Invalid offset: %d", offset) } - + // Check ledger was updated ledger := handler.GetLedger(topicName, 0) if ledger == nil { t.Errorf("Ledger should exist after producing") } - + hwm := ledger.GetHighWaterMark() if hwm != offset+1 { t.Errorf("High water mark mismatch: got %d, want %d", hwm, offset+1) } - + t.Logf("Produced record at offset %d, HWM: %d", offset, hwm) } // TestSeaweedMQHandler_MultiplePartitions tests multiple partition handling func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + handler, err := NewSeaweedMQHandler("localhost:17777") if err != nil { t.Fatalf("Failed to create SeaweedMQ handler: %v", err) } defer handler.Close() - + topicName := "multi-partition-test-topic" numPartitions := int32(3) - + // Create topic with multiple partitions err = handler.CreateTopic(topicName, numPartitions) if err != nil { t.Fatalf("Failed to create topic: %v", err) } defer handler.DeleteTopic(topicName) - + // Produce to different partitions for partitionID := int32(0); partitionID < numPartitions; partitionID++ { key := []byte("partition-key") value := []byte("partition-value") - + offset, err := handler.ProduceRecord(topicName, partitionID, key, value) if err != nil { t.Fatalf("Failed to produce to partition %d: %v", partitionID, err) } - + // Verify ledger ledger := handler.GetLedger(topicName, partitionID) if ledger == nil { t.Errorf("Ledger should exist for partition %d", partitionID) } - + t.Logf("Partition %d: produced at offset %d", partitionID, offset) } - + t.Logf("Multi-partition test completed successfully") } // TestSeaweedMQHandler_FetchRecords tests record fetching func TestSeaweedMQHandler_FetchRecords(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + handler, err := NewSeaweedMQHandler("localhost:17777") if err != nil { t.Fatalf("Failed to create SeaweedMQ handler: %v", err) } defer handler.Close() - + topicName := "fetch-test-topic" - + // Create topic err = handler.CreateTopic(topicName, 1) if err != nil { t.Fatalf("Failed to create topic: %v", err) } defer handler.DeleteTopic(topicName) - + // Produce some records numRecords := 3 for i := 0; i < numRecords; i++ { key := []byte("fetch-key") value := []byte("fetch-value-" + string(rune(i))) - + _, err := handler.ProduceRecord(topicName, 0, key, value) if err != nil { t.Fatalf("Failed to produce record %d: %v", i, err) } } - + // Wait a bit for records to be available time.Sleep(100 * time.Millisecond) - + // Fetch records records, err 
:= handler.FetchRecords(topicName, 0, 0, 1024) if err != nil { t.Fatalf("Failed to fetch records: %v", err) } - + if len(records) == 0 { t.Errorf("No records fetched") } - + t.Logf("Fetched %d bytes of record data", len(records)) - + // Test fetching beyond high water mark ledger := handler.GetLedger(topicName, 0) hwm := ledger.GetHighWaterMark() - + emptyRecords, err := handler.FetchRecords(topicName, 0, hwm, 1024) if err != nil { t.Fatalf("Failed to fetch from HWM: %v", err) } - + if len(emptyRecords) != 0 { t.Errorf("Should get empty records beyond HWM, got %d bytes", len(emptyRecords)) } - + t.Logf("Fetch test completed successfully") } // TestSeaweedMQHandler_ErrorHandling tests error conditions func TestSeaweedMQHandler_ErrorHandling(t *testing.T) { t.Skip("Integration test requires real SeaweedMQ Agent - run manually with agent available") - + handler, err := NewSeaweedMQHandler("localhost:17777") if err != nil { t.Fatalf("Failed to create SeaweedMQ handler: %v", err) } defer handler.Close() - + // Try to produce to non-existent topic _, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value")) if err == nil { t.Errorf("Producing to non-existent topic should fail") } - + // Try to fetch from non-existent topic _, err = handler.FetchRecords("non-existent-topic", 0, 0, 1024) if err == nil { t.Errorf("Fetching from non-existent topic should fail") } - + // Try to delete non-existent topic err = handler.DeleteTopic("non-existent-topic") if err == nil { t.Errorf("Deleting non-existent topic should fail") } - + t.Logf("Error handling test completed successfully") } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 9d9a37f6b..48973ada5 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" ) @@ -38,14 +39,18 @@ type Handler struct { // SeaweedMQ integration (optional, for production use) seaweedMQHandler *integration.SeaweedMQHandler useSeaweedMQ bool + + // Consumer group coordination + groupCoordinator *consumer.GroupCoordinator } // NewHandler creates a new handler in legacy in-memory mode func NewHandler() *Handler { return &Handler{ - topics: make(map[string]*TopicInfo), - ledgers: make(map[TopicPartitionKey]*offset.Ledger), - useSeaweedMQ: false, + topics: make(map[string]*TopicInfo), + ledgers: make(map[TopicPartitionKey]*offset.Ledger), + useSeaweedMQ: false, + groupCoordinator: consumer.NewGroupCoordinator(), } } @@ -55,17 +60,24 @@ func NewSeaweedMQHandler(agentAddress string) (*Handler, error) { if err != nil { return nil, err } - + return &Handler{ - topics: make(map[string]*TopicInfo), // Keep for compatibility + topics: make(map[string]*TopicInfo), // Keep for compatibility ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility seaweedMQHandler: smqHandler, useSeaweedMQ: true, + groupCoordinator: consumer.NewGroupCoordinator(), }, nil } // Close shuts down the handler and all connections func (h *Handler) Close() error { + // Close group coordinator + if h.groupCoordinator != nil { + h.groupCoordinator.Close() + } + + // Close SeaweedMQ handler if present if h.useSeaweedMQ && h.seaweedMQHandler != nil { return h.seaweedMQHandler.Close() } @@ -167,6 +179,10 @@ func (h *Handler) HandleConn(conn net.Conn) error { response, err = 
h.handleProduce(correlationID, messageBuf[8:]) // skip header case 1: // Fetch response, err = h.handleFetch(correlationID, messageBuf[8:]) // skip header + case 11: // JoinGroup + response, err = h.handleJoinGroup(correlationID, messageBuf[8:]) // skip header + case 14: // SyncGroup + response, err = h.handleSyncGroup(correlationID, messageBuf[8:]) // skip header default: err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion) } @@ -207,7 +223,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // Number of API keys (compact array format in newer versions, but using basic format for simplicity) - response = append(response, 0, 0, 0, 7) // 7 API keys + response = append(response, 0, 0, 0, 9) // 9 API keys // API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 18) // API key 18 @@ -244,6 +260,16 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 11) // max version 11 + // API Key 11 (JoinGroup): api_key(2) + min_version(2) + max_version(2) + response = append(response, 0, 11) // API key 11 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 7) // max version 7 + + // API Key 14 (SyncGroup): api_key(2) + min_version(2) + max_version(2) + response = append(response, 0, 14) // API key 14 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 5) // max version 5 + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) diff --git a/weed/mq/kafka/protocol/handler_test.go b/weed/mq/kafka/protocol/handler_test.go index 1043e6090..a4dacc122 100644 --- a/weed/mq/kafka/protocol/handler_test.go +++ b/weed/mq/kafka/protocol/handler_test.go @@ -92,8 +92,8 @@ func TestHandler_ApiVersions(t *testing.T) { // Check number of API keys numAPIKeys := binary.BigEndian.Uint32(respBuf[6:10]) - if numAPIKeys != 7 { - t.Errorf("expected 7 API keys, got: %d", numAPIKeys) + if numAPIKeys != 9 { + t.Errorf("expected 9 API keys, got: %d", numAPIKeys) } // Check API key details: api_key(2) + min_version(2) + max_version(2) @@ -229,7 +229,7 @@ func TestHandler_handleApiVersions(t *testing.T) { t.Fatalf("handleApiVersions: %v", err) } - if len(response) < 54 { // minimum expected size (now has 7 API keys) + if len(response) < 66 { // minimum expected size (now has 9 API keys) t.Fatalf("response too short: %d bytes", len(response)) } @@ -247,8 +247,8 @@ func TestHandler_handleApiVersions(t *testing.T) { // Check number of API keys numAPIKeys := binary.BigEndian.Uint32(response[6:10]) - if numAPIKeys != 7 { - t.Errorf("expected 7 API keys, got: %d", numAPIKeys) + if numAPIKeys != 9 { + t.Errorf("expected 9 API keys, got: %d", numAPIKeys) } // Check first API key (ApiVersions) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go new file mode 100644 index 000000000..03f1d70ff --- /dev/null +++ b/weed/mq/kafka/protocol/joingroup.go @@ -0,0 +1,626 @@ +package protocol + +import ( + "encoding/binary" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" +) + +// JoinGroup API (key 11) - Consumer group protocol +// Handles consumer joining a consumer group and initial coordination + +// JoinGroupRequest represents a JoinGroup request from a Kafka client +type JoinGroupRequest struct { + GroupID string + SessionTimeout int32 + 
RebalanceTimeout int32 + MemberID string // Empty for new members + GroupInstanceID string // Optional static membership + ProtocolType string // "consumer" for regular consumers + GroupProtocols []GroupProtocol +} + +// GroupProtocol represents a supported assignment protocol +type GroupProtocol struct { + Name string + Metadata []byte +} + +// JoinGroupResponse represents a JoinGroup response to a Kafka client +type JoinGroupResponse struct { + CorrelationID uint32 + ErrorCode int16 + GenerationID int32 + GroupProtocol string + GroupLeader string + MemberID string + Members []JoinGroupMember // Only populated for group leader +} + +// JoinGroupMember represents member info sent to group leader +type JoinGroupMember struct { + MemberID string + GroupInstanceID string + Metadata []byte +} + +// Error codes for JoinGroup +const ( + ErrorCodeNone int16 = 0 + ErrorCodeInvalidGroupID int16 = 24 + ErrorCodeUnknownMemberID int16 = 25 + ErrorCodeInvalidSessionTimeout int16 = 26 + ErrorCodeRebalanceInProgress int16 = 27 + ErrorCodeMemberIDRequired int16 = 79 + ErrorCodeFencedInstanceID int16 = 82 +) + +func (h *Handler) handleJoinGroup(correlationID uint32, requestBody []byte) ([]byte, error) { + // Parse JoinGroup request + request, err := h.parseJoinGroupRequest(requestBody) + if err != nil { + return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + // Validate request + if request.GroupID == "" { + return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + if !h.groupCoordinator.ValidateSessionTimeout(request.SessionTimeout) { + return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidSessionTimeout), nil + } + + // Get or create consumer group + group := h.groupCoordinator.GetOrCreateGroup(request.GroupID) + + group.Mu.Lock() + defer group.Mu.Unlock() + + // Update group's last activity + group.LastActivity = time.Now() + + // Handle member ID logic + var memberID string + var isNewMember bool + + if request.MemberID == "" { + // New member - generate ID + memberID = h.groupCoordinator.GenerateMemberID(request.GroupInstanceID, "unknown-host") + isNewMember = true + } else { + memberID = request.MemberID + // Check if member exists + if _, exists := group.Members[memberID]; !exists { + // Member ID provided but doesn't exist - reject + return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil + } + } + + // Check group state + switch group.State { + case consumer.GroupStateEmpty, consumer.GroupStateStable: + // Can join or trigger rebalance + if isNewMember || len(group.Members) == 0 { + group.State = consumer.GroupStatePreparingRebalance + group.Generation++ + } + case consumer.GroupStatePreparingRebalance, consumer.GroupStateCompletingRebalance: + // Rebalance already in progress + // Allow join but don't change generation until SyncGroup + case consumer.GroupStateDead: + return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + // Create or update member + member := &consumer.GroupMember{ + ID: memberID, + ClientID: request.GroupInstanceID, + ClientHost: "unknown", // TODO: extract from connection + SessionTimeout: request.SessionTimeout, + RebalanceTimeout: request.RebalanceTimeout, + Subscription: h.extractSubscriptionFromProtocols(request.GroupProtocols), + State: consumer.MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + + // Store protocol metadata for leader + if len(request.GroupProtocols) > 0 { + member.Metadata = 
request.GroupProtocols[0].Metadata + } + + // Add member to group + group.Members[memberID] = member + + // Update group's subscribed topics + h.updateGroupSubscription(group) + + // Select assignment protocol (prefer range, fall back to roundrobin) + groupProtocol := "range" + for _, protocol := range request.GroupProtocols { + if protocol.Name == "range" || protocol.Name == "roundrobin" { + groupProtocol = protocol.Name + break + } + } + group.Protocol = groupProtocol + + // Select group leader (first member or keep existing if still present) + if group.Leader == "" || group.Members[group.Leader] == nil { + group.Leader = memberID + } + + // Build response + response := JoinGroupResponse{ + CorrelationID: correlationID, + ErrorCode: ErrorCodeNone, + GenerationID: group.Generation, + GroupProtocol: groupProtocol, + GroupLeader: group.Leader, + MemberID: memberID, + } + + // If this member is the leader, include all member info + if memberID == group.Leader { + response.Members = make([]JoinGroupMember, 0, len(group.Members)) + for _, m := range group.Members { + response.Members = append(response.Members, JoinGroupMember{ + MemberID: m.ID, + GroupInstanceID: m.ClientID, + Metadata: m.Metadata, + }) + } + } + + return h.buildJoinGroupResponse(response), nil +} + +func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) { + if len(data) < 8 { + return nil, fmt.Errorf("request too short") + } + + offset := 0 + + // GroupID (string) + groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + if offset+groupIDLength > len(data) { + return nil, fmt.Errorf("invalid group ID length") + } + groupID := string(data[offset : offset+groupIDLength]) + offset += groupIDLength + + // Session timeout (4 bytes) + if offset+4 > len(data) { + return nil, fmt.Errorf("missing session timeout") + } + sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:])) + offset += 4 + + // Rebalance timeout (4 bytes) - for newer versions + rebalanceTimeout := sessionTimeout // Default to session timeout + if offset+4 <= len(data) { + rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:])) + offset += 4 + } + + // MemberID (string) + if offset+2 > len(data) { + return nil, fmt.Errorf("missing member ID length") + } + memberIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + memberID := "" + if memberIDLength > 0 { + if offset+memberIDLength > len(data) { + return nil, fmt.Errorf("invalid member ID length") + } + memberID = string(data[offset : offset+memberIDLength]) + offset += memberIDLength + } + + // For simplicity, we'll assume basic protocol parsing + // In a full implementation, we'd parse the protocol type and protocols array + + return &JoinGroupRequest{ + GroupID: groupID, + SessionTimeout: sessionTimeout, + RebalanceTimeout: rebalanceTimeout, + MemberID: memberID, + ProtocolType: "consumer", + GroupProtocols: []GroupProtocol{ + {Name: "range", Metadata: []byte{}}, + }, + }, nil +} + +func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { + // Estimate response size + estimatedSize := 32 + len(response.GroupProtocol) + len(response.GroupLeader) + len(response.MemberID) + for _, member := range response.Members { + estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + len(member.Metadata) + 8 + } + + result := make([]byte, 0, estimatedSize) + + // Correlation ID (4 bytes) + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) + result = 
append(result, correlationIDBytes...) + + // Error code (2 bytes) + errorCodeBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) + result = append(result, errorCodeBytes...) + + // Generation ID (4 bytes) + generationBytes := make([]byte, 4) + binary.BigEndian.PutUint32(generationBytes, uint32(response.GenerationID)) + result = append(result, generationBytes...) + + // Group protocol (string) + protocolLength := make([]byte, 2) + binary.BigEndian.PutUint16(protocolLength, uint16(len(response.GroupProtocol))) + result = append(result, protocolLength...) + result = append(result, []byte(response.GroupProtocol)...) + + // Group leader (string) + leaderLength := make([]byte, 2) + binary.BigEndian.PutUint16(leaderLength, uint16(len(response.GroupLeader))) + result = append(result, leaderLength...) + result = append(result, []byte(response.GroupLeader)...) + + // Member ID (string) + memberIDLength := make([]byte, 2) + binary.BigEndian.PutUint16(memberIDLength, uint16(len(response.MemberID))) + result = append(result, memberIDLength...) + result = append(result, []byte(response.MemberID)...) + + // Members array (4 bytes count + members) + memberCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(memberCountBytes, uint32(len(response.Members))) + result = append(result, memberCountBytes...) + + for _, member := range response.Members { + // Member ID (string) + memberLength := make([]byte, 2) + binary.BigEndian.PutUint16(memberLength, uint16(len(member.MemberID))) + result = append(result, memberLength...) + result = append(result, []byte(member.MemberID)...) + + // Group instance ID (string) - can be empty + instanceIDLength := make([]byte, 2) + binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) + result = append(result, instanceIDLength...) + if len(member.GroupInstanceID) > 0 { + result = append(result, []byte(member.GroupInstanceID)...) + } + + // Metadata (bytes) + metadataLength := make([]byte, 4) + binary.BigEndian.PutUint32(metadataLength, uint32(len(member.Metadata))) + result = append(result, metadataLength...) + result = append(result, member.Metadata...) 
+ } + + // Throttle time (4 bytes, 0 = no throttling) + result = append(result, 0, 0, 0, 0) + + return result +} + +func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode int16) []byte { + response := JoinGroupResponse{ + CorrelationID: correlationID, + ErrorCode: errorCode, + GenerationID: -1, + GroupProtocol: "", + GroupLeader: "", + MemberID: "", + Members: []JoinGroupMember{}, + } + + return h.buildJoinGroupResponse(response) +} + +func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string { + // For simplicity, return a default subscription + // In a real implementation, we'd parse the protocol metadata to extract subscribed topics + return []string{"test-topic"} +} + +func (h *Handler) updateGroupSubscription(group *consumer.ConsumerGroup) { + // Update group's subscribed topics from all members + group.SubscribedTopics = make(map[string]bool) + for _, member := range group.Members { + for _, topic := range member.Subscription { + group.SubscribedTopics[topic] = true + } + } +} + +// SyncGroup API (key 14) - Consumer group coordination completion +// Called by group members after JoinGroup to get partition assignments + +// SyncGroupRequest represents a SyncGroup request from a Kafka client +type SyncGroupRequest struct { + GroupID string + GenerationID int32 + MemberID string + GroupInstanceID string + GroupAssignments []GroupAssignment // Only from group leader +} + +// GroupAssignment represents partition assignment for a group member +type GroupAssignment struct { + MemberID string + Assignment []byte // Serialized assignment data +} + +// SyncGroupResponse represents a SyncGroup response to a Kafka client +type SyncGroupResponse struct { + CorrelationID uint32 + ErrorCode int16 + Assignment []byte // Serialized partition assignment for this member +} + +// Additional error codes for SyncGroup +const ( + ErrorCodeIllegalGeneration int16 = 22 + ErrorCodeInconsistentGroupProtocol int16 = 23 +) + +func (h *Handler) handleSyncGroup(correlationID uint32, requestBody []byte) ([]byte, error) { + // Parse SyncGroup request + request, err := h.parseSyncGroupRequest(requestBody) + if err != nil { + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + // Validate request + if request.GroupID == "" || request.MemberID == "" { + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + // Get consumer group + group := h.groupCoordinator.GetGroup(request.GroupID) + if group == nil { + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil + } + + group.Mu.Lock() + defer group.Mu.Unlock() + + // Update group's last activity + group.LastActivity = time.Now() + + // Validate member exists + member, exists := group.Members[request.MemberID] + if !exists { + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeUnknownMemberID), nil + } + + // Validate generation + if request.GenerationID != group.Generation { + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeIllegalGeneration), nil + } + + // Check if this is the group leader with assignments + if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 { + // Leader is providing assignments - process and store them + err = h.processGroupAssignments(group, request.GroupAssignments) + if err != nil { + return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol), nil + } + + // Move group to stable state + group.State = consumer.GroupStateStable 
+ + // Mark all members as stable + for _, m := range group.Members { + m.State = consumer.MemberStateStable + } + } else if group.State == consumer.GroupStateCompletingRebalance { + // Non-leader member waiting for assignments + // Assignments should already be processed by leader + } else { + // Trigger partition assignment using built-in strategy + topicPartitions := h.getTopicPartitions(group) + group.AssignPartitions(topicPartitions) + + group.State = consumer.GroupStateStable + for _, m := range group.Members { + m.State = consumer.MemberStateStable + } + } + + // Get assignment for this member + assignment := h.serializeMemberAssignment(member.Assignment) + + // Build response + response := SyncGroupResponse{ + CorrelationID: correlationID, + ErrorCode: ErrorCodeNone, + Assignment: assignment, + } + + return h.buildSyncGroupResponse(response), nil +} + +func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) { + if len(data) < 8 { + return nil, fmt.Errorf("request too short") + } + + offset := 0 + + // GroupID (string) + groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + if offset+groupIDLength > len(data) { + return nil, fmt.Errorf("invalid group ID length") + } + groupID := string(data[offset : offset+groupIDLength]) + offset += groupIDLength + + // Generation ID (4 bytes) + if offset+4 > len(data) { + return nil, fmt.Errorf("missing generation ID") + } + generationID := int32(binary.BigEndian.Uint32(data[offset:])) + offset += 4 + + // MemberID (string) + if offset+2 > len(data) { + return nil, fmt.Errorf("missing member ID length") + } + memberIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + if offset+memberIDLength > len(data) { + return nil, fmt.Errorf("invalid member ID length") + } + memberID := string(data[offset : offset+memberIDLength]) + offset += memberIDLength + + // For simplicity, we'll parse basic fields + // In a full implementation, we'd parse the full group assignments array + + return &SyncGroupRequest{ + GroupID: groupID, + GenerationID: generationID, + MemberID: memberID, + GroupInstanceID: "", + GroupAssignments: []GroupAssignment{}, + }, nil +} + +func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse) []byte { + estimatedSize := 16 + len(response.Assignment) + result := make([]byte, 0, estimatedSize) + + // Correlation ID (4 bytes) + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) + result = append(result, correlationIDBytes...) + + // Error code (2 bytes) + errorCodeBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) + result = append(result, errorCodeBytes...) + + // Assignment (bytes) + assignmentLength := make([]byte, 4) + binary.BigEndian.PutUint32(assignmentLength, uint32(len(response.Assignment))) + result = append(result, assignmentLength...) + result = append(result, response.Assignment...) 
+ + // Throttle time (4 bytes, 0 = no throttling) + result = append(result, 0, 0, 0, 0) + + return result +} + +func (h *Handler) buildSyncGroupErrorResponse(correlationID uint32, errorCode int16) []byte { + response := SyncGroupResponse{ + CorrelationID: correlationID, + ErrorCode: errorCode, + Assignment: []byte{}, + } + + return h.buildSyncGroupResponse(response) +} + +func (h *Handler) processGroupAssignments(group *consumer.ConsumerGroup, assignments []GroupAssignment) error { + // In a full implementation, we'd deserialize the assignment data + // and update each member's partition assignment + // For now, we'll trigger our own assignment logic + + topicPartitions := h.getTopicPartitions(group) + group.AssignPartitions(topicPartitions) + + return nil +} + +func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][]int32 { + topicPartitions := make(map[string][]int32) + + // Get partition info for all subscribed topics + for topic := range group.SubscribedTopics { + // Check if topic exists in our topic registry + h.topicsMu.RLock() + topicInfo, exists := h.topics[topic] + h.topicsMu.RUnlock() + + if exists { + // Create partition list for this topic + partitions := make([]int32, topicInfo.Partitions) + for i := int32(0); i < topicInfo.Partitions; i++ { + partitions[i] = i + } + topicPartitions[topic] = partitions + } else { + // Default to single partition if topic not found + topicPartitions[topic] = []int32{0} + } + } + + return topicPartitions +} + +func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssignment) []byte { + // Build a simple serialized format for partition assignments + // Format: version(2) + num_topics(4) + topics... + // For each topic: topic_name_len(2) + topic_name + num_partitions(4) + partitions... + + if len(assignments) == 0 { + return []byte{0, 1, 0, 0, 0, 0} // Version 1, 0 topics + } + + // Group assignments by topic + topicAssignments := make(map[string][]int32) + for _, assignment := range assignments { + topicAssignments[assignment.Topic] = append(topicAssignments[assignment.Topic], assignment.Partition) + } + + result := make([]byte, 0, 64) + + // Version (2 bytes) - use version 1 + result = append(result, 0, 1) + + // Number of topics (4 bytes) + numTopicsBytes := make([]byte, 4) + binary.BigEndian.PutUint32(numTopicsBytes, uint32(len(topicAssignments))) + result = append(result, numTopicsBytes...) + + // Topics + for topic, partitions := range topicAssignments { + // Topic name length (2 bytes) + topicLenBytes := make([]byte, 2) + binary.BigEndian.PutUint16(topicLenBytes, uint16(len(topic))) + result = append(result, topicLenBytes...) + + // Topic name + result = append(result, []byte(topic)...) + + // Number of partitions (4 bytes) + numPartitionsBytes := make([]byte, 4) + binary.BigEndian.PutUint32(numPartitionsBytes, uint32(len(partitions))) + result = append(result, numPartitionsBytes...) + + // Partitions (4 bytes each) + for _, partition := range partitions { + partitionBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionBytes, uint32(partition)) + result = append(result, partitionBytes...) 
+ } + } + + // User data length (4 bytes) - no user data + result = append(result, 0, 0, 0, 0) + + return result +} diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index c911f01e5..3a341214a 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -9,104 +9,104 @@ import ( func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byte, error) { // Parse minimal Produce request // Request format: client_id + acks(2) + timeout(4) + topics_array - + if len(requestBody) < 8 { // client_id_size(2) + acks(2) + timeout(4) return nil, fmt.Errorf("Produce request too short") } - + // Skip client_id clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) offset := 2 + int(clientIDSize) - + if len(requestBody) < offset+10 { // acks(2) + timeout(4) + topics_count(4) return nil, fmt.Errorf("Produce request missing data") } - + // Parse acks and timeout acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) offset += 2 timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 _ = timeout // unused for now - + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + response := make([]byte, 0, 1024) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Topics count (same as request) topicsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) response = append(response, topicsCountBytes...) - + // Process each topic for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { break } - + // Parse topic name topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) offset += 2 - + if len(requestBody) < offset+int(topicNameSize)+4 { break } - + topicName := string(requestBody[offset : offset+int(topicNameSize)]) offset += int(topicNameSize) - + // Parse partitions count partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + // Check if topic exists h.topicsMu.RLock() _, topicExists := h.topics[topicName] h.topicsMu.RUnlock() - + // Response: topic_name_size(2) + topic_name + partitions_array response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) response = append(response, []byte(topicName)...) - + partitionsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount) response = append(response, partitionsCountBytes...) - + // Process each partition for j := uint32(0); j < partitionsCount && offset < len(requestBody); j++ { if len(requestBody) < offset+8 { break } - + // Parse partition: partition_id(4) + record_set_size(4) + record_set partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + recordSetSize := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + if len(requestBody) < offset+int(recordSetSize) { break } - + recordSetData := requestBody[offset : offset+int(recordSetSize)] offset += int(recordSetSize) - + // Response: partition_id(4) + error_code(2) + base_offset(8) + log_append_time(8) + log_start_offset(8) partitionIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(partitionIDBytes, partitionID) response = append(response, partitionIDBytes...) 
- + var errorCode uint16 = 0 var baseOffset int64 = 0 currentTime := time.Now().UnixNano() - + if !topicExists { errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION } else { @@ -127,7 +127,7 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt // Use legacy in-memory mode for tests ledger := h.GetOrCreateLedger(topicName, int32(partitionID)) baseOffset = ledger.AssignOffsets(int64(recordCount)) - + // Append each record to the ledger avgSize := totalSize / recordCount for k := int64(0); k < int64(recordCount); k++ { @@ -136,35 +136,35 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt } } } - + // Error code response = append(response, byte(errorCode>>8), byte(errorCode)) - + // Base offset (8 bytes) baseOffsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset)) response = append(response, baseOffsetBytes...) - + // Log append time (8 bytes) - timestamp when appended logAppendTimeBytes := make([]byte, 8) binary.BigEndian.PutUint64(logAppendTimeBytes, uint64(currentTime)) response = append(response, logAppendTimeBytes...) - + // Log start offset (8 bytes) - same as base for now logStartOffsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(logStartOffsetBytes, uint64(baseOffset)) response = append(response, logStartOffsetBytes...) } } - + // Add throttle time at the end (4 bytes) response = append(response, 0, 0, 0, 0) - + // If acks=0, return empty response (fire and forget) if acks == 0 { return []byte{}, nil } - + return response, nil } @@ -174,24 +174,24 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total if len(recordSetData) < 12 { // minimum record set size return 0, 0, fmt.Errorf("record set too small") } - + // For Phase 1, we'll do a very basic parse to count records // In a full implementation, this would parse the record batch format properly - + // Record batch header: base_offset(8) + length(4) + partition_leader_epoch(4) + magic(1) + ... 
if len(recordSetData) < 17 { return 0, 0, fmt.Errorf("invalid record batch header") } - + // Skip to record count (at offset 16 in record batch) if len(recordSetData) < 20 { // Assume single record for very small batches return 1, int32(len(recordSetData)), nil } - + // Try to read record count from the batch header recordCount = int32(binary.BigEndian.Uint32(recordSetData[16:20])) - + // Validate record count is reasonable if recordCount <= 0 || recordCount > 1000000 { // sanity check // Fallback to estimating based on size @@ -201,7 +201,7 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total } return estimatedCount, int32(len(recordSetData)), nil } - + return recordCount, int32(len(recordSetData)), nil } @@ -209,10 +209,10 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) { // For Phase 2, we'll extract a simple key-value from the record set // In a full implementation, this would parse the entire batch properly - + // Extract first record from record set (simplified) key, value := h.extractFirstRecord(recordSetData) - + // Publish to SeaweedMQ return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value) } @@ -221,14 +221,14 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) { // For Phase 2, create a simple placeholder record // This represents what would be extracted from the actual Kafka record batch - + key := []byte("kafka-key") value := fmt.Sprintf("kafka-message-data-%d", time.Now().UnixNano()) - + // In a real implementation, this would: // 1. Parse the record batch header // 2. Extract individual records with proper key/value/timestamp // 3. Handle compression, transaction markers, etc. - + return key, []byte(value) }
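
The produce and fetch paths above translate SeaweedMQ's timestamp/sequence positions into contiguous Kafka offsets through the per-partition offset.Ledger (NewLedger, AssignOffsets, AppendRecord, GetHighWaterMark). The sketch below shows a minimal ledger with that shape, only to illustrate how the calls in ProduceRecord and FetchRecords fit together; the real type lives in weed/mq/kafka/offset, and the field names, locking, and error handling here are assumptions, not the patch's implementation.

package offsetsketch

import (
	"fmt"
	"sync"
)

// entry holds the metadata the handler appends for each produced record.
type entry struct {
	offset int64
	tsNs   int64
	size   int32
}

// Ledger hands out contiguous Kafka offsets for one topic-partition and
// remembers which SeaweedMQ timestamp each offset maps to.
type Ledger struct {
	mu      sync.Mutex
	next    int64 // next offset to assign; doubles as the high water mark
	entries []entry
}

func NewLedger() *Ledger { return &Ledger{} }

// AssignOffsets reserves n consecutive offsets and returns the first one.
func (l *Ledger) AssignOffsets(n int64) int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	base := l.next
	l.next += n
	return base
}

// AppendRecord stores timestamp and size for a previously assigned offset.
func (l *Ledger) AppendRecord(offset, tsNs int64, size int32) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if offset >= l.next {
		return fmt.Errorf("offset %d was never assigned", offset)
	}
	l.entries = append(l.entries, entry{offset: offset, tsNs: tsNs, size: size})
	return nil
}

// GetHighWaterMark returns the offset the next record would receive.
func (l *Ledger) GetHighWaterMark() int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.next
}

With this shape, ProduceRecord's AssignOffsets(1) followed by AppendRecord(kafkaOffset, timestamp, size) keeps the high water mark at "last offset + 1", which matches the hwm == offset+1 assertion in TestSeaweedMQHandler_ProduceRecord and the fetchOffset >= highWaterMark check in FetchRecords.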
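
Because parseJoinGroupRequest only reads group_id, session_timeout, rebalance_timeout, and member_id before defaulting the protocol list, the new JoinGroup path can be exercised with a hand-built request body in the same style as the existing handler_test.go helpers (encoding/binary is already imported there). The helper below is a hypothetical sketch: its name, the chosen timeouts, and the assumption that 30000 ms passes ValidateSessionTimeout are not from the patch.

// buildJoinGroupRequestBody lays out the fields parseJoinGroupRequest expects:
// group_id(string), session_timeout(int32), rebalance_timeout(int32),
// member_id(string), all big-endian. Hypothetical test helper.
func buildJoinGroupRequestBody(groupID, memberID string, sessionTimeoutMs, rebalanceTimeoutMs int32) []byte {
	body := make([]byte, 0, 16+len(groupID)+len(memberID))

	u16 := make([]byte, 2)
	u32 := make([]byte, 4)

	// group_id
	binary.BigEndian.PutUint16(u16, uint16(len(groupID)))
	body = append(body, u16...)
	body = append(body, []byte(groupID)...)

	// session_timeout_ms and rebalance_timeout_ms
	binary.BigEndian.PutUint32(u32, uint32(sessionTimeoutMs))
	body = append(body, u32...)
	binary.BigEndian.PutUint32(u32, uint32(rebalanceTimeoutMs))
	body = append(body, u32...)

	// member_id (empty for a brand-new member)
	binary.BigEndian.PutUint16(u16, uint16(len(memberID)))
	body = append(body, u16...)
	body = append(body, []byte(memberID)...)

	return body
}

A test could then create an in-memory handler with NewHandler(), call handleJoinGroup(42, buildJoinGroupRequestBody("sketch-group", "", 30000, 60000)), and assert that bytes 4-5 of the response carry ErrorCodeNone and that the generated member ID is echoed back as the group leader, since the first member of an empty group becomes its leader.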
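
serializeMemberAssignment writes a compact layout: version(2), topic count(4), then per topic a name length(2), the name, a partition count(4), and the partition IDs(4 bytes each), followed by a user-data length(4) of zero. The decoder below only exists to make that byte layout concrete; the function name and error strings are illustrative, and the trailing user-data block is ignored.

// decodeMemberAssignment reverses the layout written by serializeMemberAssignment.
func decodeMemberAssignment(data []byte) (map[string][]int32, error) {
	if len(data) < 6 {
		return nil, fmt.Errorf("assignment too short: %d bytes", len(data))
	}
	pos := 2 // skip version(2); the gateway always writes version 1
	numTopics := int(binary.BigEndian.Uint32(data[pos:]))
	pos += 4

	assignment := make(map[string][]int32, numTopics)
	for i := 0; i < numTopics; i++ {
		if pos+2 > len(data) {
			return nil, fmt.Errorf("truncated topic name length")
		}
		nameLen := int(binary.BigEndian.Uint16(data[pos:]))
		pos += 2
		if pos+nameLen+4 > len(data) {
			return nil, fmt.Errorf("truncated topic entry")
		}
		topic := string(data[pos : pos+nameLen])
		pos += nameLen

		numPartitions := int(binary.BigEndian.Uint32(data[pos:]))
		pos += 4
		if pos+numPartitions*4 > len(data) {
			return nil, fmt.Errorf("truncated partition list")
		}
		partitions := make([]int32, numPartitions)
		for p := range partitions {
			partitions[p] = int32(binary.BigEndian.Uint32(data[pos:]))
			pos += 4
		}
		assignment[topic] = partitions
	}
	// The trailing user_data length(4) is always zero in this gateway and is ignored.
	return assignment, nil
}

Feeding it the empty-assignment case []byte{0, 1, 0, 0, 0, 0} yields an empty map, and a single-topic assignment round-trips to the same topic-to-partition mapping that AssignPartitions stored on the member.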