@ -869,7 +869,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Read message size (4 bytes)
var sizeBytes [ 4 ] byte
glog . Warningf ( "🔴 REQUEST LOOP: About to read message size from %s" , connectionID )
if _ , err := io . ReadFull ( r , sizeBytes [ : ] ) ; err != nil {
if err == io . EOF {
return nil
@ -891,7 +890,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Successfully read the message size
size := binary . BigEndian . Uint32 ( sizeBytes [ : ] )
glog . Warningf ( "🔴 REQUEST LOOP: Parsed message size=%d from %s" , size , connectionID )
if size == 0 || size > 1024 * 1024 { // 1MB limit
// Use standardized error for message size limit
// Send error response for message too large
@ -907,13 +905,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Read the message
messageBuf := make ( [ ] byte , size )
glog . Warningf ( "🔴 REQUEST LOOP: About to read %d-byte message body from %s" , size , connectionID )
if _ , err := io . ReadFull ( r , messageBuf ) ; err != nil {
_ = HandleTimeoutError ( err , "read" ) // errorCode
return fmt . Errorf ( "read message: %w" , err )
}
glog . Warningf ( "🔴 REQUEST LOOP: Successfully read %d-byte message from %s" , size , connectionID )
// Parse at least the basic header to get API key and correlation ID
if len ( messageBuf ) < 8 {
@ -929,7 +925,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
if maxLen > 16 {
maxLen = 16
}
glog . Warningf ( "🔴 RAW REQUEST: correlationID=%d, firstTwoBytes=[%02x %02x] = apiKey=%d, first16bytes=%v" , correlationID , messageBuf [ 0 ] , messageBuf [ 1 ] , apiKey , messageBuf [ : maxLen ] )
// Validate API version against what we support
if err := h . validateAPIVersion ( apiKey , apiVersion ) ; err != nil {
@ -957,7 +952,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// CRITICAL: Log Fetch requests specifically
if apiKey == 1 {
glog . Warningf ( "🔴🔴🔴 FETCH REQUEST RECEIVED: correlationID=%d, apiVersion=%d, from %s" , correlationID , apiVersion , connectionID )
}
glog . V ( 4 ) . Infof ( "API version validated: Key=%d (%s), Version=%d, Correlation=%d" ,
@ -1074,30 +1068,23 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Route to appropriate channel based on API key
var targetChan chan * kafkaRequest
if apiKey == 2 { // ListOffsets
glog . Warningf ( "🔴 BEFORE ROUTING: ListOffsets request ready to route - correlationID=%d, requestBodyLen=%d" , correlationID , len ( requestBody ) )
}
if isDataPlaneAPI ( apiKey ) {
targetChan = dataChan
glog . Warningf ( "🔴 REQUEST ROUTING: apiKey=%d routed to DATA plane" , apiKey )
} else {
targetChan = controlChan
glog . Warningf ( "🔴 REQUEST ROUTING: apiKey=%d routed to CONTROL plane" , apiKey )
}
// Only add to correlation queue AFTER successful channel send
// If we add before and the channel blocks, the correlation ID is in the queue
// but the request never gets processed, causing response writer deadlock
glog . Warningf ( "🔴 REQUEST QUEUE: About to queue correlationID=%d (apiKey=%d) to channel from %s" , correlationID , apiKey , connectionID )
select {
case targetChan <- req :
// Request queued successfully - NOW add to correlation tracking
glog . Warningf ( "🔴 REQUEST QUEUE: Successfully sent correlationID=%d to channel from %s" , correlationID , connectionID )
correlationQueueMu . Lock ( )
correlationQueue = append ( correlationQueue , correlationID )
glog . Warningf ( "🔴 REQUEST QUEUE: Added correlationID=%d to queue (queue length now %d) from %s" , correlationID , len ( correlationQueue ) , connectionID )
correlationQueueMu . Unlock ( )
case <- ctx . Done ( ) :
glog . Warningf ( "🔴 REQUEST QUEUE: Context cancelled while queueing correlationID=%d from %s" , correlationID , connectionID )
return ctx . Err ( )
case <- time . After ( 10 * time . Second ) :
// Channel full for too long - this shouldn't happen with proper backpressure
@ -1122,15 +1109,12 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
switch APIKey ( req . apiKey ) {
case APIKeyApiVersions :
glog . Warningf ( "🔥 SWITCH MATCHED: APIKeyApiVersions" )
response , err = h . handleApiVersions ( req . correlationID , req . apiVersion )
case APIKeyMetadata :
glog . Warningf ( "🔥 SWITCH MATCHED: APIKeyMetadata" )
response , err = h . handleMetadata ( req . correlationID , req . apiVersion , req . requestBody )
case APIKeyListOffsets :
glog . Warningf ( "🔥🔥🔥 SWITCH MATCHED: APIKeyListOffsets (apiKey=2)!!!" )
response , err = h . handleListOffsets ( req . correlationID , req . apiVersion , req . requestBody )
case APIKeyCreateTopics :
@ -1968,20 +1952,12 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
// Parse minimal request to understand what's being asked (header already stripped)
offset := 0
glog . Warningf ( "🔥🔥🔥 ListOffsets HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d" , correlationID , apiVersion , len ( requestBody ) )
maxBytes := len ( requestBody )
if maxBytes > 64 {
maxBytes = 64
}
glog . Warningf ( "🔥🔥🔥 ListOffsets first %d bytes (hex): %x" , maxBytes , requestBody [ : maxBytes ] )
// Log the specific replica ID and topics being requested
if len ( requestBody ) >= 4 {
replicaID := int32 ( binary . BigEndian . Uint32 ( requestBody [ 0 : 4 ] ) )
glog . Warningf ( "🔥🔥🔥 ListOffsets replica_id=%d" , replicaID )
}
// v1+ has replica_id(4)
if apiVersion >= 1 {
if len ( requestBody ) < offset + 4 {
@ -2129,16 +2105,13 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
if actualTopicsCount != topicsCount {
binary . BigEndian . PutUint32 ( response [ topicsCountOffset : topicsCountOffset + 4 ] , actualTopicsCount )
} else {
glog . Infof ( "🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d" , topicsCount , actualTopicsCount , len ( response ) )
}
glog . Warningf ( "🔥🔥🔥 ListOffsets HANDLER RETURNING: correlationID=%d, responseLen=%d" , correlationID , len ( response ) )
if len ( response ) > 0 {
respPreview := len ( response )
if respPreview > 32 {
respPreview = 32
}
glog . Warningf ( "🔥🔥🔥 ListOffsets response first %d bytes (hex): %x" , respPreview , response [ : respPreview ] )
}
return response , nil
@ -3915,13 +3888,11 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
// v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16)
// v4+: Uses flexible format with tagged fields
glog . Warningf ( "🔴🔴🔴 InitProducerId HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d" , correlationID , apiVersion , len ( requestBody ) )
maxBytes := len ( requestBody )
if maxBytes > 64 {
maxBytes = 64
}
glog . Warningf ( "🔴🔴🔴 InitProducerId request first %d bytes (hex): %x" , maxBytes , requestBody [ : maxBytes ] )
offset := 0
@ -4028,12 +3999,10 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
response = append ( response , 0x00 ) // Empty response body tagged fields
}
glog . Warningf ( "🔴🔴🔴 InitProducerId HANDLER RETURNING: correlationID=%d, responseLen=%d bytes" , correlationID , len ( response ) )
respPreview := len ( response )
if respPreview > 32 {
respPreview = 32
}
glog . Warningf ( "🔴🔴🔴 InitProducerId response (hex): %x" , response [ : respPreview ] )
return response , nil
}