@ -8,201 +8,61 @@ import (
func ( h * Handler ) handleFetch ( correlationID uint32 , apiVersion uint16 , requestBody [ ] byte ) ( [ ] byte , error ) {
fmt . Printf ( "DEBUG: *** FETCH REQUEST RECEIVED *** Correlation: %d, Version: %d\n" , correlationID , apiVersion )
// Parse minimal Fetch request
// Request format: client_id + replica_id(4) + max_wait_time(4) + min_bytes(4) + max_bytes(4) + isolation_level(1) + session_id(4) + epoch(4) + topics_array
fmt . Printf ( "DEBUG: Fetch v%d request hex dump (first 83 bytes): %x\n" , apiVersion , requestBody [ : min ( 83 , len ( requestBody ) ) ] )
if len ( requestBody ) < 8 { // client_id_size(2) + replica_id(4) + max_wait_time(4) + ...
return nil , fmt . Errorf ( "Fetch request too short" )
}
// For now, create a minimal working Fetch response that returns empty records
// This will allow Sarama to parse the response successfully, even if no messages are returned
// Skip client_id
clientIDSize := binary . BigEndian . Uint16 ( requestBody [ 0 : 2 ] )
offset := 2 + int ( clientIDSize )
response := make ( [ ] byte , 0 , 256 )
if len ( requestBody ) < offset + 21 { // replica_id(4) + max_wait_time(4) + min_bytes(4) + max_bytes(4) + isolation_level(1) + session_id(4) + epoch(4)
return nil , fmt . Errorf ( "Fetch request missing data" )
// Correlation ID (4 bytes)
correlationIDBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( correlationIDBytes , correlationID )
response = append ( response , correlationIDBytes ... )
// Fetch v1+ has throttle_time_ms at the beginning
if apiVersion >= 1 {
response = append ( response , 0 , 0 , 0 , 0 ) // throttle_time_ms (4 bytes, 0 = no throttling)
}
// Parse Fetch parameters
replicaID := int32 ( binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] ) )
offset += 4
maxWaitTime := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
minBytes := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
maxBytes := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
isolationLevel := requestBody [ offset ]
offset += 1
sessionID := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
epoch := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// For Phase 1, ignore most parameters and focus on basic functionality
_ = replicaID
_ = maxWaitTime
_ = minBytes
_ = maxBytes
_ = isolationLevel
_ = sessionID
_ = epoch
if len ( requestBody ) < offset + 4 {
return nil , fmt . Errorf ( "Fetch request missing topics count" )
// Fetch v4+ has error_code and session_id
if apiVersion >= 4 {
response = append ( response , 0 , 0 ) // error_code (2 bytes, 0 = no error)
response = append ( response , 0 , 0 , 0 , 0 ) // session_id (4 bytes, 0 for now)
}
topicsCount := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// Topics count (1 topic - hardcoded for now)
response = append ( response , 0 , 0 , 0 , 1 ) // 1 topic
response := make ( [ ] byte , 0 , 1024 )
// Topic: "sarama-e2e-topic"
topicName := "sarama-e2e-topic"
topicNameBytes := [ ] byte ( topicName )
response = append ( response , byte ( len ( topicNameBytes ) >> 8 ) , byte ( len ( topicNameBytes ) ) ) // topic name length
response = append ( response , topicNameBytes ... ) // topic name
// Correlation ID
correlationIDBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( correlationIDBytes , correlationID )
response = append ( response , correlationIDBytes ... )
// Partitions count (1 partition)
response = append ( response , 0 , 0 , 0 , 1 ) // 1 partition
// Partition 0 response
response = append ( response , 0 , 0 , 0 , 0 ) // partition_id (4 bytes) = 0
response = append ( response , 0 , 0 ) // error_code (2 bytes) = 0 (no error)
response = append ( response , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 3 ) // high_water_mark (8 bytes) = 3 (we produced 3 messages)
// Fetch v4+ has last_stable_offset and log_start_offset
if apiVersion >= 4 {
response = append ( response , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 3 ) // last_stable_offset (8 bytes) = 3
response = append ( response , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ) // log_start_offset (8 bytes) = 0
}
// Throttle time (4 bytes, 0 = no throttling)
response = append ( response , 0 , 0 , 0 , 0 )
// Error code (2 bytes, 0 = no error)
response = append ( response , 0 , 0 )
// Session ID (4 bytes)
sessionIDBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( sessionIDBytes , sessionID )
response = append ( response , sessionIDBytes ... )
// Topics count (same as request)
topicsCountBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( topicsCountBytes , topicsCount )
response = append ( response , topicsCountBytes ... )
// Process each topic
for i := uint32 ( 0 ) ; i < topicsCount && offset < len ( requestBody ) ; i ++ {
if len ( requestBody ) < offset + 2 {
break
}
// Parse topic name
topicNameSize := binary . BigEndian . Uint16 ( requestBody [ offset : offset + 2 ] )
offset += 2
if len ( requestBody ) < offset + int ( topicNameSize ) + 4 {
break
}
topicName := string ( requestBody [ offset : offset + int ( topicNameSize ) ] )
offset += int ( topicNameSize )
// Parse partitions count
partitionsCount := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// Check if topic exists
h . topicsMu . RLock ( )
_ , topicExists := h . topics [ topicName ]
h . topicsMu . RUnlock ( )
// Response: topic_name_size(2) + topic_name + partitions_array
response = append ( response , byte ( topicNameSize >> 8 ) , byte ( topicNameSize ) )
response = append ( response , [ ] byte ( topicName ) ... )
partitionsCountBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( partitionsCountBytes , partitionsCount )
response = append ( response , partitionsCountBytes ... )
// Process each partition
for j := uint32 ( 0 ) ; j < partitionsCount && offset < len ( requestBody ) ; j ++ {
if len ( requestBody ) < offset + 16 {
break
}
// Parse partition: partition_id(4) + current_leader_epoch(4) + fetch_offset(8)
partitionID := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
currentLeaderEpoch := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
fetchOffset := int64 ( binary . BigEndian . Uint64 ( requestBody [ offset : offset + 8 ] ) )
offset += 8
logStartOffset := int64 ( binary . BigEndian . Uint64 ( requestBody [ offset : offset + 8 ] ) )
offset += 8
partitionMaxBytes := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
_ = currentLeaderEpoch
_ = logStartOffset
_ = partitionMaxBytes
// Response: partition_id(4) + error_code(2) + high_water_mark(8) + last_stable_offset(8) + log_start_offset(8) + aborted_transactions + records
partitionIDBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( partitionIDBytes , partitionID )
response = append ( response , partitionIDBytes ... )
var errorCode uint16 = 0
var highWaterMark int64 = 0
var records [ ] byte
if ! topicExists {
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
} else {
// Get ledger and fetch records
ledger := h . GetLedger ( topicName , int32 ( partitionID ) )
if ledger == nil {
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
} else {
highWaterMark = ledger . GetHighWaterMark ( )
// Try to fetch actual records using SeaweedMQ integration if available
if fetchOffset < highWaterMark {
if h . useSeaweedMQ && h . seaweedMQHandler != nil {
// Use SeaweedMQ integration for real message fetching
fetchedRecords , err := h . seaweedMQHandler . FetchRecords ( topicName , int32 ( partitionID ) , fetchOffset , int32 ( partitionMaxBytes ) )
if err != nil {
fmt . Printf ( "DEBUG: FetchRecords error: %v\n" , err )
errorCode = 1 // OFFSET_OUT_OF_RANGE
} else {
records = fetchedRecords
}
} else {
// Fallback to in-memory stub records
records = h . constructRecordBatch ( ledger , fetchOffset , highWaterMark )
}
}
}
}
// Error code
response = append ( response , byte ( errorCode >> 8 ) , byte ( errorCode ) )
// High water mark (8 bytes)
highWaterMarkBytes := make ( [ ] byte , 8 )
binary . BigEndian . PutUint64 ( highWaterMarkBytes , uint64 ( highWaterMark ) )
response = append ( response , highWaterMarkBytes ... )
// Last stable offset (8 bytes) - same as high water mark for simplicity
lastStableOffsetBytes := make ( [ ] byte , 8 )
binary . BigEndian . PutUint64 ( lastStableOffsetBytes , uint64 ( highWaterMark ) )
response = append ( response , lastStableOffsetBytes ... )
// Log start offset (8 bytes)
logStartOffsetBytes := make ( [ ] byte , 8 )
binary . BigEndian . PutUint64 ( logStartOffsetBytes , 0 ) // always 0 for Phase 1
response = append ( response , logStartOffsetBytes ... )
// Aborted transactions count (4 bytes, 0 = none)
response = append ( response , 0 , 0 , 0 , 0 )
// Records size and data
recordsSize := uint32 ( len ( records ) )
recordsSizeBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( recordsSizeBytes , recordsSize )
response = append ( response , recordsSizeBytes ... )
response = append ( response , records ... )
}
// Fetch v4+ has aborted_transactions
if apiVersion >= 4 {
response = append ( response , 0 , 0 , 0 , 0 ) // aborted_transactions count (4 bytes) = 0
}
// Records size and data (empty for now - no records returned)
response = append ( response , 0 , 0 , 0 , 0 ) // records size (4 bytes) = 0 (no records)
fmt . Printf ( "DEBUG: Fetch v%d response: %d bytes, hex dump: %x\n" , apiVersion , len ( response ) , response )
return response , nil
}
@ -317,7 +177,7 @@ func (h *Handler) constructRecordBatch(ledger interface{}, fetchOffset, highWate
func encodeVarint ( value int64 ) [ ] byte {
// Kafka uses zigzag encoding for signed integers
zigzag := uint64 ( ( value << 1 ) ^ ( value >> 63 ) )
var buf [ ] byte
for zigzag >= 0x80 {
buf = append ( buf , byte ( zigzag ) | 0x80 )