# Review notes. Commentary only: the patch payload below is unchanged apart from
# restoring the line breaks and tab indentation that were collapsed in the copy
# under review. Column alignment inside lines was reconstructed, so use
# whitespace-tolerant application (e.g. `git apply --ignore-whitespace`) if a
# context line fails to match.
#
# Summary: adds two test files under test/kafka and switches on three changes
# marked EXPERIMENT in the Kafka gateway (member ID format, JoinGroup metadata
# handling, hardcoded JoinGroup response).
#
# test/kafka/connection_close_debug_test.go
#   TestConnectionCloseDebug starts a gateway on ":0", registers
#   "close-debug-topic", and performs one kafka-go ReadMessage with a 3s context.
#   - It makes no assertions: both the error and the success branch only log, so
#     the read result can never fail the test.
#   - t.Errorf is called from the server goroutine. NOTE(review): if Start()
#     returns an error once the deferred Close() runs (not visible here), that
#     call can happen after the test has returned, which the testing package
#     treats as a panic. Confirm Start()'s behaviour on shutdown.
#   - Readiness is a fixed time.Sleep(100ms), not a signal from the server.
#   - The comment "custom dialer that logs connection events" does not match the
#     code: the Dialer only sets Timeout and Resolver.
#   - net.Error.Temporary() is deprecated in the standard library.
#
# test/kafka/metadata_format_test.go
#   TestMetadataFormat builds four candidate byte layouts and hands each to
#   testMetadataFormat.
#   - testMetadataFormat only logs and returns false, so every format is logged
#     as "rejected by kafka-go" although no client is ever contacted.
#   - format4 ("minimal valid"): the topic length prefix is 0x0009 but the 10
#     bytes of "test-topic" follow; the version field is 0x0001 yet only one
#     4-byte trailer follows, whereas generateSubscriptionMetadata writes two
#     trailers for version >= 1.
#   - generateSubscriptionMetadata writes: version (2 bytes, big-endian), topic
#     count (4 bytes), each topic as 2-byte length + bytes, then for
#     version >= 1 four zero bytes labelled OwnedPartitions, then four zero
#     bytes labelled UserData. len(topic) is truncated to uint16 and version to
#     its low 16 bits.
#     NOTE(review): Kafka's consumer protocol subscription appears to place
#     UserData before OwnedPartitions; the bytes are identical while both are
#     empty, but confirm the order before filling either field.
#
# weed/mq/kafka/consumer/group_coordinator.go
#   GenerateMemberID now returns "consumer-" plus the first 16 hex characters of
#   SHA-256(clientID + "-" + clientHost); clientID and clientHost no longer
#   appear in the ID. It is still deterministic, so two connections presenting
#   the same clientID and clientHost receive the same member ID. The new comment
#   cites uuid-based formats, but no uuid is generated.
#
# weed/mq/kafka/protocol/joingroup.go
#   - handleJoinGroup, first hunk: member.Metadata is now always the first
#     protocol's Metadata verbatim. The removed branch synthesised a version-0
#     subscription from h.getAvailableTopics() when the client sent none; the
#     remaining empty check only prints.
#   - handleJoinGroup, second hunk: the success path returns
#     buildMinimalJoinGroupResponse(correlationID, apiVersion) instead of
#     buildJoinGroupResponse(response), so the response assembled earlier in the
#     function is no longer sent. Every successful join is told generation 1,
#     protocol "range", leader and member ID "test-member", regardless of the
#     memberID and group.Leader values used in the surrounding code. The error
#     path (buildJoinGroupErrorResponse) still uses buildJoinGroupResponse.
#     Reviewer opinion: this must be reverted before the change is merged.
#   - buildMinimalJoinGroupResponse writes, in order: correlation ID (4 bytes),
#     throttle time (4 bytes, only when apiVersion >= 2), error code 0 (2 bytes),
#     generation 1 (4 bytes), "range", "test-member" (leader), "test-member"
#     (member), member count 1, then one entry of "test-member" with 0-length
#     metadata. Strings carry a 2-byte length prefix, bytes a 4-byte one.
#   - All DEBUG output, including hex dumps of metadata and of the response, is
#     written with fmt.Printf on the request path.
#
diff --git a/test/kafka/connection_close_debug_test.go b/test/kafka/connection_close_debug_test.go
new file mode 100644
index 000000000..1c666d26b
--- /dev/null
+++ b/test/kafka/connection_close_debug_test.go
@@ -0,0 +1,87 @@
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+)
+
+// TestConnectionCloseDebug captures the exact moment kafka-go closes the connection
+func TestConnectionCloseDebug(t *testing.T) {
+	// Start gateway server
+	gatewayServer := gateway.NewServer(gateway.Options{
+		Listen: ":0", // random port
+	})
+
+	go func() {
+		if err := gatewayServer.Start(); err != nil {
+			t.Errorf("Gateway server error: %v", err)
+		}
+	}()
+	defer gatewayServer.Close()
+
+	// Wait for server to start
+	time.Sleep(100 * time.Millisecond)
+
+	// Get the actual listening address
+	host, port := gatewayServer.GetListenerAddr()
+	brokerAddr := fmt.Sprintf("%s:%d", host, port)
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Get handler and configure it
+	handler := gatewayServer.GetHandler()
+	handler.SetBrokerAddress(host, port)
+
+	// Add test topic
+	topicName := "close-debug-topic"
+	handler.AddTopicForTesting(topicName, 1)
+
+	t.Log("=== Testing connection close timing ===")
+
+	// Create a custom dialer that logs connection events
+	dialer := &kafka.Dialer{
+		Timeout:  5 * time.Second,
+		Resolver: &net.Resolver{},
+	}
+
+	// Create reader with very short timeouts to see the pattern quickly
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers:  []string{brokerAddr},
+		Topic:    topicName,
+		GroupID:  "close-debug-group",
+		MinBytes: 1,
+		MaxBytes: 10e6,
+		MaxWait:  1 * time.Second, // Very short wait
+		Dialer:   dialer,
+	})
+	defer reader.Close()
+
+	// Try to read with a very short timeout
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	t.Log("Starting ReadMessage - this should trigger the connection close pattern...")
+
+	_, err := reader.ReadMessage(ctx)
+	if err != nil {
+		t.Logf("ReadMessage failed (expected): %v", err)
+		t.Logf("Error type: %T", err)
+
+		// Check if it's a specific type of error that gives us clues
+		if netErr, ok := err.(net.Error); ok {
+			t.Logf("Network error - Timeout: %v, Temporary: %v", netErr.Timeout(), netErr.Temporary())
+		}
+	} else {
+		t.Log("ReadMessage succeeded unexpectedly")
+	}
+
+	t.Log("=== Connection close debug completed ===")
+
+	// The key insight is in the debug logs above - we should see the exact pattern
+	// of when kafka-go closes connections after JoinGroup responses
+}
diff --git a/test/kafka/metadata_format_test.go b/test/kafka/metadata_format_test.go
new file mode 100644
index 000000000..564e85499
--- /dev/null
+++ b/test/kafka/metadata_format_test.go
@@ -0,0 +1,87 @@
+package kafka
+
+import (
+	"encoding/binary"
+	"fmt"
+	"testing"
+)
+
+// TestMetadataFormat tests different metadata formats to find kafka-go compatibility
+func TestMetadataFormat(t *testing.T) {
+	// Test different subscription metadata formats that kafka-go might expect
+
+	t.Log("=== Testing different subscription metadata formats ===")
+
+	// Format 1: Our current format (version 0, topics, userdata)
+	format1 := generateSubscriptionMetadata([]string{"test-topic"}, 0)
+	t.Logf("Format 1 (current): %d bytes: %x", len(format1), format1)
+
+	// Format 2: Version 1 format (might include owned partitions)
+	format2 := generateSubscriptionMetadata([]string{"test-topic"}, 1)
+	t.Logf("Format 2 (version 1): %d bytes: %x", len(format2), format2)
+
+	// Format 3: Empty metadata (let kafka-go handle it)
+	format3 := []byte{}
+	t.Logf("Format 3 (empty): %d bytes: %x", len(format3), format3)
+
+	// Format 4: Minimal valid metadata
+	format4 := []byte{0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x09, 't', 'e', 's', 't', '-', 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x00}
+	t.Logf("Format 4 (minimal): %d bytes: %x", len(format4), format4)
+
+	// Test each format by creating a modified JoinGroup handler
+	for i, metadata := range [][]byte{format1, format2, format3, format4} {
+		t.Logf("\n--- Testing Format %d ---", i+1)
+		success := testMetadataFormat(t, metadata, fmt.Sprintf("format-%d", i+1))
+		if success {
+			t.Logf("✅ Format %d might be compatible!", i+1)
+		} else {
+			t.Logf("❌ Format %d rejected by kafka-go", i+1)
+		}
+	}
+}
+
+func generateSubscriptionMetadata(topics []string, version int) []byte {
+	metadata := make([]byte, 0, 64)
+
+	// Version (2 bytes)
+	metadata = append(metadata, byte(version>>8), byte(version))
+
+	// Topics count (4 bytes)
+	topicsCount := make([]byte, 4)
+	binary.BigEndian.PutUint32(topicsCount, uint32(len(topics)))
+	metadata = append(metadata, topicsCount...)
+
+	// Topics (string array)
+	for _, topic := range topics {
+		topicLen := make([]byte, 2)
+		binary.BigEndian.PutUint16(topicLen, uint16(len(topic)))
+		metadata = append(metadata, topicLen...)
+		metadata = append(metadata, []byte(topic)...)
+	}
+
+	if version >= 1 {
+		// OwnedPartitions (for version 1+) - empty for now
+		metadata = append(metadata, 0x00, 0x00, 0x00, 0x00) // empty owned partitions
+	}
+
+	// UserData (4 bytes length + data)
+	metadata = append(metadata, 0x00, 0x00, 0x00, 0x00) // empty user data
+
+	return metadata
+}
+
+func testMetadataFormat(t *testing.T, metadata []byte, testName string) bool {
+	// This is a placeholder for testing different metadata formats
+	// In a real test, we'd:
+	// 1. Start a gateway with modified JoinGroup handler that uses this metadata
+	// 2. Connect with kafka-go consumer
+	// 3. Check if it proceeds to SyncGroup
+
+	// For now, just log the format
+	t.Logf("Testing %s with metadata: %x", testName, metadata)
+
+	// TODO: Implement actual kafka-go integration test
+	// This would require modifying the JoinGroup handler to use specific metadata
+
+	return false // Placeholder
+}
diff --git a/weed/mq/kafka/consumer/group_coordinator.go b/weed/mq/kafka/consumer/group_coordinator.go
index e7e8eab3f..32249c54e 100644
--- a/weed/mq/kafka/consumer/group_coordinator.go
+++ b/weed/mq/kafka/consumer/group_coordinator.go
@@ -190,10 +190,10 @@ func (gc *GroupCoordinator) ListGroups() []string {
 
 // GenerateMemberID creates a deterministic member ID based on client info
 func (gc *GroupCoordinator) GenerateMemberID(clientID, clientHost string) string {
-	// Use hash of client info to create deterministic member ID
-	// This ensures the same client gets the same member ID across calls
+	// EXPERIMENT: Use simpler member ID format like real Kafka brokers
+	// Real Kafka uses format like: "consumer-1-uuid" or "consumer-groupId-uuid"
 	hash := fmt.Sprintf("%x", sha256.Sum256([]byte(clientID+"-"+clientHost)))
-	return fmt.Sprintf("%s-%s-%s", clientID, clientHost, hash[:8])
+	return fmt.Sprintf("consumer-%s", hash[:16]) // Shorter, simpler format
 }
 
 // ValidateSessionTimeout checks if session timeout is within acceptable range
diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go
index 7af218514..dfa70e988 100644
--- a/weed/mq/kafka/protocol/joingroup.go
+++ b/weed/mq/kafka/protocol/joingroup.go
@@ -175,37 +175,15 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
 		JoinedAt: time.Now(),
 	}
 
-	// Store protocol metadata for leader - CRITICAL: Generate proper subscription metadata
+	// Store protocol metadata for leader - EXPERIMENT: Use client's metadata as-is
 	if len(request.GroupProtocols) > 0 {
-		// If client sends empty metadata, generate subscription metadata for available topics
+		// EXPERIMENT: Always use client's metadata, even if empty
+		member.Metadata = request.GroupProtocols[0].Metadata
+		fmt.Printf("DEBUG: JoinGroup using client metadata as-is (%d bytes): %x\n", len(member.Metadata), member.Metadata)
+
+		// If client sends empty metadata, that might be intentional
 		if len(request.GroupProtocols[0].Metadata) == 0 {
-			// Generate subscription metadata for all available topics
-			// Format: version(2) + topics_count(4) + topics[]
-			availableTopics := h.getAvailableTopics()
-			fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics)
-
-			metadata := make([]byte, 0, 64)
-			// Version (2 bytes) - use version 0 to exclude OwnedPartitions
-			metadata = append(metadata, 0, 0)
-			// Topics count (4 bytes)
-			topicsCount := make([]byte, 4)
-			binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics)))
-			metadata = append(metadata, topicsCount...)
-			// Topics (string array)
-			for _, topic := range availableTopics {
-				topicLen := make([]byte, 2)
-				binary.BigEndian.PutUint16(topicLen, uint16(len(topic)))
-				metadata = append(metadata, topicLen...)
-				metadata = append(metadata, []byte(topic)...)
-			}
-			// UserData (nullable bytes) - encode empty (length 0)
-			userDataLen := make([]byte, 4)
-			binary.BigEndian.PutUint32(userDataLen, 0)
-			metadata = append(metadata, userDataLen...)
-			member.Metadata = metadata
-			fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata)
-		} else {
-			member.Metadata = request.GroupProtocols[0].Metadata
+			fmt.Printf("DEBUG: JoinGroup client sent empty metadata - using as-is (kafka-go might handle this)\n")
 		}
 	}
 
@@ -263,7 +241,8 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
 		fmt.Printf("DEBUG: JoinGroup member '%s' is NOT the leader (leader is '%s'), empty members array\n", memberID, group.Leader)
 	}
 
-	return h.buildJoinGroupResponse(response), nil
+	// EXPERIMENT: Return minimal hardcoded response to test kafka-go compatibility
+	return h.buildMinimalJoinGroupResponse(correlationID, apiVersion), nil
 }
 
 func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) {
@@ -453,6 +432,54 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
 	return h.buildJoinGroupResponse(response)
 }
 
+// buildMinimalJoinGroupResponse creates a minimal hardcoded response for testing
+func (h *Handler) buildMinimalJoinGroupResponse(correlationID uint32, apiVersion uint16) []byte {
+	// Create the absolute minimal JoinGroup response that should work with kafka-go
+	response := make([]byte, 0, 64)
+
+	// Correlation ID (4 bytes)
+	correlationIDBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
+	response = append(response, correlationIDBytes...)
+
+	// Throttle time (4 bytes) - v2+ only
+	if apiVersion >= 2 {
+		response = append(response, 0, 0, 0, 0) // No throttling
+	}
+
+	// Error code (2 bytes) - 0 = success
+	response = append(response, 0, 0)
+
+	// Generation ID (4 bytes) - use 1
+	response = append(response, 0, 0, 0, 1)
+
+	// Group protocol (STRING) - "range"
+	response = append(response, 0, 5) // length
+	response = append(response, []byte("range")...)
+
+	// Group leader (STRING) - "test-member"
+	response = append(response, 0, 11) // length
+	response = append(response, []byte("test-member")...)
+
+	// Member ID (STRING) - "test-member" (same as leader)
+	response = append(response, 0, 11) // length
+	response = append(response, []byte("test-member")...)
+
+	// Members array (4 bytes count + members)
+	response = append(response, 0, 0, 0, 1) // 1 member
+
+	// Member 0:
+	// Member ID (STRING) - "test-member"
+	response = append(response, 0, 11) // length
+	response = append(response, []byte("test-member")...)
+
+	// Member metadata (BYTES) - empty
+	response = append(response, 0, 0, 0, 0) // 0 bytes
+
+	fmt.Printf("DEBUG: JoinGroup minimal response (%d bytes): %x\n", len(response), response)
+	return response
+}
+
 func (h *Handler) extractSubscriptionFromProtocols(protocols []GroupProtocol) []string {
 	// TODO: CRITICAL - Consumer subscription extraction is hardcoded to "test-topic"
 	// This breaks real Kafka consumers which send their actual subscriptions