@ -662,7 +662,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
return
}
if req . apiKey == 2 { // ListOffsets
glog . Warningf ( "🟡 CONTROL PLANE: Received ListOffsets (apiKey=2), correlationID=%d" , req . correlationID )
}
glog . V ( 4 ) . Infof ( "[%s] Control plane processing correlation=%d, apiKey=%d" , connectionID , req . correlationID , req . apiKey )
@ -678,11 +677,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
} ( )
if req . apiKey == 2 { // ListOffsets
glog . Warningf ( "🟡 CONTROL PLANE: Calling processRequestSync for ListOffsets, correlationID=%d" , req . correlationID )
}
response , err = h . processRequestSync ( req )
if req . apiKey == 2 { // ListOffsets
glog . Warningf ( "🟡 CONTROL PLANE: processRequestSync returned for ListOffsets, correlationID=%d, err=%v" , req . correlationID , err )
}
} ( )
@ -969,13 +966,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Log consumer group coordination requests
switch apiKey {
case 9 :
glog . Warningf ( "🟡 OffsetFetch: correlationID=%d from %s" , correlationID , connectionID )
case 11 :
glog . Warningf ( "🟡 JoinGroup: correlationID=%d from %s" , correlationID , connectionID )
case 12 :
glog . Warningf ( "🟡 Heartbeat: correlationID=%d from %s" , correlationID , connectionID )
case 14 :
glog . Warningf ( "🟡 SyncGroup: correlationID=%d from %s" , correlationID , connectionID )
}
// Extract request body - special handling for ApiVersions requests
@ -1120,7 +1113,6 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
requestStart := time . Now ( )
apiName := getAPIName ( APIKey ( req . apiKey ) )
glog . Warningf ( "🟡 processRequestSync: apiKey=%d (%s), version=%d, correlationID=%d" , req . apiKey , apiName , req . apiVersion , req . correlationID )
glog . V ( 4 ) . Infof ( "[API] %s (key=%d, ver=%d, corr=%d)" ,
apiName , req . apiKey , req . apiVersion , req . correlationID )
@ -1139,7 +1131,6 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
case APIKeyListOffsets :
glog . Warningf ( "🔥🔥🔥 SWITCH MATCHED: APIKeyListOffsets (apiKey=2)!!!" )
glog . Warningf ( "🟡 processRequestSync: Handling ListOffsets (apiKey=2) - about to call handleListOffsets" )
response , err = h . handleListOffsets ( req . correlationID , req . apiVersion , req . requestBody )
case APIKeyCreateTopics :
@ -1639,14 +1630,12 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
var topicsToReturn [ ] string
if len ( requestedTopics ) == 0 {
topicsToReturn = h . seaweedMQHandler . ListTopics ( )
glog . Warningf ( "🟡 Metadata v3/v4: No specific topics requested, returning ALL topics: %v" , topicsToReturn )
} else {
for _ , name := range requestedTopics {
if h . seaweedMQHandler . TopicExists ( name ) {
topicsToReturn = append ( topicsToReturn , name )
}
}
glog . Warningf ( "🟡 Metadata v3/v4: Requested topics: %v, matched topics: %v" , requestedTopics , topicsToReturn )
}
var buf bytes . Buffer
@ -1667,7 +1656,6 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
nodeID := h . GetNodeID ( ) // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
glog . Warningf ( "🟡 Metadata v3/v4: Returning broker - nodeID=%d, host=%s, port=%d" , nodeID , host , port )
binary . Write ( & buf , binary . BigEndian , nodeID )
// Host (STRING: 2 bytes length + data) - validate length fits in int16
@ -1742,9 +1730,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
if maxDisplay > 50 {
maxDisplay = 50
}
glog . Warningf ( "🟡 Metadata v3/v4 FINAL RESPONSE: size=%d bytes, first50bytes=%v" , len ( response ) , response [ : maxDisplay ] )
if len ( response ) > 100 {
glog . Warningf ( "🟡 Metadata v3/v4 RESPONSE HEX (first 100 bytes): %x" , response [ : 100 ] )
}
return response , nil
@ -1816,7 +1802,6 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// NOTE: Correlation ID is handled by writeResponseWithCorrelationID
// Do NOT include it in the response body
glog . Warningf ( "🟡 Metadata v%d: About to build response with topics=%v" , apiVersion , topicsToReturn )
// ThrottleTimeMs (4 bytes) - v3+ addition
binary . Write ( & buf , binary . BigEndian , int32 ( 0 ) ) // No throttling
@ -1830,7 +1815,6 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
nodeID := h . GetNodeID ( ) // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
glog . Warningf ( "🟡 Metadata v%d: Returning broker - nodeID=%d, host=%s, port=%d" , apiVersion , nodeID , host , port )
binary . Write ( & buf , binary . BigEndian , nodeID )
// Host (STRING: 2 bytes length + data) - validate length fits in int16
@ -1918,9 +1902,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
if maxDisplay > 50 {
maxDisplay = 50
}
glog . Warningf ( "🟡 Metadata v%d FINAL RESPONSE: size=%d bytes, first50bytes=%v" , apiVersion , len ( response ) , response [ : maxDisplay ] )
if len ( response ) > 100 {
glog . Warningf ( "🟡 Metadata v%d RESPONSE HEX (first 100 bytes): %x" , apiVersion , response [ : 100 ] )
}
return response , nil
@ -2047,7 +2029,6 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
// Process each requested topic
for i := uint32 ( 0 ) ; i < topicsCount && offset < len ( requestBody ) ; i ++ {
if len ( requestBody ) < offset + 2 {
glog . Warningf ( "🟡 ListOffsets: Breaking at topic %d - insufficient data for name size" , i )
break
}
@ -2056,7 +2037,6 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
offset += 2
if len ( requestBody ) < offset + int ( topicNameSize ) + 4 {
glog . Warningf ( "🟡 ListOffsets: Breaking at topic %d - insufficient data for name (%d bytes needed, %d available)" , i , topicNameSize + 4 , len ( requestBody ) - offset )
break
}
@ -2148,7 +2128,6 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
// This prevents ErrIncompleteResponse when request parsing fails mid-way
if actualTopicsCount != topicsCount {
binary . BigEndian . PutUint32 ( response [ topicsCountOffset : topicsCountOffset + 4 ] , actualTopicsCount )
glog . Warningf ( "🟡 ListOffsets: Updated response count from %d to %d, response size=%d" , topicsCount , actualTopicsCount , len ( response ) )
} else {
glog . Infof ( "🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d" , topicsCount , actualTopicsCount , len ( response ) )
}
@ -2950,7 +2929,6 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey,
// handleMetadata routes to the appropriate version-specific handler
func ( h * Handler ) handleMetadata ( correlationID uint32 , apiVersion uint16 , requestBody [ ] byte ) ( [ ] byte , error ) {
glog . Warningf ( "🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d" , apiVersion , correlationID , len ( requestBody ) )
var response [ ] byte
var err error
@ -2978,9 +2956,7 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques
}
if err != nil {
glog . Warningf ( "🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v" , apiVersion , correlationID , err )
} else {
glog . Warningf ( "🟡 handleMetadata EXIT OK: apiVersion=%d, correlationID=%d, responseLen=%d" , apiVersion , correlationID , len ( response ) )
}
return response , err
}