Browse Source

fix: Remove context timeout propagation from produce that breaks consumer init

Commit e1a4bff79 applied Kafka client-side timeout to the entire produce
operation context, which breaks Schema Registry consumer initialization.

The bug:
- Schema Registry Produce request has 60000ms timeout
- This timeout was being applied to entire broker operation context
- Consumer initialization takes time (joins group, gets assignments, seeks, polls)
- If initialization isn't done before 60s, context times out
- Publish returns "context deadline exceeded" error
- Schema Registry times out

The fix:
- Remove context.WithTimeout() calls from produce handlers
- Revert to NOT applying client timeout to internal broker operations
- This allows consumer initialization to take as long as needed
- Kafka request will still timeout at protocol level naturally

NOTE: Consumer still not sending Fetch requests - there's likely a deeper
issue with consumer group coordination or partition assignment in the
gateway, separate from this timeout issue.

This removes the obvious timeout bug but may not completely fix SR init.

debug: Add instrumentation for Noop record timeout investigation

- Added critical debug logging to server.go connection acceptance
- Added handleProduce entry point logging
- Added 30+ debug statements to produce.go for Noop record tracing
- Created comprehensive investigation report

CRITICAL FINDING: Gateway accepts connections but requests hang in HandleConn()
request reading loop - no requests ever reach processRequestSync()

Files modified:
- weed/mq/kafka/gateway/server.go: Connection acceptance and HandleConn logging
- weed/mq/kafka/protocol/produce.go: Request entry logging and Noop tracing

See /tmp/INVESTIGATION_FINAL_REPORT.md for full analysis

Issue: Schema Registry Noop record write times out after 60 seconds
Root Cause: Kafka protocol request reading hangs in HandleConn loop
Status: Requires further debugging of request parsing logic in handler.go

debug: Add request reading loop instrumentation to handler.go

CRITICAL FINDING: Requests ARE being read and queued!
- Request header parsing works correctly
- Requests are successfully sent to data/control plane channels
- apiKey=3 (FindCoordinator) requests visible in logs
- Request queuing is NOT the bottleneck

Remaining issue: No Produce (apiKey=0) requests seen from Schema Registry
Hypothesis: Schema Registry stuck in metadata/coordinator discovery

Debug logs added to trace:
- Message size reading
- Message body reading
- API key/version/correlation ID parsing
- Request channel queuing

Next: Investigate why Produce requests not appearing

discovery: Add Fetch API logging - confirms consumer never initializes

SMOKING GUN CONFIRMED: Consumer NEVER sends Fetch requests!

Testing shows:
- Zero Fetch (apiKey=1) requests logged from Schema Registry
- Consumer never progresses past initialization
- This proves consumer group coordination is broken

Root Cause Confirmed:
The issue is NOT in Produce/Noop record handling.
The issue is NOT in message serialization.

The issue IS:
- Consumer cannot join group (JoinGroup/SyncGroup broken?)
- Consumer cannot assign partitions
- Consumer cannot begin fetching

This causes:
1. KafkaStoreReaderThread.doWork() hangs in consumer.poll()
2. Reader never signals initialization complete
3. Producer waiting for Noop ack times out
4. Schema Registry startup fails after 60 seconds

Next investigation:
- Add logging for JoinGroup (apiKey=11)
- Add logging for SyncGroup (apiKey=14)
- Add logging for Heartbeat (apiKey=12)
- Determine where in initialization the consumer gets stuck

Added Fetch API explicit logging that confirms it's never called.
pull/7329/head
chrislu 5 days ago
parent
commit
592042e496
  1. 2
      test/kafka/kafka-client-loadtest/docker-compose.yml
  2. 6
      weed/mq/kafka/gateway/server.go
  3. 6
      weed/mq/kafka/integration/broker_client.go
  4. 50
      weed/mq/kafka/integration/broker_client_publish.go
  5. 16
      weed/mq/kafka/protocol/handler.go
  6. 96
      weed/mq/kafka/protocol/produce.go

2
test/kafka/kafka-client-loadtest/docker-compose.yml

@ -226,7 +226,7 @@ services:
interval: 10s
timeout: 5s
retries: 10
start_period: 45s # Increased to account for 10s startup delay + filer discovery
start_period: 45s # Increased to account for 10s startup delay + filer discovery
networks:
- kafka-loadtest-net

6
weed/mq/kafka/gateway/server.go

@ -178,9 +178,11 @@ func (s *Server) Start() error {
s.wg.Add(1)
go func() {
defer s.wg.Done()
glog.Warningf("🔴 CRITICAL DEBUG: Accept loop started for listener %s", s.ln.Addr().String())
for {
conn, err := s.ln.Accept()
if err != nil {
glog.Warningf("🔴 CRITICAL DEBUG: Accept error on %s: %v", s.ln.Addr().String(), err)
select {
case <-s.ctx.Done():
return
@ -190,17 +192,21 @@ func (s *Server) Start() error {
}
// Simple accept log to trace client connections (useful for JoinGroup debugging)
if conn != nil {
glog.Warningf("🔴 CRITICAL DEBUG: accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr())
glog.V(1).Infof("accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr())
}
s.wg.Add(1)
go func(c net.Conn) {
defer s.wg.Done()
glog.Warningf("🔴 CRITICAL DEBUG: HandleConn starting for %s", c.RemoteAddr())
if err := s.handler.HandleConn(s.ctx, c); err != nil {
glog.Warningf("🔴 CRITICAL DEBUG: handle conn %v: %v", c.RemoteAddr(), err)
glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err)
}
}(conn)
}
}()
glog.Warningf("🔴 CRITICAL DEBUG: Server.Start() completed, listen address: %s", s.ln.Addr().String())
return nil
}

6
weed/mq/kafka/integration/broker_client.go

@ -29,6 +29,12 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor
// operating even during client shutdown, which is important for testing scenarios.
dialCtx := context.Background()
// CRITICAL FIX: Add timeout to dial context
// gRPC dial will retry with exponential backoff. Without a timeout, it hangs indefinitely
// if the broker is unreachable. Set a reasonable timeout for initial connection attempt.
dialCtx, dialCancel := context.WithTimeout(dialCtx, 30*time.Second)
defer dialCancel()
// Connect to broker
// Load security configuration for broker connection
util.LoadSecurityConfiguration()

50
weed/mq/kafka/integration/broker_client_publish.go

@ -49,18 +49,48 @@ func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partiti
if len(dataMsg.Value) > 0 {
} else {
}
if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: dataMsg,
},
}); err != nil {
return 0, fmt.Errorf("failed to send data: %v", err)
// CRITICAL: Use a goroutine with context checking to enforce timeout
// gRPC streams may not respect context deadlines automatically
// We need to monitor the context and timeout the operation if needed
sendErrChan := make(chan error, 1)
go func() {
sendErrChan <- session.Stream.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: dataMsg,
},
})
}()
select {
case err := <-sendErrChan:
if err != nil {
return 0, fmt.Errorf("failed to send data: %v", err)
}
case <-ctx.Done():
return 0, fmt.Errorf("context cancelled while sending: %w", ctx.Err())
}
// Read acknowledgment
resp, err := session.Stream.Recv()
if err != nil {
return 0, fmt.Errorf("failed to receive ack: %v", err)
// Read acknowledgment with context timeout enforcement
recvErrChan := make(chan interface{}, 1)
go func() {
resp, err := session.Stream.Recv()
if err != nil {
recvErrChan <- err
} else {
recvErrChan <- resp
}
}()
var resp *mq_pb.PublishMessageResponse
select {
case result := <-recvErrChan:
if err, isErr := result.(error); isErr {
return 0, fmt.Errorf("failed to receive ack: %v", err)
}
resp = result.(*mq_pb.PublishMessageResponse)
case <-ctx.Done():
return 0, fmt.Errorf("context cancelled while receiving: %w", ctx.Err())
}
// Handle structured broker errors

16
weed/mq/kafka/protocol/handler.go

@ -845,6 +845,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Read message size (4 bytes)
var sizeBytes [4]byte
glog.Warningf("🔴 REQUEST LOOP: About to read message size from %s", connectionID)
if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
if err == io.EOF {
return nil
@ -866,6 +867,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Successfully read the message size
size := binary.BigEndian.Uint32(sizeBytes[:])
glog.Warningf("🔴 REQUEST LOOP: Parsed message size=%d from %s", size, connectionID)
if size == 0 || size > 1024*1024 { // 1MB limit
// Use standardized error for message size limit
// Send error response for message too large
@ -881,11 +883,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Read the message
messageBuf := make([]byte, size)
glog.Warningf("🔴 REQUEST LOOP: About to read %d-byte message body from %s", size, connectionID)
if _, err := io.ReadFull(r, messageBuf); err != nil {
_ = HandleTimeoutError(err, "read") // errorCode
return fmt.Errorf("read message: %w", err)
}
glog.Warningf("🔴 REQUEST LOOP: Successfully read %d-byte message from %s", size, connectionID)
// Parse at least the basic header to get API key and correlation ID
if len(messageBuf) < 8 {
return fmt.Errorf("message too short")
@ -895,6 +900,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
glog.Warningf("🔴 REQUEST LOOP: Parsed apiKey=%d, apiVersion=%d, correlationID=%d from %s", apiKey, apiVersion, correlationID, connectionID)
// Validate API version against what we support
if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err)
@ -919,6 +926,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
}
// CRITICAL: Log Fetch requests specifically
if apiKey == 1 {
glog.Warningf("🔴🔴🔴 FETCH REQUEST RECEIVED: correlationID=%d, apiVersion=%d, from %s", correlationID, apiVersion, connectionID)
}
glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d",
apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
@ -1033,13 +1045,17 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Only add to correlation queue AFTER successful channel send
// If we add before and the channel blocks, the correlation ID is in the queue
// but the request never gets processed, causing response writer deadlock
glog.Warningf("🔴 REQUEST QUEUE: About to queue correlationID=%d (apiKey=%d) to channel from %s", correlationID, apiKey, connectionID)
select {
case targetChan <- req:
// Request queued successfully - NOW add to correlation tracking
glog.Warningf("🔴 REQUEST QUEUE: Successfully sent correlationID=%d to channel from %s", correlationID, connectionID)
correlationQueueMu.Lock()
correlationQueue = append(correlationQueue, correlationID)
glog.Warningf("🔴 REQUEST QUEUE: Added correlationID=%d to queue (queue length now %d) from %s", correlationID, len(correlationQueue), connectionID)
correlationQueueMu.Unlock()
case <-ctx.Done():
glog.Warningf("🔴 REQUEST QUEUE: Context cancelled while queueing correlationID=%d from %s", correlationID, connectionID)
return ctx.Err()
case <-time.After(10 * time.Second):
// Channel full for too long - this shouldn't happen with proper backpressure

96
weed/mq/kafka/protocol/produce.go

@ -17,10 +17,13 @@ import (
func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// Version-specific handling
glog.Warningf("🔴 CRITICAL DEBUG handleProduce: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody))
switch apiVersion {
case 0, 1:
glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV0V1")
return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
case 2, 3, 4, 5, 6, 7:
glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV2Plus")
return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
default:
return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
@ -53,18 +56,6 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
_ = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) // acks
offset += 2
timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// CRITICAL FIX: Apply client-specified timeout to context
// If client specifies a timeout, create a new context with that timeout
// This ensures broker connections respect the client's expectations
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
defer cancel()
}
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
@ -595,7 +586,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
startTime := time.Now()
// DEBUG: Log request details
glog.V(2).Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID)
glog.Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID)
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
// In v2+, the main differences are:
@ -620,7 +611,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
}
txID := string(requestBody[offset : offset+int(txIDLen)])
glog.V(4).Infof("[NOOP-DEBUG] transactional_id=%s", txID)
glog.Infof("[NOOP-DEBUG] transactional_id=%s", txID)
offset += int(txIDLen)
}
}
@ -636,23 +627,14 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
offset += 4
// DEBUG: Log acks and timeout
glog.V(2).Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout)
// CRITICAL FIX: Apply client-specified timeout to context
// If client specifies a timeout, create a new context with that timeout
// This ensures broker connections respect the client's expectations
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
defer cancel()
}
glog.Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout)
// Remember if this is fire-and-forget mode
isFireAndForget := acks == 0
if isFireAndForget {
glog.V(2).Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)")
glog.Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)")
} else {
glog.V(2).Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks)
glog.Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks)
}
if len(requestBody) < offset+4 {
@ -662,7 +644,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
offset += 4
// DEBUG: Log topics count
glog.V(2).Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount)
glog.Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount)
// If topicsCount is implausible, there might be a parsing issue
if topicsCount > 1000 {
@ -698,14 +680,14 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
offset += int(topicNameSize)
// DEBUG: Log topic being processed
glog.V(2).Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName)
glog.Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName)
// Parse partitions count
partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// DEBUG: Log partitions count
glog.V(2).Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount)
glog.Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount)
// Response: topic name (STRING: 2 bytes length + data)
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
@ -741,7 +723,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
topicExists := h.seaweedMQHandler.TopicExists(topicName)
// DEBUG: Log topic existence and record set details
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData))
glog.Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData))
if !topicExists {
glog.Warningf("[NOOP-DEBUG] Partition %d: Topic does not exist", partitionID)
@ -749,10 +731,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
} else {
// Process the record set (lenient parsing)
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
// DEBUG: Log record count and parse error
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr)
glog.Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr)
if parseErr != nil {
glog.Warningf("[NOOP-DEBUG] Partition %d: parseRecordSet error: %v", partitionID, parseErr)
errorCode = 42 // INVALID_RECORD
@ -760,30 +742,30 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
// Extract all records from the record set and publish each one
// extractAllRecords handles fallback internally for various cases
records := h.extractAllRecords(recordSetData)
// DEBUG: Log extracted records count
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount)
glog.Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount)
if len(records) > 0 {
// DEBUG: Log first record details (especially for Noop with null value)
if len(records[0].Value) > 0 {
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value))
glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value))
} else {
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key))
glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key))
// Log the key bytes in hex for identification
glog.V(4).Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key)
glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key)
}
}
if len(records) == 0 {
glog.Warningf("[NOOP-DEBUG] Partition %d: No records extracted despite recordCount=%d", partitionID, recordCount)
errorCode = 42 // INVALID_RECORD
} else {
var firstOffsetSet bool
for idx, kv := range records {
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
if prodErr != nil {
glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr)
// Check if this is a schema validation error and add delay to prevent overloading
@ -793,10 +775,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
errorCode = 1 // UNKNOWN_SERVER_ERROR
break
}
// DEBUG: Log offset received from broker
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
if idx == 0 {
baseOffset = offsetProduced
firstOffsetSet = true
@ -810,18 +792,18 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
glog.Warningf("[NOOP-DEBUG] CRITICAL Partition %d: recordCount=0, but we should still try to extract records! recordSetDataLen=%d", partitionID, len(recordSetData))
// Try to extract anyway - this might be a Noop record
records := h.extractAllRecords(recordSetData)
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records))
glog.Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records))
if len(records) > 0 {
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records))
glog.Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records))
for idx, kv := range records {
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
if prodErr != nil {
glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr)
errorCode = 1 // UNKNOWN_SERVER_ERROR
break
}
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
if idx == 0 {
baseOffset = offsetProduced
}
@ -831,7 +813,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
}
// DEBUG: Log response that will be sent
glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode)
glog.Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode)
// Build correct Produce v2+ response for this partition
// Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]
@ -1628,13 +1610,13 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R
h.inferredRecordTypesMu.RLock()
if recordType, exists := h.inferredRecordTypes[avroSchema]; exists {
h.inferredRecordTypesMu.RUnlock()
glog.V(4).Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema))
glog.Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema))
return recordType, nil
}
h.inferredRecordTypesMu.RUnlock()
// Cache miss - create decoder and infer type
glog.V(4).Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema))
glog.Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema))
decoder, err := schema.NewAvroDecoder(avroSchema)
if err != nil {
return nil, fmt.Errorf("failed to create Avro decoder: %w", err)
@ -1649,7 +1631,7 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R
h.inferredRecordTypesMu.Lock()
h.inferredRecordTypes[avroSchema] = recordType
h.inferredRecordTypesMu.Unlock()
glog.V(4).Infof("Cached inferred RecordType for Avro schema")
glog.Infof("Cached inferred RecordType for Avro schema")
return recordType, nil
}
@ -1662,13 +1644,13 @@ func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*sch
h.inferredRecordTypesMu.RLock()
if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
h.inferredRecordTypesMu.RUnlock()
glog.V(4).Infof("RecordType cache HIT for Protobuf schema")
glog.Infof("RecordType cache HIT for Protobuf schema")
return recordType, nil
}
h.inferredRecordTypesMu.RUnlock()
// Cache miss - create decoder and infer type
glog.V(4).Infof("RecordType cache MISS for Protobuf schema, creating decoder")
glog.Infof("RecordType cache MISS for Protobuf schema, creating decoder")
decoder, err := schema.NewProtobufDecoder([]byte(protobufSchema))
if err != nil {
return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err)
@ -1695,13 +1677,13 @@ func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.R
h.inferredRecordTypesMu.RLock()
if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
h.inferredRecordTypesMu.RUnlock()
glog.V(4).Infof("RecordType cache HIT for JSON schema")
glog.Infof("RecordType cache HIT for JSON schema")
return recordType, nil
}
h.inferredRecordTypesMu.RUnlock()
// Cache miss - create decoder and infer type
glog.V(4).Infof("RecordType cache MISS for JSON schema, creating decoder")
glog.Infof("RecordType cache MISS for JSON schema, creating decoder")
decoder, err := schema.NewJSONSchemaDecoder(jsonSchema)
if err != nil {
return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err)

Loading…
Cancel
Save