diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 77b313c2b..7b5c8fb73 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -662,7 +662,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return } if req.apiKey == 2 { // ListOffsets - glog.Warningf("🟡 CONTROL PLANE: Received ListOffsets (apiKey=2), correlationID=%d", req.correlationID) } glog.V(4).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) @@ -678,11 +677,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } }() if req.apiKey == 2 { // ListOffsets - glog.Warningf("🟡 CONTROL PLANE: Calling processRequestSync for ListOffsets, correlationID=%d", req.correlationID) } response, err = h.processRequestSync(req) if req.apiKey == 2 { // ListOffsets - glog.Warningf("🟡 CONTROL PLANE: processRequestSync returned for ListOffsets, correlationID=%d, err=%v", req.correlationID, err) } }() @@ -969,13 +966,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Log consumer group coordination requests switch apiKey { case 9: - glog.Warningf("🟡 OffsetFetch: correlationID=%d from %s", correlationID, connectionID) case 11: - glog.Warningf("🟡 JoinGroup: correlationID=%d from %s", correlationID, connectionID) case 12: - glog.Warningf("🟡 Heartbeat: correlationID=%d from %s", correlationID, connectionID) case 14: - glog.Warningf("🟡 SyncGroup: correlationID=%d from %s", correlationID, connectionID) } // Extract request body - special handling for ApiVersions requests @@ -1120,7 +1113,6 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { requestStart := time.Now() apiName := getAPIName(APIKey(req.apiKey)) - glog.Warningf("🟡 processRequestSync: apiKey=%d (%s), version=%d, correlationID=%d", req.apiKey, apiName, req.apiVersion, req.correlationID) glog.V(4).Infof("[API] %s (key=%d, ver=%d, corr=%d)", apiName, req.apiKey, req.apiVersion, req.correlationID) @@ -1139,7 +1131,6 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { case APIKeyListOffsets: glog.Warningf("🔥🔥🔥 SWITCH MATCHED: APIKeyListOffsets (apiKey=2)!!!") - glog.Warningf("🟡 processRequestSync: Handling ListOffsets (apiKey=2) - about to call handleListOffsets") response, err = h.handleListOffsets(req.correlationID, req.apiVersion, req.requestBody) case APIKeyCreateTopics: @@ -1639,14 +1630,12 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( var topicsToReturn []string if len(requestedTopics) == 0 { topicsToReturn = h.seaweedMQHandler.ListTopics() - glog.Warningf("🟡 Metadata v3/v4: No specific topics requested, returning ALL topics: %v", topicsToReturn) } else { for _, name := range requestedTopics { if h.seaweedMQHandler.TopicExists(name) { topicsToReturn = append(topicsToReturn, name) } } - glog.Warningf("🟡 Metadata v3/v4: Requested topics: %v, matched topics: %v", requestedTopics, topicsToReturn) } var buf bytes.Buffer @@ -1667,7 +1656,6 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) - glog.Warningf("🟡 Metadata v3/v4: Returning broker - nodeID=%d, host=%s, port=%d", nodeID, host, port) binary.Write(&buf, binary.BigEndian, nodeID) // Host (STRING: 2 bytes length + data) - validate length fits in int16 @@ -1742,9 +1730,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( if maxDisplay > 50 { maxDisplay = 50 } - glog.Warningf("🟡 Metadata v3/v4 FINAL RESPONSE: size=%d bytes, first50bytes=%v", len(response), response[:maxDisplay]) if len(response) > 100 { - glog.Warningf("🟡 Metadata v3/v4 RESPONSE HEX (first 100 bytes): %x", response[:100]) } return response, nil @@ -1816,7 +1802,6 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // NOTE: Correlation ID is handled by writeResponseWithCorrelationID // Do NOT include it in the response body - glog.Warningf("🟡 Metadata v%d: About to build response with topics=%v", apiVersion, topicsToReturn) // ThrottleTimeMs (4 bytes) - v3+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling @@ -1830,7 +1815,6 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) - glog.Warningf("🟡 Metadata v%d: Returning broker - nodeID=%d, host=%s, port=%d", apiVersion, nodeID, host, port) binary.Write(&buf, binary.BigEndian, nodeID) // Host (STRING: 2 bytes length + data) - validate length fits in int16 @@ -1918,9 +1902,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, if maxDisplay > 50 { maxDisplay = 50 } - glog.Warningf("🟡 Metadata v%d FINAL RESPONSE: size=%d bytes, first50bytes=%v", apiVersion, len(response), response[:maxDisplay]) if len(response) > 100 { - glog.Warningf("🟡 Metadata v%d RESPONSE HEX (first 100 bytes): %x", apiVersion, response[:100]) } return response, nil @@ -2047,7 +2029,6 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Process each requested topic for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { - glog.Warningf("🟡 ListOffsets: Breaking at topic %d - insufficient data for name size", i) break } @@ -2056,7 +2037,6 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req offset += 2 if len(requestBody) < offset+int(topicNameSize)+4 { - glog.Warningf("🟡 ListOffsets: Breaking at topic %d - insufficient data for name (%d bytes needed, %d available)", i, topicNameSize+4, len(requestBody)-offset) break } @@ -2148,7 +2128,6 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // This prevents ErrIncompleteResponse when request parsing fails mid-way if actualTopicsCount != topicsCount { binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount) - glog.Warningf("🟡 ListOffsets: Updated response count from %d to %d, response size=%d", topicsCount, actualTopicsCount, len(response)) } else { glog.Infof("🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d", topicsCount, actualTopicsCount, len(response)) } @@ -2950,7 +2929,6 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, // handleMetadata routes to the appropriate version-specific handler func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { - glog.Warningf("🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d", apiVersion, correlationID, len(requestBody)) var response []byte var err error @@ -2978,9 +2956,7 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques } if err != nil { - glog.Warningf("🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v", apiVersion, correlationID, err) } else { - glog.Warningf("🟡 handleMetadata EXIT OK: apiVersion=%d, correlationID=%d, responseLen=%d", apiVersion, correlationID, len(response)) } return response, err }