
fix Node ID Mismatch, and clean up log messages

pull/7329/head
chrislu · 5 days ago
commit 017e7d32cf
  1. test/kafka/kafka-client-loadtest/cmd/loadtest/main.go (1 line changed)
  2. test/kafka/kafka-client-loadtest/internal/consumer/consumer.go (19 lines changed)
  3. weed/mq/kafka/protocol/consumer_coordination.go (6 lines changed)
  4. weed/mq/kafka/protocol/consumer_group_metadata.go (2 lines changed)
  5. weed/mq/kafka/protocol/fetch.go (8 lines changed)
  6. weed/mq/kafka/protocol/fetch_partition_reader.go (4 lines changed)
  7. weed/mq/kafka/protocol/find_coordinator.go (8 lines changed)
  8. weed/mq/kafka/protocol/handler.go (127 lines changed)
  9. weed/mq/kafka/protocol/joingroup.go (61 lines changed)
  10. weed/mq/kafka/protocol/produce.go (11 lines changed)

test/kafka/kafka-client-loadtest/cmd/loadtest/main.go (1 line changed)

@ -253,6 +253,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
time.Sleep(2 * time.Second)
// Start consumers
// NOTE: With unique ClientIDs, all consumers can start simultaneously without connection storms
for i := 0; i < cfg.Consumers.Count; i++ {
wg.Add(1)
go func(id int) {

test/kafka/kafka-client-loadtest/internal/consumer/consumer.go (19 lines changed)

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
@ -106,6 +107,9 @@ func New(cfg *config.Config, collector *metrics.Collector, id int, recordTracker
func (c *Consumer) initSaramaConsumer() error {
config := sarama.NewConfig()
// Enable Sarama debug logging to diagnose connection issues
sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[Sarama Consumer %d] ", c.id), log.LstdFlags)
// Consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
@ -135,9 +139,24 @@ func (c *Consumer) initSaramaConsumer() error {
// This allows Sarama to fetch from multiple partitions in parallel
config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests
// Connection retry and timeout configuration
config.Net.DialTimeout = 30 * time.Second // Explicit 30s (matches Sarama default)
config.Net.ReadTimeout = 30 * time.Second // Explicit 30s (matches Sarama default)
config.Net.WriteTimeout = 30 * time.Second // Explicit 30s (matches Sarama default)
config.Metadata.Retry.Max = 5 // Retry metadata fetch up to 5 times
config.Metadata.Retry.Backoff = 500 * time.Millisecond
config.Metadata.Timeout = 30 * time.Second // Increase metadata timeout
// Version
config.Version = sarama.V2_8_0_0
// CRITICAL: Set unique ClientID to ensure each consumer gets a unique member ID
// Without this, all consumers from the same process get the same member ID and only 1 joins!
// Sarama uses ClientID as part of the member ID generation
// Use consumer ID directly - no timestamp needed since IDs are already unique per process
config.ClientID = fmt.Sprintf("loadtest-consumer-%d", c.id)
log.Printf("Consumer %d: Setting Sarama ClientID to: %s", c.id, config.ClientID)
// Create consumer group
consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
if err != nil {

weed/mq/kafka/protocol/consumer_coordination.go (6 lines changed)

@ -464,6 +464,9 @@ func (h *Handler) buildLeaveGroupFullResponse(response LeaveGroupResponse) []byt
// NOTE: Correlation ID is handled by writeResponseWithCorrelationID
// Do NOT include it in the response body
// For LeaveGroup v1+, throttle_time_ms comes first (4 bytes)
result = append(result, 0, 0, 0, 0)
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
@ -500,9 +503,6 @@ func (h *Handler) buildLeaveGroupFullResponse(response LeaveGroupResponse) []byt
result = append(result, memberErrorBytes...)
}
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
return result
}
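
The reordering above matters because LeaveGroup v1+ places throttle_time_ms at the start of the response body rather than the end. A standalone sketch of the corrected field order (illustrative only; v3+ additionally appends a members array with per-member error codes):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        body := make([]byte, 0, 8)
        body = append(body, 0, 0, 0, 0) // throttle_time_ms (INT32) comes first in v1+

        errCode := make([]byte, 2)
        binary.BigEndian.PutUint16(errCode, 0) // error_code (INT16), 0 = no error
        body = append(body, errCode...)

        fmt.Printf("% x\n", body) // 00 00 00 00 00 00
    }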

weed/mq/kafka/protocol/consumer_group_metadata.go (2 lines changed)

@ -26,7 +26,7 @@ type ConnectionContext struct {
ConsumerGroup string // Consumer group (set by JoinGroup)
MemberID string // Consumer group member ID (set by JoinGroup)
// Per-connection broker client for isolated gRPC streams
// CRITICAL: Each Kafka connection MUST have its own gRPC streams to avoid interference
// Each Kafka connection MUST have its own gRPC streams to avoid interference
// when multiple consumers or requests are active on different connections
BrokerClient interface{} // Will be set to *integration.BrokerClient

weed/mq/kafka/protocol/fetch.go (8 lines changed)

@ -166,7 +166,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}())
// Collect results from persistent readers
// CRITICAL: Dispatch all requests concurrently, then wait for all results in parallel
// Dispatch all requests concurrently, then wait for all results in parallel
// to avoid sequential timeout accumulation
type pendingFetch struct {
topicName string
@ -242,7 +242,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}
// Phase 2: Wait for all results with adequate timeout for CI environments
// CRITICAL: We MUST return a result for every requested partition or Sarama will error
// We MUST return a result for every requested partition or Sarama will error
results := make([]*partitionFetchResult, len(pending))
// Use 95% of client's MaxWaitTime to ensure we return BEFORE client timeout
// This maximizes data collection time while leaving a safety buffer for:
@ -286,7 +286,7 @@ done:
// Now assemble the response in the correct order using fetched results
// ====================================================================
// CRITICAL: Verify we have results for all requested partitions
// Verify we have results for all requested partitions
// Sarama requires a response block for EVERY requested partition to avoid ErrIncompleteResponse
expectedResultCount := 0
for _, topic := range fetchRequest.Topics {
@ -1126,7 +1126,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
return nil
}
// CRITICAL FIX: For system topics like _schemas, _consumer_offsets, etc.,
// For system topics like _schemas, _consumer_offsets, etc.,
// return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
// and should NOT be processed as RecordValue protobuf messages.
if strings.HasPrefix(topicName, "_") {
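
The dispatch-then-collect pattern described in the comments above (issue every partition fetch first, then wait on all of them under one shared deadline, always returning an entry per partition) has roughly this shape; a generic sketch, not the handler's actual code:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    type partitionResult struct {
        partition int
        records   []byte
    }

    func main() {
        partitions := []int{0, 1, 2}

        // Phase 1: dispatch every partition fetch concurrently so slow partitions
        // overlap instead of accumulating sequential timeouts.
        pending := make([]chan partitionResult, len(partitions))
        for i, p := range partitions {
            ch := make(chan partitionResult, 1)
            pending[i] = ch
            go func(p int, ch chan partitionResult) {
                time.Sleep(10 * time.Millisecond) // stand-in for reading from a partition reader
                ch <- partitionResult{partition: p, records: []byte{}}
            }(p, ch)
        }

        // Phase 2: wait for all results under one shared deadline. Every requested
        // partition must get an entry, even an empty one, or the client treats the
        // response as incomplete.
        ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
        defer cancel()
        results := make([]partitionResult, len(pending))
        for i, ch := range pending {
            select {
            case r := <-ch:
                results[i] = r
            case <-ctx.Done():
                results[i] = partitionResult{partition: partitions[i]} // empty placeholder
            }
        }
        fmt.Println("collected", len(results), "partition results")
    }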

weed/mq/kafka/protocol/fetch_partition_reader.go (4 lines changed)

@ -90,7 +90,7 @@ func (pr *partitionReader) preFetchLoop(ctx context.Context) {
}
// handleRequests serves fetch requests SEQUENTIALLY to prevent subscriber storm
// CRITICAL: Sequential processing is essential for SMQ backend because:
// Sequential processing is essential for SMQ backend because:
// 1. GetStoredRecords may create a new subscriber on each call
// 2. Concurrent calls create multiple subscribers for the same partition
// 3. This overwhelms the broker and causes partition shutdowns
@ -138,7 +138,7 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
}
result.highWaterMark = hwm
// CRITICAL: If requested offset >= HWM, return immediately with empty result
// If requested offset >= HWM, return immediately with empty result
// This prevents overwhelming the broker with futile read attempts when no data is available
if req.requestedOffset >= hwm {
result.recordBatch = []byte{}

weed/mq/kafka/protocol/find_coordinator.go (8 lines changed)

@ -48,7 +48,6 @@ func (h *Handler) handleFindCoordinator(correlationID uint32, apiVersion uint16,
func (h *Handler) handleFindCoordinatorV0(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse FindCoordinator v0 request: Key (STRING) only
// DEBUG: Hex dump the request to understand format
dumpLen := len(requestBody)
if dumpLen > 50 {
dumpLen = 50
@ -84,7 +83,7 @@ func (h *Handler) handleFindCoordinatorV0(correlationID uint32, requestBody []by
return nil, fmt.Errorf("failed to find coordinator for group %s: %w", coordinatorKey, err)
}
// CRITICAL FIX: Return hostname instead of IP address for client connectivity
// Return hostname instead of IP address for client connectivity
// Clients need to connect to the same hostname they originally connected to
_ = coordinatorHost // originalHost
coordinatorHost = h.getClientConnectableHost(coordinatorHost)
@ -128,7 +127,6 @@ func (h *Handler) handleFindCoordinatorV0(correlationID uint32, requestBody []by
func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse FindCoordinator request (v0-2 non-flex): Key (STRING), v1+ adds KeyType (INT8)
// DEBUG: Hex dump the request to understand format
dumpLen := len(requestBody)
if dumpLen > 50 {
dumpLen = 50
@ -167,7 +165,7 @@ func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []by
return nil, fmt.Errorf("failed to find coordinator for group %s: %w", coordinatorKey, err)
}
// CRITICAL FIX: Return hostname instead of IP address for client connectivity
// Return hostname instead of IP address for client connectivity
// Clients need to connect to the same hostname they originally connected to
_ = coordinatorHost // originalHost
coordinatorHost = h.getClientConnectableHost(coordinatorHost)
@ -237,7 +235,7 @@ func (h *Handler) handleFindCoordinatorV3(correlationID uint32, requestBody []by
offset := 0
// CRITICAL FIX: The first byte is the tagged fields from the REQUEST HEADER that weren't consumed
// The first byte is the tagged fields from the REQUEST HEADER that weren't consumed
// Skip the tagged fields count (should be 0x00 for no tagged fields)
if len(requestBody) > 0 && requestBody[0] == 0x00 {
glog.V(4).Infof("FindCoordinator V3: Skipping header tagged fields byte (0x00)")

weed/mq/kafka/protocol/handler.go (127 lines changed)

@ -6,6 +6,7 @@ import (
"context"
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"net"
"os"
@ -52,6 +53,27 @@ func (h *Handler) GetAdvertisedAddress(gatewayAddr string) (string, int) {
return host, port
}
// generateNodeID generates a deterministic node ID from a gateway address.
// This must match the logic in gateway/coordinator_registry.go to ensure consistency
// between Metadata and FindCoordinator responses.
func generateNodeID(gatewayAddress string) int32 {
if gatewayAddress == "" {
return 1 // Default fallback
}
h := fnv.New32a()
_, _ = h.Write([]byte(gatewayAddress))
// Use only positive values and avoid 0
return int32(h.Sum32()&0x7fffffff) + 1
}
// GetNodeID returns the consistent node ID for this gateway.
// This is used by both Metadata and FindCoordinator handlers to ensure
// clients see the same broker/coordinator node ID across all APIs.
func (h *Handler) GetNodeID() int32 {
gatewayAddr := h.GetGatewayAddress()
return generateNodeID(gatewayAddr)
}
// TopicInfo holds basic information about a topic
type TopicInfo struct {
Name string
@ -485,7 +507,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// CRITICAL: Create per-connection BrokerClient for isolated gRPC streams
// Create per-connection BrokerClient for isolated gRPC streams
// This prevents different connections from interfering with each other's Fetch requests
// In mock/unit test mode, this may not be available, so we continue without it
var connBrokerClient *integration.BrokerClient
@ -533,7 +555,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
consecutiveTimeouts := 0
const maxConsecutiveTimeouts = 3 // Give up after 3 timeouts in a row
// CRITICAL: Separate control plane from data plane
// Separate control plane from data plane
// Control plane: Metadata, Heartbeat, JoinGroup, etc. (must be fast, never block)
// Data plane: Fetch, Produce (can be slow, may block on I/O)
//
@ -548,7 +570,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
var wg sync.WaitGroup
// Response writer - maintains request/response order per connection
// CRITICAL: While we process requests concurrently (control/data plane),
// While we process requests concurrently (control/data plane),
// we MUST track the order requests arrive and send responses in that same order.
// Solution: Track received correlation IDs in a queue, send responses in that queue order.
correlationQueue := make([]uint32, 0, 100)
@ -623,7 +645,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
glog.V(4).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey)
// CRITICAL: Wrap request processing with panic recovery to prevent deadlocks
// Wrap request processing with panic recovery to prevent deadlocks
// If processRequestSync panics, we MUST still send a response to avoid blocking the response writer
var response []byte
var err error
@ -701,7 +723,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
glog.V(4).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey)
// CRITICAL: Wrap request processing with panic recovery to prevent deadlocks
// Wrap request processing with panic recovery to prevent deadlocks
// If processRequestSync panics, we MUST still send a response to avoid blocking the response writer
var response []byte
var err error
@ -768,7 +790,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}()
defer func() {
// CRITICAL: Close channels in correct order to avoid panics
// Close channels in correct order to avoid panics
// 1. Close input channels to stop accepting new requests
close(controlChan)
close(dataChan)
@ -828,9 +850,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
return nil
}
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// CRITICAL FIX: Track consecutive timeouts to detect CLOSE_WAIT connections
// When remote peer closes, connection enters CLOSE_WAIT and reads keep timing out
// After several consecutive timeouts with no data, assume connection is dead
// Track consecutive timeouts to detect stale connections
consecutiveTimeouts++
if consecutiveTimeouts >= maxConsecutiveTimeouts {
return nil
@ -846,7 +866,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Successfully read the message size
size := binary.BigEndian.Uint32(sizeBytes[:])
// Debug("Read message size: %d bytes", size)
if size == 0 || size > 1024*1024 { // 1MB limit
// Use standardized error for message size limit
// Send error response for message too large
@ -876,8 +895,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
// Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
// Validate API version against what we support
if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err)
@ -886,8 +903,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
if writeErr != nil {
return fmt.Errorf("build error response: %w", writeErr)
}
// CRITICAL: Send error response through response queue to maintain sequential ordering
// This prevents deadlocks in the response writer which expects all correlation IDs in sequence
// Send error response through response queue to maintain sequential ordering
select {
case responseChan <- &kafkaResponse{
correlationID: correlationID,
@ -903,8 +919,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
}
// CRITICAL DEBUG: Log that validation passed
glog.V(4).Infof("API VERSION VALIDATION PASSED: Key=%d (%s), Version=%d, Correlation=%d - proceeding to header parsing",
glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d",
apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
// Extract request body - special handling for ApiVersions requests
@ -945,29 +960,25 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Parse header using flexible version utilities for other APIs
header, parsedRequestBody, parseErr := ParseRequestHeader(messageBuf)
if parseErr != nil {
// CRITICAL: Log the parsing error for debugging
glog.Errorf("REQUEST HEADER PARSING FAILED: API=%d (%s) v%d, correlation=%d, error=%v, msgLen=%d",
apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr, len(messageBuf))
glog.Errorf("Request header parsing failed: API=%d (%s) v%d, correlation=%d, error=%v",
apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr)
// Fall back to basic header parsing if flexible version parsing fails
// Basic header parsing fallback (original logic)
bodyOffset := 8
if len(messageBuf) < bodyOffset+2 {
glog.Errorf("FALLBACK PARSING FAILED: missing client_id length, msgLen=%d", len(messageBuf))
return fmt.Errorf("invalid header: missing client_id length")
}
clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2]))
bodyOffset += 2
if clientIDLen >= 0 {
if len(messageBuf) < bodyOffset+int(clientIDLen) {
glog.Errorf("FALLBACK PARSING FAILED: client_id truncated, clientIDLen=%d, msgLen=%d", clientIDLen, len(messageBuf))
return fmt.Errorf("invalid header: client_id truncated")
}
bodyOffset += int(clientIDLen)
}
requestBody = messageBuf[bodyOffset:]
glog.V(2).Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, len(requestBody))
} else {
// Use the successfully parsed request body
requestBody = parsedRequestBody
@ -995,7 +1006,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
}
// CRITICAL: Route request to appropriate processor
// Route request to appropriate processor
// Control plane: Fast, never blocks (Metadata, Heartbeat, etc.)
// Data plane: Can be slow (Fetch, Produce)
@ -1019,7 +1030,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
targetChan = controlChan
}
// CRITICAL: Only add to correlation queue AFTER successful channel send
// Only add to correlation queue AFTER successful channel send
// If we add before and the channel blocks, the correlation ID is in the queue
// but the request never gets processed, causing response writer deadlock
select {
@ -1032,7 +1043,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
return ctx.Err()
case <-time.After(10 * time.Second):
// Channel full for too long - this shouldn't happen with proper backpressure
glog.Errorf("[%s] CRITICAL: Failed to queue correlation=%d after 10s timeout - channel full!", connectionID, correlationID)
glog.Errorf("[%s] Failed to queue correlation=%d - channel full (10s timeout)", connectionID, correlationID)
return fmt.Errorf("request queue full: correlation=%d", correlationID)
}
}
@ -1044,7 +1055,6 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
requestStart := time.Now()
apiName := getAPIName(APIKey(req.apiKey))
// Debug: Log API calls at verbose level 2 (disabled by default)
glog.V(4).Infof("[API] %s (key=%d, ver=%d, corr=%d)",
apiName, req.apiKey, req.apiVersion, req.correlationID)
@ -1169,7 +1179,7 @@ func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([]
// Error code (2 bytes) - always fixed-length
response = append(response, 0, 0) // No error
// API Keys Array - CRITICAL FIX: Use correct encoding based on version
// API Keys Array - use correct encoding based on version
if apiVersion >= 3 {
// FLEXIBLE FORMAT: Compact array with varint length - THIS FIXES THE ADMINCLIENT BUG!
response = append(response, CompactArrayLength(uint32(len(SupportedApiKeys)))...)
@ -1219,11 +1229,16 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
// NOTE: Correlation ID is handled by writeResponseWithCorrelationID
// Do NOT include it in the response body
// Get consistent node ID for this gateway
nodeID := h.GetNodeID()
nodeIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(nodeIDBytes, uint32(nodeID))
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4)
response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partitions)
response = append(response, nodeIDBytes...) // Use consistent node ID
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
@ -1298,15 +1313,15 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID))
response = append(response, partitionIDBytes...)
response = append(response, 0, 0, 0, 1) // leader = 1 (this broker)
response = append(response, nodeIDBytes...) // leader = this broker
// replicas: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
// replicas: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
response = append(response, nodeIDBytes...)
// isr: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
// isr: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
response = append(response, nodeIDBytes...)
}
}
@ -1341,11 +1356,16 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// NOTE: Correlation ID is handled by writeResponseWithHeader
// Do NOT include it in the response body
// Get consistent node ID for this gateway
nodeID := h.GetNodeID()
nodeIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(nodeIDBytes, uint32(nodeID))
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING)
response = append(response, 0, 0, 0, 1) // node_id = 1
response = append(response, nodeIDBytes...) // Use consistent node ID
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
@ -1370,7 +1390,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
response = append(response, 0, 0) // empty string
// ControllerID (4 bytes) - v1 addition
response = append(response, 0, 0, 0, 1) // controller_id = 1
response = append(response, nodeIDBytes...) // controller_id = this broker
// Topics array length (4 bytes)
topicsCountBytes := make([]byte, 4)
@ -1412,15 +1432,15 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID))
response = append(response, partitionIDBytes...)
response = append(response, 0, 0, 0, 1) // leader_id = 1
response = append(response, nodeIDBytes...) // leader_id = this broker
// replicas: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
// replicas: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
response = append(response, nodeIDBytes...)
// isr: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
// isr: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
response = append(response, nodeIDBytes...)
}
}
@ -1460,7 +1480,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
nodeID := int32(1) // Single gateway node
nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
@ -1488,7 +1508,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
buf.WriteString(clusterID)
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
binary.Write(&buf, binary.BigEndian, nodeID)
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
@ -1571,7 +1591,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
nodeID := int32(1) // Single gateway node
nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
@ -1599,7 +1619,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
buf.WriteString(clusterID)
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
binary.Write(&buf, binary.BigEndian, nodeID)
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
@ -1653,7 +1673,7 @@ func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) (
// HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field (REGULAR FORMAT, NOT FLEXIBLE)
func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) {
// CRITICAL: Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format
// Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format
// Only v9+ uses compact format (flexible responses)
return h.handleMetadataV5ToV8(correlationID, requestBody, 7)
}
@ -1721,7 +1741,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
nodeID := int32(1) // Single gateway node
nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
@ -1749,7 +1769,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
buf.WriteString(clusterID)
// ControllerID (4 bytes) - v1+ addition
binary.Write(&buf, binary.BigEndian, int32(1))
binary.Write(&buf, binary.BigEndian, nodeID)
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
@ -2011,7 +2031,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
actualTopicsCount++
}
// CRITICAL FIX: Update the topics count in the response header with the actual count
// Update the topics count in the response header with the actual count
// This prevents ErrIncompleteResponse when request parsing fails mid-way
if actualTopicsCount != topicsCount {
binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount)
@ -3045,7 +3065,7 @@ func isFlexibleResponse(apiKey uint16, apiVersion uint16) bool {
case APIKeySyncGroup:
return apiVersion >= 4
case APIKeyApiVersions:
// CRITICAL: AdminClient compatibility requires header version 0 (no tagged fields)
// AdminClient compatibility requires header version 0 (no tagged fields)
// Even though ApiVersions v3+ technically supports flexible responses, AdminClient
// expects the header to NOT include tagged fields. This is a known quirk.
return false // Always use non-flexible header for ApiVersions
@ -3281,12 +3301,6 @@ func (h *Handler) parseDescribeConfigsRequest(requestBody []byte, apiVersion uin
var resourcesLength uint32
if isFlexible {
// Debug: log the first 8 bytes of the request body
debugBytes := requestBody[offset:]
if len(debugBytes) > 8 {
debugBytes = debugBytes[:8]
}
// FIX: Skip top-level tagged fields for DescribeConfigs v4+ flexible protocol
// The request body starts with tagged fields count (usually 0x00 = empty)
_, consumed, err := DecodeTaggedFields(requestBody[offset:])
@ -3944,9 +3958,8 @@ func (h *Handler) createTopicWithSchemaSupport(topicName string, partitions int3
// createTopicWithDefaultFlexibleSchema creates a topic with a flexible default schema
// that can handle both Avro and JSON messages when schema management is enabled
func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partitions int32) error {
// CRITICAL FIX: System topics like _schemas should be PLAIN Kafka topics without schema management
// System topics like _schemas should be PLAIN Kafka topics without schema management
// Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself
// This was causing issues with Schema Registry bootstrap
glog.V(1).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
return h.seaweedMQHandler.CreateTopic(topicName, partitions)
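
The core of the node-ID fix is that Metadata and FindCoordinator now derive the broker ID from the same deterministic hash of the gateway address. A standalone sketch mirroring the generateNodeID logic shown above (the address is illustrative):

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    // nodeID mirrors generateNodeID above: FNV-1a of the gateway address,
    // masked to a positive value and offset by 1 so it is never 0.
    func nodeID(gatewayAddress string) int32 {
        if gatewayAddress == "" {
            return 1
        }
        h := fnv.New32a()
        _, _ = h.Write([]byte(gatewayAddress))
        return int32(h.Sum32()&0x7fffffff) + 1
    }

    func main() {
        addr := "kafka-gateway:9093" // illustrative address
        // The same address always hashes to the same ID, so Metadata and
        // FindCoordinator report a consistent broker/coordinator node.
        fmt.Println(nodeID(addr), nodeID(addr) == nodeID(addr)) // <id> true
    }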

weed/mq/kafka/protocol/joingroup.go (61 lines changed)

@ -83,6 +83,16 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
var isNewMember bool
var existingMember *consumer.GroupMember
// Use the actual ClientID from Kafka protocol header for unique member ID generation
clientKey := connContext.ClientID
if clientKey == "" {
// Fallback to deterministic key if ClientID not available
clientKey = fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
glog.Warningf("[JoinGroup] No ClientID in ConnectionContext for group %s, using fallback: %s", request.GroupID, clientKey)
} else {
glog.V(1).Infof("[JoinGroup] Using ClientID from ConnectionContext for group %s: %s", request.GroupID, clientKey)
}
// Check for static membership first
if request.GroupInstanceID != "" {
existingMember = h.groupCoordinator.FindStaticMemberLocked(group, request.GroupInstanceID)
@ -96,8 +106,6 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
}
} else {
// Dynamic membership logic
clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
if request.MemberID == "" {
// New member - check if we already have a member for this client
var existingMemberID string
@ -156,12 +164,9 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
groupInstanceID = &request.GroupInstanceID
}
// Use deterministic client identifier based on group + session timeout + protocol
clientKey := fmt.Sprintf("%s-%d-%s", request.GroupID, request.SessionTimeout, request.ProtocolType)
member := &consumer.GroupMember{
ID: memberID,
ClientID: clientKey, // Use deterministic client key for member identification
ClientID: clientKey, // Use actual Kafka ClientID for unique member identification
ClientHost: clientHost, // Now extracted from actual connection
GroupInstanceID: groupInstanceID,
SessionTimeout: request.SessionTimeout,
@ -267,8 +272,6 @@ func (h *Handler) handleJoinGroup(connContext *ConnectionContext, correlationID
Version: apiVersion,
}
// Debug logging for JoinGroup response
// If this member is the leader, include all member info for assignment
if memberID == group.Leader {
response.Members = make([]JoinGroupMember, 0, len(group.Members))
@ -311,7 +314,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGr
var groupID string
if isFlexible {
// Flexible protocol uses compact strings
endIdx := offset + 20 // Show more bytes for debugging
endIdx := offset + 20
if endIdx > len(data) {
endIdx = len(data)
}
@ -572,8 +575,6 @@ func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGr
}
func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
// Debug logging for JoinGroup response
// Flexible response for v6+
if IsFlexibleVersion(11, response.Version) {
out := make([]byte, 0, 256)
@ -774,7 +775,6 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
// extractSubscriptionFromProtocolsEnhanced uses improved metadata parsing with better error handling
func (h *Handler) extractSubscriptionFromProtocolsEnhanced(protocols []GroupProtocol) []string {
// Analyze protocol metadata for debugging
debugInfo := AnalyzeProtocolMetadata(protocols)
for _, info := range debugInfo {
if info.ParsedOK {
@ -865,6 +865,8 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
// Check if this is the group leader with assignments
if request.MemberID == group.Leader && len(request.GroupAssignments) > 0 {
// Leader is providing assignments - process and store them
glog.V(2).Infof("SyncGroup: Leader %s providing client-side assignments for group %s (%d members)",
request.MemberID, request.GroupID, len(request.GroupAssignments))
err = h.processGroupAssignments(group, request.GroupAssignments)
if err != nil {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInconsistentGroupProtocol, apiVersion), nil
@ -880,8 +882,11 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
} else if group.State == consumer.GroupStateCompletingRebalance {
// Non-leader member waiting for assignments
// Assignments should already be processed by leader
glog.V(2).Infof("SyncGroup: Non-leader %s waiting for assignments in group %s",
request.MemberID, request.GroupID)
} else {
// Trigger partition assignment using built-in strategy
glog.V(2).Infof("SyncGroup: Using server-side assignment for group %s", request.GroupID)
topicPartitions := h.getTopicPartitions(group)
group.AssignPartitions(topicPartitions)
@ -902,6 +907,10 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
assignment = h.serializeMemberAssignment(member.Assignment)
}
// Log member assignment details
glog.V(2).Infof("SyncGroup: Member %s in group %s assigned %d partitions: %v",
request.MemberID, request.GroupID, len(member.Assignment), member.Assignment)
// Build response
response := SyncGroupResponse{
CorrelationID: correlationID,
@ -909,7 +918,6 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
Assignment: assignment,
}
// Log assignment details for debugging
assignmentPreview := assignment
if len(assignmentPreview) > 100 {
assignmentPreview = assignment[:100]
@ -1093,7 +1101,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGr
offset += int(assignLength)
}
// CRITICAL FIX: Flexible format requires tagged fields after each assignment struct
// Flexible format requires tagged fields after each assignment struct
if offset < len(data) {
_, taggedConsumed, tagErr := DecodeTaggedFields(data[offset:])
if tagErr == nil {
@ -1172,7 +1180,7 @@ func (h *Handler) buildSyncGroupResponse(response SyncGroupResponse, apiVersion
// Assignment - FLEXIBLE V4+ FIX
if IsFlexibleVersion(14, apiVersion) {
// FLEXIBLE FORMAT: Assignment as compact bytes
// CRITICAL FIX: Use CompactStringLength for compact bytes (not CompactArrayLength)
// Use CompactStringLength for compact bytes (not CompactArrayLength)
// Compact bytes use the same encoding as compact strings: 0 = null, 1 = empty, n+1 = length n
assignmentLen := len(response.Assignment)
if assignmentLen == 0 {
@ -1305,16 +1313,19 @@ func (h *Handler) getTopicPartitions(group *consumer.ConsumerGroup) map[string][
// Get partition info for all subscribed topics
for topic := range group.SubscribedTopics {
// Check if topic exists using SeaweedMQ handler
if h.seaweedMQHandler.TopicExists(topic) {
// For now, assume 1 partition per topic (can be extended later)
// In a real implementation, this would query SeaweedMQ for actual partition count
partitions := []int32{0}
topicPartitions[topic] = partitions
} else {
// Default to single partition if topic not found
topicPartitions[topic] = []int32{0}
// Get actual partition count from topic info
topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topic)
partitionCount := h.GetDefaultPartitions() // Use configurable default
if exists && topicInfo != nil {
partitionCount = topicInfo.Partitions
}
// Create partition list: [0, 1, 2, ...]
partitions := make([]int32, partitionCount)
for i := int32(0); i < partitionCount; i++ {
partitions[i] = i
}
topicPartitions[topic] = partitions
}
return topicPartitions
@ -1324,7 +1335,7 @@ func (h *Handler) serializeSchemaRegistryAssignment(group *consumer.ConsumerGrou
// Schema Registry expects a JSON assignment in the format:
// {"error":0,"master":"member-id","master_identity":{"host":"localhost","port":8081,"master_eligibility":true,"scheme":"http","version":"7.4.0-ce"}}
// CRITICAL FIX: Extract the actual leader's identity from the leader's metadata
// Extract the actual leader's identity from the leader's metadata
// to avoid localhost/hostname mismatch that causes Schema Registry to forward
// requests to itself
leaderMember, exists := group.Members[group.Leader]
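
The getTopicPartitions change replaces the hard-coded single partition with the topic's actual partition count. Reduced to its essence, the partition-list construction looks like this (a sketch; in the diff, GetTopicInfo and GetDefaultPartitions supply the two counts):

    package main

    import "fmt"

    // partitionList builds [0, 1, ..., n-1] for a topic, falling back to a
    // default count when the topic's partition count is unknown.
    func partitionList(count, defaultCount int32) []int32 {
        if count <= 0 {
            count = defaultCount
        }
        partitions := make([]int32, count)
        for i := int32(0); i < count; i++ {
            partitions[i] = i
        }
        return partitions
    }

    func main() {
        fmt.Println(partitionList(4, 1)) // [0 1 2 3]
        fmt.Println(partitionList(0, 1)) // [0] (fallback to the default)
    }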

weed/mq/kafka/protocol/produce.go (11 lines changed)

@ -94,7 +94,6 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
// Check if topic exists, auto-create if it doesn't (simulates auto.create.topics.enable=true)
topicExists := h.seaweedMQHandler.TopicExists(topicName)
// Debug: show all existing topics
_ = h.seaweedMQHandler.ListTopics() // existingTopics
if !topicExists {
// Use schema-aware topic creation for auto-created topics with configurable default partitions
@ -102,7 +101,7 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
if err := h.createTopicWithSchemaSupport(topicName, defaultPartitions); err != nil {
} else {
// Ledger initialization REMOVED - SMQ handles offsets natively
topicExists = true // CRITICAL FIX: Update the flag after creating the topic
topicExists = true
}
}
@ -624,8 +623,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
_ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // timeout
offset += 4
// Debug: Log acks and timeout values
// Remember if this is fire-and-forget mode
isFireAndForget := acks == 0
if isFireAndForget {
@ -854,7 +851,7 @@ func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, par
if key == nil {
key = []byte{} // Use empty byte slice for null keys
}
// CRITICAL: Store the original Confluent Wire Format bytes (magic byte + schema ID + payload)
// Store the original Confluent Wire Format bytes (magic byte + schema ID + payload)
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
value := decodedMsg.Envelope.OriginalBytes
@ -1185,7 +1182,7 @@ func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, pa
var err error
valueDecodedMsg, err = h.schemaManager.DecodeMessage(value)
if err != nil {
// CRITICAL: If message has schema ID (magic byte 0x00), decoding MUST succeed
// If message has schema ID (magic byte 0x00), decoding MUST succeed
// Do not fall back to raw storage - this would corrupt the data model
time.Sleep(100 * time.Millisecond)
return 0, fmt.Errorf("message has schema ID but decoding failed (schema registry may be unavailable): %w", err)
@ -1264,7 +1261,7 @@ func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, pa
// Send to SeaweedMQ
if valueDecodedMsg != nil || keyDecodedMsg != nil {
// CRITICAL FIX: Store the DECODED RecordValue (not the original Confluent Wire Format)
// Store the DECODED RecordValue (not the original Confluent Wire Format)
// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
// which can be re-encoded to Confluent Wire Format during fetch if needed
return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
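
For context on the schema path above: a Confluent-framed message starts with magic byte 0x00 followed by a 4-byte big-endian schema ID, and only then the encoded payload. A minimal sketch of that framing check (generic Confluent wire format, not SeaweedFS-specific code):

    package main

    import (
        "encoding/binary"
        "errors"
        "fmt"
    )

    // parseConfluentEnvelope splits a Confluent wire-format message into schema ID
    // and payload: magic byte 0x00, 4-byte big-endian schema ID, then the payload.
    func parseConfluentEnvelope(value []byte) (schemaID uint32, payload []byte, err error) {
        if len(value) < 5 || value[0] != 0x00 {
            return 0, nil, errors.New("not Confluent wire format")
        }
        return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
    }

    func main() {
        msg := append([]byte{0x00, 0x00, 0x00, 0x00, 0x07}, []byte("avro-payload")...)
        id, payload, err := parseConfluentEnvelope(msg)
        fmt.Println(id, string(payload), err) // 7 avro-payload <nil>
    }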
