diff --git a/test/kafka/debug_consumer_group_test.go b/test/kafka/debug_consumer_group_test.go new file mode 100644 index 000000000..8ed15d476 --- /dev/null +++ b/test/kafka/debug_consumer_group_test.go @@ -0,0 +1,68 @@ +package kafka + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/segmentio/kafka-go" +) + +func TestDebugConsumerGroupWorkflow(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Failed to start gateway: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("debug-topic", 1) + t.Logf("Added topic: debug-topic") + + // Create a simple consumer that will trigger the consumer group workflow + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{addr}, + Topic: "debug-topic", + GroupID: "debug-group", + MinBytes: 1, + MaxBytes: 1024, + }) + defer reader.Close() + + // Try to read a message (this will trigger the consumer group workflow) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + t.Logf("=== Starting consumer group workflow ===") + + // This should trigger: FindCoordinator -> JoinGroup -> (assignTopicPartitions -> readPartitions) -> SyncGroup + _, err := reader.ReadMessage(ctx) + + if err != nil { + if err == context.DeadlineExceeded { + t.Logf("Expected timeout - checking if SyncGroup was called") + } else { + t.Logf("ReadMessage error: %v", err) + } + } else { + t.Logf("Unexpected success - message read") + } + + t.Logf("=== Consumer group workflow completed ===") +} diff --git a/test/kafka/debug_readpartitions_test.go b/test/kafka/debug_readpartitions_test.go new file mode 100644 index 000000000..35bf6b2b3 --- /dev/null +++ b/test/kafka/debug_readpartitions_test.go @@ -0,0 +1,58 @@ +package kafka + +import ( + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/segmentio/kafka-go" +) + +func TestDebugReadPartitions(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Failed to start gateway: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("readpartitions-topic", 1) + t.Logf("Added topic: readpartitions-topic") + + // Test direct readPartitions call (this is what assignTopicPartitions calls) + conn, err := kafka.Dial("tcp", addr) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + t.Logf("=== Testing direct readPartitions call ===") + + // This is the exact call that assignTopicPartitions makes + partitions, err := conn.ReadPartitions("readpartitions-topic") + + if err != nil { + t.Logf("❌ ReadPartitions failed: %v", err) + t.Logf("This explains why kafka-go disconnects after JoinGroup!") + } else { + t.Logf("✅ ReadPartitions succeeded: %d partitions", len(partitions)) + for i, p := range partitions { + t.Logf(" Partition[%d]: Topic=%s, ID=%d, Leader=%s:%d", i, p.Topic, p.ID, p.Leader.Host, p.Leader.Port) + } + } +} diff --git a/test/kafka/go.mod b/test/kafka/go.mod new file mode 100644 index 000000000..88ddd6861 --- /dev/null +++ b/test/kafka/go.mod @@ -0,0 +1,37 @@ +module github.com/seaweedfs/seaweedfs/test/kafka + +go 1.24.0 + +toolchain go1.24.7 + +require ( + github.com/IBM/sarama v1.46.0 + github.com/seaweedfs/seaweedfs v0.0.0-00010101000000-000000000000 + github.com/segmentio/kafka-go v0.4.49 +) + +replace github.com/seaweedfs/seaweedfs => ../../ + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect + golang.org/x/crypto v0.41.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.36.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c // indirect + google.golang.org/grpc v1.75.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect +) diff --git a/test/kafka/go.sum b/test/kafka/go.sum new file mode 100644 index 000000000..006abbc89 --- /dev/null +++ b/test/kafka/go.sum @@ -0,0 +1,134 @@ +github.com/IBM/sarama v1.46.0 h1:+YTM1fNd6WKMchlnLKRUB5Z0qD4M8YbvwIIPLvJD53s= +github.com/IBM/sarama v1.46.0/go.mod h1:0lOcuQziJ1/mBGHkdp5uYrltqQuKQKM5O5FOWUQVVvo= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= +github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c h1:qXWI/sQtv5UKboZ/zUk7h+mrf/lXORyI+n9DKDAusdg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= +google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/kafka/kafka_go_internal_debug_test.go b/test/kafka/kafka_go_internal_debug_test.go new file mode 100644 index 000000000..9ba007476 --- /dev/null +++ b/test/kafka/kafka_go_internal_debug_test.go @@ -0,0 +1,187 @@ +package kafka + +import ( + "bufio" + "bytes" + "fmt" + "reflect" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/segmentio/kafka-go" +) + +// TestKafkaGoInternalDebug attempts to debug kafka-go's internal parsing by intercepting the read operations +func TestKafkaGoInternalDebug(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go gatewayServer.Start() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("internal-debug-topic", 1) + + // Test: Manually simulate what kafka-go does + t.Logf("=== Simulating kafka-go ReadPartitions workflow ===") + + conn, err := kafka.Dial("tcp", addr) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + defer conn.Close() + + // Get the underlying connection to intercept reads + t.Logf("Testing with manual response capture...") + + // Try to capture the exact response bytes that kafka-go receives + testManualMetadataRequest(addr, t) +} + +func testManualMetadataRequest(addr string, t *testing.T) { + // Create a raw TCP connection to capture exact bytes + conn, err := kafka.Dial("tcp", addr) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + defer conn.Close() + + t.Logf("=== Manual Metadata Request Test ===") + + // First, let's see what happens when we call ReadPartitions and capture any intermediate state + // We'll use reflection to access internal fields if possible + + // Try to access the internal reader + connValue := reflect.ValueOf(conn).Elem() + t.Logf("Connection type: %T", conn) + t.Logf("Connection fields: %d", connValue.NumField()) + + for i := 0; i < connValue.NumField(); i++ { + field := connValue.Type().Field(i) + if field.Name == "rbuf" || field.Name == "rlock" { + t.Logf("Found field: %s (type: %s)", field.Name, field.Type) + } + } + + // Try ReadPartitions with detailed error capture + t.Logf("Calling ReadPartitions...") + partitions, err := conn.ReadPartitions("internal-debug-topic") + if err != nil { + t.Logf("ReadPartitions failed: %v", err) + + // Try to get more details about the error + t.Logf("Error type: %T", err) + t.Logf("Error string: %s", err.Error()) + + // Check if it's related to bufio + if err.Error() == "multiple Read calls return no data or error" { + t.Logf("This is the bufio.Reader error we're looking for!") + t.Logf("This typically means the underlying connection is closed or not providing data") + } + + return + } + + t.Logf("ReadPartitions succeeded: %d partitions", len(partitions)) +} + +// TestBufferedReaderBehavior tests how bufio.Reader behaves with our response +func TestBufferedReaderBehavior(t *testing.T) { + // Create a sample Metadata v1 response like our gateway sends + sampleResponse := []byte{ + // Correlation ID (4 bytes) + 0x00, 0x00, 0x00, 0x02, + // Brokers count (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Broker: NodeID (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Host length (2 bytes) = 9 + 0x00, 0x09, + // Host "127.0.0.1" + 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, + // Port (4 bytes) = 50000 + 0x00, 0x00, 0xc3, 0x50, + // Rack (2 bytes) = 0 (empty string) + 0x00, 0x00, + // Controller ID (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Topics count (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Topic: Error code (2 bytes) = 0 + 0x00, 0x00, + // Topic name length (2 bytes) = 19 + 0x00, 0x13, + // Topic name "internal-debug-topic" + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2d, 0x64, 0x65, 0x62, 0x75, 0x67, 0x2d, 0x74, 0x6f, 0x70, 0x69, 0x63, + // IsInternal (1 byte) = 0 + 0x00, + // Partitions count (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Partition: Error code (2 bytes) = 0 + 0x00, 0x00, + // Partition ID (4 bytes) = 0 + 0x00, 0x00, 0x00, 0x00, + // Leader ID (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Replicas count (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // Replica ID (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // ISR count (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + // ISR ID (4 bytes) = 1 + 0x00, 0x00, 0x00, 0x01, + } + + t.Logf("Sample response length: %d bytes", len(sampleResponse)) + t.Logf("Sample response hex: %x", sampleResponse) + + // Test reading this with bufio.Reader + reader := bufio.NewReader(bytes.NewReader(sampleResponse)) + + // Try to read correlation ID + correlationBytes, err := reader.Peek(4) + if err != nil { + t.Errorf("Failed to peek correlation ID: %v", err) + return + } + t.Logf("Correlation ID bytes: %x", correlationBytes) + + // Discard the correlation ID + n, err := reader.Discard(4) + if err != nil { + t.Errorf("Failed to discard correlation ID: %v", err) + return + } + t.Logf("Discarded %d bytes", n) + + // Try to read brokers count + brokersBytes, err := reader.Peek(4) + if err != nil { + t.Errorf("Failed to peek brokers count: %v", err) + return + } + t.Logf("Brokers count bytes: %x", brokersBytes) + + // Continue reading to see if we can parse the entire response + remaining := reader.Buffered() + t.Logf("Remaining buffered bytes: %d", remaining) + + // Read all remaining bytes + allBytes, err := reader.ReadBytes(0x01) // Read until we find a 0x01 byte (which should be common) + if err != nil { + t.Logf("ReadBytes error (expected): %v", err) + } + t.Logf("Read %d bytes before error", len(allBytes)) +} diff --git a/test/kafka/network_capture_test.go b/test/kafka/network_capture_test.go new file mode 100644 index 000000000..1979e37c9 --- /dev/null +++ b/test/kafka/network_capture_test.go @@ -0,0 +1,308 @@ +package kafka + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +// TestNetworkCapture captures the exact bytes sent over the network +func TestNetworkCapture(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go gatewayServer.Start() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("capture-topic", 1) + + // Test: Capture exact network traffic + testNetworkTraffic(addr, t) +} + +func testNetworkTraffic(addr string, t *testing.T) { + // Create raw TCP connection + conn, err := net.Dial("tcp", addr) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + defer conn.Close() + + // Send ApiVersions request first + t.Logf("=== Sending ApiVersions Request ===") + apiVersionsReq := buildRawApiVersionsRequest() + t.Logf("ApiVersions request (%d bytes): %x", len(apiVersionsReq), apiVersionsReq) + + if _, err := conn.Write(apiVersionsReq); err != nil { + t.Fatalf("Failed to send ApiVersions: %v", err) + } + + // Read ApiVersions response + apiVersionsResp, err := readRawResponse(conn, t) + if err != nil { + t.Fatalf("Failed to read ApiVersions response: %v", err) + } + t.Logf("ApiVersions response (%d bytes): %x", len(apiVersionsResp), apiVersionsResp) + + // Send Metadata v1 request + t.Logf("=== Sending Metadata v1 Request ===") + metadataReq := buildRawMetadataV1Request([]string{"capture-topic"}) + t.Logf("Metadata request (%d bytes): %x", len(metadataReq), metadataReq) + + if _, err := conn.Write(metadataReq); err != nil { + t.Fatalf("Failed to send Metadata: %v", err) + } + + // Read Metadata response with detailed analysis + metadataResp, err := readRawResponse(conn, t) + if err != nil { + t.Fatalf("Failed to read Metadata response: %v", err) + } + t.Logf("Metadata response (%d bytes): %x", len(metadataResp), metadataResp) + + // Analyze the response structure + analyzeMetadataResponse(metadataResp, t) +} + +func buildRawApiVersionsRequest() []byte { + // Build ApiVersions request manually + clientID := "test-client" + + // Calculate payload size: API key (2) + version (2) + correlation ID (4) + client ID length (2) + client ID + payloadSize := 2 + 2 + 4 + 2 + len(clientID) + + req := make([]byte, 4) // Start with message size + + // Message size + binary.BigEndian.PutUint32(req[0:4], uint32(payloadSize)) + + // API key (ApiVersions = 18) + req = append(req, 0, 18) + // Version + req = append(req, 0, 0) + // Correlation ID + req = append(req, 0, 0, 0, 1) + // Client ID length + clientIDLen := uint16(len(clientID)) + req = append(req, byte(clientIDLen>>8), byte(clientIDLen)) + // Client ID + req = append(req, []byte(clientID)...) + + return req +} + +func buildRawMetadataV1Request(topics []string) []byte { + clientID := "test-client" + + // Calculate payload size: API key (2) + version (2) + correlation ID (4) + client ID length (2) + client ID + topics array + payloadSize := 2 + 2 + 4 + 2 + len(clientID) + 4 // Base size + topics array length + for _, topic := range topics { + payloadSize += 2 + len(topic) // topic length (2) + topic name + } + + req := make([]byte, 4) // Start with message size + + // Message size + binary.BigEndian.PutUint32(req[0:4], uint32(payloadSize)) + + // API key (Metadata = 3) + req = append(req, 0, 3) + // Version + req = append(req, 0, 1) + // Correlation ID + req = append(req, 0, 0, 0, 2) + // Client ID length + clientIDLen := uint16(len(clientID)) + req = append(req, byte(clientIDLen>>8), byte(clientIDLen)) + // Client ID + req = append(req, []byte(clientID)...) + + // Topics array + topicsLen := uint32(len(topics)) + req = append(req, byte(topicsLen>>24), byte(topicsLen>>16), byte(topicsLen>>8), byte(topicsLen)) + for _, topic := range topics { + topicLen := uint16(len(topic)) + req = append(req, byte(topicLen>>8), byte(topicLen)) + req = append(req, []byte(topic)...) + } + + return req +} + +func readRawResponse(conn net.Conn, t *testing.T) ([]byte, error) { + // Read response size first + sizeBuf := make([]byte, 4) + if _, err := io.ReadFull(conn, sizeBuf); err != nil { + return nil, fmt.Errorf("failed to read response size: %v", err) + } + + size := binary.BigEndian.Uint32(sizeBuf) + t.Logf("Response size header: %d bytes", size) + + // Read response data + data := make([]byte, size) + if _, err := io.ReadFull(conn, data); err != nil { + return nil, fmt.Errorf("failed to read response data: %v", err) + } + + return data, nil +} + +func analyzeMetadataResponse(data []byte, t *testing.T) { + t.Logf("=== Analyzing Metadata Response ===") + + if len(data) < 4 { + t.Errorf("Response too short: %d bytes", len(data)) + return + } + + offset := 0 + + // Read correlation ID + correlationID := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + t.Logf("Correlation ID: %d", correlationID) + + // Read brokers count + if offset+4 > len(data) { + t.Errorf("Not enough data for brokers count at offset %d", offset) + return + } + brokersCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + t.Logf("Brokers count: %d", brokersCount) + + // Read each broker + for i := 0; i < int(brokersCount); i++ { + t.Logf("Reading broker %d at offset %d", i, offset) + + // Node ID + if offset+4 > len(data) { + t.Errorf("Not enough data for broker %d node ID", i) + return + } + nodeID := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + + // Host + if offset+2 > len(data) { + t.Errorf("Not enough data for broker %d host length", i) + return + } + hostLen := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 + + if offset+int(hostLen) > len(data) { + t.Errorf("Not enough data for broker %d host", i) + return + } + host := string(data[offset : offset+int(hostLen)]) + offset += int(hostLen) + + // Port + if offset+4 > len(data) { + t.Errorf("Not enough data for broker %d port", i) + return + } + port := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + + // Rack (v1 addition) + if offset+2 > len(data) { + t.Errorf("Not enough data for broker %d rack length", i) + return + } + rackLen := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 + + rack := "" + if rackLen > 0 { + if offset+int(rackLen) > len(data) { + t.Errorf("Not enough data for broker %d rack", i) + return + } + rack = string(data[offset : offset+int(rackLen)]) + offset += int(rackLen) + } + + t.Logf("Broker %d: NodeID=%d, Host=%s, Port=%d, Rack=%s", i, nodeID, host, port, rack) + } + + // Controller ID (v1 addition) + if offset+4 > len(data) { + t.Errorf("Not enough data for controller ID at offset %d", offset) + return + } + controllerID := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + t.Logf("Controller ID: %d", controllerID) + + // Topics count + if offset+4 > len(data) { + t.Errorf("Not enough data for topics count at offset %d", offset) + return + } + topicsCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 + t.Logf("Topics count: %d", topicsCount) + + // Analyze remaining bytes + remaining := len(data) - offset + t.Logf("Remaining bytes after topics count: %d", remaining) + t.Logf("Remaining data: %x", data[offset:]) + + if remaining == 0 { + t.Errorf("ERROR: No data remaining for topics! This might be the issue.") + return + } + + // Try to read first topic + if topicsCount > 0 { + t.Logf("Reading first topic at offset %d", offset) + + // Error code + if offset+2 > len(data) { + t.Errorf("Not enough data for topic error code") + return + } + errorCode := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 + + // Topic name + if offset+2 > len(data) { + t.Errorf("Not enough data for topic name length") + return + } + nameLen := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 + + if offset+int(nameLen) > len(data) { + t.Errorf("Not enough data for topic name") + return + } + name := string(data[offset : offset+int(nameLen)]) + offset += int(nameLen) + + t.Logf("Topic: ErrorCode=%d, Name=%s", errorCode, name) + + // Check remaining structure... + remaining = len(data) - offset + t.Logf("Remaining bytes after first topic name: %d", remaining) + } +} diff --git a/test/kafka/parsing_debug_test.go b/test/kafka/parsing_debug_test.go new file mode 100644 index 000000000..b8686d632 --- /dev/null +++ b/test/kafka/parsing_debug_test.go @@ -0,0 +1,450 @@ +package kafka + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "io" + "net" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +// TestParsingDebug attempts to manually replicate kafka-go's parsing logic +func TestParsingDebug(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go gatewayServer.Start() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("parsing-topic", 1) + + // Get the actual response from our gateway + response := captureMetadataResponse(addr, t) + if response == nil { + return + } + + // Manually parse using kafka-go's logic + t.Logf("=== Manual Parsing Simulation ===") + simulateKafkaGoParsingV1(response, t) +} + +func captureMetadataResponse(addr string, t *testing.T) []byte { + // Create raw TCP connection and get response + conn, err := net.Dial("tcp", addr) + if err != nil { + t.Errorf("Failed to dial: %v", err) + return nil + } + defer conn.Close() + + // Send ApiVersions first + apiVersionsReq := buildSimpleApiVersionsRequest() + if _, err := conn.Write(apiVersionsReq); err != nil { + t.Errorf("Failed to send ApiVersions: %v", err) + return nil + } + + // Read ApiVersions response + if _, err := readSimpleResponse(conn); err != nil { + t.Errorf("Failed to read ApiVersions response: %v", err) + return nil + } + + // Send Metadata v1 request + metadataReq := buildSimpleMetadataV1Request([]string{"parsing-topic"}) + if _, err := conn.Write(metadataReq); err != nil { + t.Errorf("Failed to send Metadata: %v", err) + return nil + } + + // Read Metadata response + response, err := readSimpleResponse(conn) + if err != nil { + t.Errorf("Failed to read Metadata response: %v", err) + return nil + } + + t.Logf("Captured Metadata response (%d bytes): %x", len(response), response) + return response +} + +func buildSimpleApiVersionsRequest() []byte { + clientID := "parser" + payloadSize := 2 + 2 + 4 + 2 + len(clientID) + + req := make([]byte, 4) + binary.BigEndian.PutUint32(req[0:4], uint32(payloadSize)) + req = append(req, 0, 18) // ApiVersions + req = append(req, 0, 0) // version 0 + req = append(req, 0, 0, 0, 1) // correlation ID + req = append(req, 0, byte(len(clientID))) // client ID length + req = append(req, []byte(clientID)...) + return req +} + +func buildSimpleMetadataV1Request(topics []string) []byte { + clientID := "parser" + payloadSize := 2 + 2 + 4 + 2 + len(clientID) + 4 + for _, topic := range topics { + payloadSize += 2 + len(topic) + } + + req := make([]byte, 4) + binary.BigEndian.PutUint32(req[0:4], uint32(payloadSize)) + req = append(req, 0, 3) // Metadata + req = append(req, 0, 1) // version 1 + req = append(req, 0, 0, 0, 2) // correlation ID + req = append(req, 0, byte(len(clientID))) // client ID length + req = append(req, []byte(clientID)...) + + // Topics array + topicsLen := uint32(len(topics)) + req = append(req, byte(topicsLen>>24), byte(topicsLen>>16), byte(topicsLen>>8), byte(topicsLen)) + for _, topic := range topics { + topicLen := uint16(len(topic)) + req = append(req, byte(topicLen>>8), byte(topicLen)) + req = append(req, []byte(topic)...) + } + + return req +} + +func readSimpleResponse(conn net.Conn) ([]byte, error) { + // Read response size + sizeBuf := make([]byte, 4) + if _, err := io.ReadFull(conn, sizeBuf); err != nil { + return nil, err + } + + size := binary.BigEndian.Uint32(sizeBuf) + + // Read response data + data := make([]byte, size) + if _, err := io.ReadFull(conn, data); err != nil { + return nil, err + } + + return data, nil +} + +// simulateKafkaGoParsingV1 manually replicates kafka-go's parsing logic +func simulateKafkaGoParsingV1(data []byte, t *testing.T) { + reader := bufio.NewReader(bytes.NewReader(data)) + totalSize := len(data) + remainingSize := totalSize + + t.Logf("Starting parse of %d bytes", totalSize) + + // Simulate kafka-go's metadataResponseV1 struct parsing + // type metadataResponseV1 struct { + // Brokers []brokerMetadataV1 + // ControllerID int32 + // Topics []topicMetadataV1 + // } + + // Parse correlation ID (this is handled before the struct parsing) + correlationID, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read correlation ID: %v", err) + return + } + t.Logf("Correlation ID: %d, remaining: %d", correlationID, remainingSize) + + // Parse Brokers array + brokersCount, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read brokers count: %v", err) + return + } + t.Logf("Brokers count: %d, remaining: %d", brokersCount, remainingSize) + + // Parse each broker (brokerMetadataV1) + for i := 0; i < int(brokersCount); i++ { + t.Logf("Parsing broker %d at remaining: %d", i, remainingSize) + + // NodeID (int32) + nodeID, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read broker %d nodeID: %v", i, err) + return + } + + // Host (string) + host, err := readStringFromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read broker %d host: %v", i, err) + return + } + + // Port (int32) + port, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read broker %d port: %v", i, err) + return + } + + // Rack (string) - v1 addition + rack, err := readStringFromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read broker %d rack: %v", i, err) + return + } + + t.Logf("Broker %d: NodeID=%d, Host=%s, Port=%d, Rack=%s, remaining: %d", + i, nodeID, host, port, rack, remainingSize) + } + + // Parse ControllerID (int32) + controllerID, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read controller ID: %v", err) + return + } + t.Logf("Controller ID: %d, remaining: %d", controllerID, remainingSize) + + // Parse Topics array + topicsCount, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read topics count: %v", err) + return + } + t.Logf("Topics count: %d, remaining: %d", topicsCount, remainingSize) + + // Parse each topic (topicMetadataV1) + for i := 0; i < int(topicsCount); i++ { + t.Logf("Parsing topic %d at remaining: %d", i, remainingSize) + + // TopicErrorCode (int16) + errorCode, err := readInt16FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read topic %d error code: %v", i, err) + return + } + + // TopicName (string) + name, err := readStringFromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read topic %d name: %v", i, err) + return + } + + // Internal (bool) - v1 addition + internal, err := readBoolFromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read topic %d internal: %v", i, err) + return + } + + t.Logf("Topic %d: ErrorCode=%d, Name=%s, Internal=%v, remaining: %d", + i, errorCode, name, internal, remainingSize) + + // Parse Partitions array + partitionsCount, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read topic %d partitions count: %v", i, err) + return + } + t.Logf("Topic %d partitions count: %d, remaining: %d", i, partitionsCount, remainingSize) + + // Parse each partition (partitionMetadataV1) + for j := 0; j < int(partitionsCount); j++ { + t.Logf("Parsing partition %d at remaining: %d", j, remainingSize) + + // PartitionErrorCode (int16) + partErrorCode, err := readInt16FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read partition %d error code: %v", j, err) + return + } + + // PartitionID (int32) + partitionID, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read partition %d ID: %v", j, err) + return + } + + // Leader (int32) + leader, err := readInt32FromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read partition %d leader: %v", j, err) + return + } + + // Replicas ([]int32) + replicas, err := readInt32ArrayFromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read partition %d replicas: %v", j, err) + return + } + + // Isr ([]int32) + isr, err := readInt32ArrayFromReader(reader, &remainingSize, t) + if err != nil { + t.Errorf("Failed to read partition %d ISR: %v", j, err) + return + } + + t.Logf("Partition %d: ErrorCode=%d, ID=%d, Leader=%d, Replicas=%v, ISR=%v, remaining: %d", + j, partErrorCode, partitionID, leader, replicas, isr, remainingSize) + } + } + + t.Logf("=== PARSING COMPLETE ===") + t.Logf("Final remaining bytes: %d", remainingSize) + + if remainingSize == 0 { + t.Logf("✅ SUCCESS: All bytes consumed correctly!") + } else { + t.Errorf("❌ FAILURE: %d bytes left unread - this is the expectZeroSize issue!", remainingSize) + + // Show the remaining bytes + remaining := make([]byte, remainingSize) + if n, err := reader.Read(remaining); err == nil { + t.Logf("Remaining bytes: %x", remaining[:n]) + } + } +} + +// Helper functions to simulate kafka-go's reading logic +func readInt32FromReader(reader *bufio.Reader, remainingSize *int, t *testing.T) (int32, error) { + if *remainingSize < 4 { + return 0, fmt.Errorf("not enough bytes for int32: need 4, have %d", *remainingSize) + } + + bytes, err := reader.Peek(4) + if err != nil { + return 0, err + } + + value := int32(binary.BigEndian.Uint32(bytes)) + + n, err := reader.Discard(4) + if err != nil { + return 0, err + } + + *remainingSize -= n + return value, nil +} + +func readInt16FromReader(reader *bufio.Reader, remainingSize *int, t *testing.T) (int16, error) { + if *remainingSize < 2 { + return 0, fmt.Errorf("not enough bytes for int16: need 2, have %d", *remainingSize) + } + + bytes, err := reader.Peek(2) + if err != nil { + return 0, err + } + + value := int16(binary.BigEndian.Uint16(bytes)) + + n, err := reader.Discard(2) + if err != nil { + return 0, err + } + + *remainingSize -= n + return value, nil +} + +func readStringFromReader(reader *bufio.Reader, remainingSize *int, t *testing.T) (string, error) { + // Read length first (int16) + if *remainingSize < 2 { + return "", fmt.Errorf("not enough bytes for string length: need 2, have %d", *remainingSize) + } + + lengthBytes, err := reader.Peek(2) + if err != nil { + return "", err + } + + length := int(binary.BigEndian.Uint16(lengthBytes)) + + // Discard length bytes + n, err := reader.Discard(2) + if err != nil { + return "", err + } + *remainingSize -= n + + // Read string data + if *remainingSize < length { + return "", fmt.Errorf("not enough bytes for string data: need %d, have %d", length, *remainingSize) + } + + if length == 0 { + return "", nil + } + + stringBytes := make([]byte, length) + n, err = reader.Read(stringBytes) + if err != nil { + return "", err + } + + *remainingSize -= n + return string(stringBytes), nil +} + +func readBoolFromReader(reader *bufio.Reader, remainingSize *int, t *testing.T) (bool, error) { + if *remainingSize < 1 { + return false, fmt.Errorf("not enough bytes for bool: need 1, have %d", *remainingSize) + } + + bytes, err := reader.Peek(1) + if err != nil { + return false, err + } + + value := bytes[0] != 0 + + n, err := reader.Discard(1) + if err != nil { + return false, err + } + + *remainingSize -= n + return value, nil +} + +func readInt32ArrayFromReader(reader *bufio.Reader, remainingSize *int, t *testing.T) ([]int32, error) { + // Read array length first (int32) + length, err := readInt32FromReader(reader, remainingSize, t) + if err != nil { + return nil, fmt.Errorf("failed to read array length: %v", err) + } + + if length < 0 { + return nil, nil // Null array + } + + result := make([]int32, length) + for i := 0; i < int(length); i++ { + value, err := readInt32FromReader(reader, remainingSize, t) + if err != nil { + return nil, fmt.Errorf("failed to read array element %d: %v", i, err) + } + result[i] = value + } + + return result, nil +} diff --git a/test/kafka/sarama_test.go b/test/kafka/sarama_test.go new file mode 100644 index 000000000..924ef55fd --- /dev/null +++ b/test/kafka/sarama_test.go @@ -0,0 +1,324 @@ +package kafka + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +// TestSaramaCompatibility tests our Kafka gateway with IBM Sarama client +func TestSaramaCompatibility(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go gatewayServer.Start() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("sarama-test-topic", 1) + t.Logf("Added topic: sarama-test-topic") + + // Test 1: Basic Sarama client connection and metadata + t.Logf("=== Test 1: Sarama Metadata Request ===") + testSaramaMetadata(addr, t) + + // Test 2: Sarama producer + t.Logf("=== Test 2: Sarama Producer ===") + testSaramaProducer(addr, t) + + // Test 3: Sarama consumer + t.Logf("=== Test 3: Sarama Consumer ===") + testSaramaConsumer(addr, t) + + // Test 4: Sarama consumer group + t.Logf("=== Test 4: Sarama Consumer Group ===") + testSaramaConsumerGroup(addr, t) +} + +func testSaramaMetadata(addr string, t *testing.T) { + // Create Sarama config + config := sarama.NewConfig() + config.Version = sarama.V2_6_0_0 // Use a well-supported version + config.ClientID = "sarama-test-client" + + // Create client + client, err := sarama.NewClient([]string{addr}, config) + if err != nil { + t.Errorf("Failed to create Sarama client: %v", err) + return + } + defer client.Close() + + t.Logf("Sarama client created successfully") + + // Test metadata request + topics, err := client.Topics() + if err != nil { + t.Errorf("Failed to get topics: %v", err) + return + } + + t.Logf("Topics from Sarama: %v", topics) + + // Test partition metadata + partitions, err := client.Partitions("sarama-test-topic") + if err != nil { + t.Errorf("Failed to get partitions: %v", err) + return + } + + t.Logf("Partitions for sarama-test-topic: %v", partitions) + + // Test broker metadata + brokers := client.Brokers() + t.Logf("Brokers from Sarama: %d brokers", len(brokers)) + for i, broker := range brokers { + t.Logf("Broker %d: ID=%d, Addr=%s", i, broker.ID(), broker.Addr()) + } + + t.Logf("✅ Sarama metadata test passed!") +} + +func testSaramaProducer(addr string, t *testing.T) { + // Create Sarama config for producer + config := sarama.NewConfig() + config.Version = sarama.V2_6_0_0 + config.ClientID = "sarama-producer" + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Retry.Max = 3 + config.Producer.Return.Successes = true + + // Create producer + producer, err := sarama.NewSyncProducer([]string{addr}, config) + if err != nil { + t.Errorf("Failed to create Sarama producer: %v", err) + return + } + defer producer.Close() + + t.Logf("Sarama producer created successfully") + + // Send a test message + message := &sarama.ProducerMessage{ + Topic: "sarama-test-topic", + Key: sarama.StringEncoder("test-key"), + Value: sarama.StringEncoder("Hello from Sarama!"), + } + + partition, offset, err := producer.SendMessage(message) + if err != nil { + t.Errorf("Failed to send message: %v", err) + return + } + + t.Logf("✅ Message sent successfully! Partition: %d, Offset: %d", partition, offset) +} + +func testSaramaConsumer(addr string, t *testing.T) { + // Create Sarama config for consumer + config := sarama.NewConfig() + config.Version = sarama.V2_6_0_0 + config.ClientID = "sarama-consumer" + config.Consumer.Return.Errors = true + + // Create consumer + consumer, err := sarama.NewConsumer([]string{addr}, config) + if err != nil { + t.Errorf("Failed to create Sarama consumer: %v", err) + return + } + defer consumer.Close() + + t.Logf("Sarama consumer created successfully") + + // Create partition consumer + partitionConsumer, err := consumer.ConsumePartition("sarama-test-topic", 0, sarama.OffsetOldest) + if err != nil { + t.Errorf("Failed to create partition consumer: %v", err) + return + } + defer partitionConsumer.Close() + + t.Logf("Partition consumer created successfully") + + // Try to consume a message with timeout + select { + case message := <-partitionConsumer.Messages(): + t.Logf("✅ Consumed message: Key=%s, Value=%s, Offset=%d", + string(message.Key), string(message.Value), message.Offset) + case err := <-partitionConsumer.Errors(): + t.Errorf("Consumer error: %v", err) + case <-time.After(5 * time.Second): + t.Logf("⚠️ No messages received within timeout (this might be expected if no messages were produced)") + } +} + +func testSaramaConsumerGroup(addr string, t *testing.T) { + // Create Sarama config for consumer group + config := sarama.NewConfig() + config.Version = sarama.V2_6_0_0 + config.ClientID = "sarama-consumer-group" + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Return.Errors = true + + // Create consumer group + consumerGroup, err := sarama.NewConsumerGroup([]string{addr}, "sarama-test-group", config) + if err != nil { + t.Errorf("Failed to create Sarama consumer group: %v", err) + return + } + defer consumerGroup.Close() + + t.Logf("Sarama consumer group created successfully") + + // Create a consumer group handler + handler := &SaramaConsumerGroupHandler{t: t} + + // Start consuming with timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Start the consumer group in a goroutine + go func() { + for { + // Check if context was cancelled + if ctx.Err() != nil { + return + } + + // Consume should be called inside an infinite loop + if err := consumerGroup.Consume(ctx, []string{"sarama-test-topic"}, handler); err != nil { + t.Logf("Consumer group error: %v", err) + return + } + } + }() + + // Wait for the context to be cancelled or for messages + select { + case <-ctx.Done(): + t.Logf("Consumer group test completed") + case <-time.After(10 * time.Second): + t.Logf("Consumer group test timed out") + } + + if handler.messageReceived { + t.Logf("✅ Consumer group test passed!") + } else { + t.Logf("⚠️ No messages received in consumer group (this might be expected)") + } +} + +// SaramaConsumerGroupHandler implements sarama.ConsumerGroupHandler +type SaramaConsumerGroupHandler struct { + t *testing.T + messageReceived bool +} + +func (h *SaramaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { + h.t.Logf("Consumer group session setup") + return nil +} + +func (h *SaramaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { + h.t.Logf("Consumer group session cleanup") + return nil +} + +func (h *SaramaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + h.t.Logf("Consumer group claim started for topic: %s, partition: %d", claim.Topic(), claim.Partition()) + + for { + select { + case message := <-claim.Messages(): + if message == nil { + return nil + } + h.t.Logf("✅ Consumer group received message: Key=%s, Value=%s, Offset=%d", + string(message.Key), string(message.Value), message.Offset) + h.messageReceived = true + session.MarkMessage(message, "") + + case <-session.Context().Done(): + h.t.Logf("Consumer group session context cancelled") + return nil + } + } +} + +// TestSaramaMetadataOnly tests just the metadata functionality that's failing with kafka-go +func TestSaramaMetadataOnly(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go gatewayServer.Start() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", addr) + + // Add test topic + handler := gatewayServer.GetHandler() + handler.AddTopicForTesting("metadata-only-topic", 1) + + // Test with different Sarama versions to see if any fail like kafka-go + versions := []sarama.KafkaVersion{ + sarama.V2_0_0_0, + sarama.V2_1_0_0, + sarama.V2_6_0_0, + sarama.V3_0_0_0, + } + + for _, version := range versions { + t.Logf("=== Testing Sarama with Kafka version %s ===", version.String()) + + config := sarama.NewConfig() + config.Version = version + config.ClientID = fmt.Sprintf("sarama-test-%s", version.String()) + + client, err := sarama.NewClient([]string{addr}, config) + if err != nil { + t.Errorf("Failed to create Sarama client for version %s: %v", version.String(), err) + continue + } + + // Test the same operation that fails with kafka-go: getting topic metadata + topics, err := client.Topics() + if err != nil { + t.Errorf("❌ Sarama %s failed to get topics: %v", version.String(), err) + } else { + t.Logf("✅ Sarama %s successfully got topics: %v", version.String(), topics) + } + + // Test partition metadata (this is similar to kafka-go's ReadPartitions) + partitions, err := client.Partitions("metadata-only-topic") + if err != nil { + t.Errorf("❌ Sarama %s failed to get partitions: %v", version.String(), err) + } else { + t.Logf("✅ Sarama %s successfully got partitions: %v", version.String(), partitions) + } + + client.Close() + } +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index a80de9a66..b27e5d107 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -313,10 +313,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) - // Force kafka-go to use v0 to avoid readPartitions parsing issues + // kafka-go negotiates v1,v6 - try v6 since our v6 works with Sarama response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 0) // max version 0 + response = append(response, 0, 6) // max version 6 // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -487,24 +487,8 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] } func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) { - // Precise Metadata v1 implementation based on kafka-go's metadataResponseV1 struct: - // type metadataResponseV1 struct { - // Brokers []metadataBrokerV1 `kafka:"min=v0,max=v8"` - // ControllerID int32 `kafka:"min=v1,max=v8"` - // Topics []metadataTopicV1 `kafka:"min=v0,max=v8"` - // } - // type metadataBrokerV1 struct { - // NodeID int32 `kafka:"min=v0,max=v8"` - // Host string `kafka:"min=v0,max=v8"` - // Port int32 `kafka:"min=v0,max=v8"` - // Rack string `kafka:"min=v1,max=v8"` // NOTE: Non-nullable string in v1 - // } - // type metadataTopicV1 struct { - // ErrorCode int16 `kafka:"min=v0,max=v8"` - // Name string `kafka:"min=v0,max=v8"` - // IsInternal bool `kafka:"min=v1,max=v8"` - // Partitions []metadataPartitionV1 `kafka:"min=v0,max=v8"` - // } + // Simplified Metadata v1 implementation - based on working v0 + v1 additions + // v1 adds: ControllerID (after brokers), Rack (for brokers), IsInternal (for topics) // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) @@ -527,67 +511,78 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] } h.topicsMu.RUnlock() - var buf bytes.Buffer + // Build response using same approach as v0 but with v1 additions + response := make([]byte, 0, 256) // Correlation ID (4 bytes) - binary.Write(&buf, binary.BigEndian, correlationID) + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) - // Brokers array (4 bytes length + brokers) - binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker + // Brokers array length (4 bytes) - 1 broker (this gateway) + response = append(response, 0, 0, 0, 1) - // Broker 0 - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID + // Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING) + response = append(response, 0, 0, 0, 1) // node_id = 1 - // Host (STRING: 2 bytes length + data) + // Use dynamic broker address set by the server host := h.brokerHost - binary.Write(&buf, binary.BigEndian, int16(len(host))) - buf.WriteString(host) + port := h.brokerPort + fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", host, port) + + // Host (STRING: 2 bytes length + bytes) + hostLen := uint16(len(host)) + response = append(response, byte(hostLen>>8), byte(hostLen)) + response = append(response, []byte(host)...) // Port (4 bytes) - binary.Write(&buf, binary.BigEndian, int32(h.brokerPort)) + portBytes := make([]byte, 4) + binary.BigEndian.PutUint32(portBytes, uint32(port)) + response = append(response, portBytes...) - // Rack (STRING: 2 bytes length + data) - v1 addition, non-nullable - binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string + // Rack (STRING: 2 bytes length + bytes) - v1 addition, non-nullable empty string + response = append(response, 0, 0) // empty string - // ControllerID (4 bytes) - v1 addition (comes after ALL brokers) - binary.Write(&buf, binary.BigEndian, int32(1)) + // ControllerID (4 bytes) - v1 addition + response = append(response, 0, 0, 0, 1) // controller_id = 1 - // Topics array (4 bytes length + topics) - binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) + // Topics array length (4 bytes) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn))) + response = append(response, topicsCountBytes...) + // Topics for _, topicName := range topicsToReturn { - // ErrorCode (2 bytes) - binary.Write(&buf, binary.BigEndian, int16(0)) + // error_code (2 bytes) + response = append(response, 0, 0) - // Name (STRING: 2 bytes length + data) - binary.Write(&buf, binary.BigEndian, int16(len(topicName))) - buf.WriteString(topicName) + // topic name (STRING: 2 bytes length + bytes) + topicLen := uint16(len(topicName)) + response = append(response, byte(topicLen>>8), byte(topicLen)) + response = append(response, []byte(topicName)...) - // IsInternal (1 byte) - v1 addition - buf.WriteByte(0) // false + // is_internal (1 byte) - v1 addition + response = append(response, 0) // false - // Partitions array (4 bytes length + partitions) - binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition + // partitions array length (4 bytes) - 1 partition + response = append(response, 0, 0, 0, 1) - // Partition 0 - binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode - binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + // partition 0: error_code(2) + partition_id(4) + leader_id(4) + replicas(ARRAY) + isr(ARRAY) + response = append(response, 0, 0) // error_code + response = append(response, 0, 0, 0, 0) // partition_id = 0 + response = append(response, 0, 0, 0, 1) // leader_id = 1 - // ReplicaNodes array (4 bytes length + nodes) - binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + // replicas: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 1) - // IsrNodes array (4 bytes length + nodes) - binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + // isr: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 1) } - response := buf.Bytes() - fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", h.brokerHost, h.brokerPort) fmt.Printf("DEBUG: Metadata v1 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn) - fmt.Printf("DEBUG: Metadata v1 response hex dump (%d bytes): %x\n", len(response), response) - + fmt.Printf("DEBUG: Metadata v1 response size: %d bytes\n", len(response)) return response, nil }