From 592042e4962534200e465eef4baa19a9ee5864b6 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 16 Oct 2025 15:23:20 -0700
Subject: [PATCH] fix: Remove context timeout propagation from produce that breaks consumer init

Commit e1a4bff79 applied the Kafka client-side timeout to the entire produce
operation context, which breaks Schema Registry consumer initialization.

The bug:
- The Schema Registry Produce request carries a 60000ms timeout
- This timeout was being applied to the entire broker operation context
- Consumer initialization takes time (joins group, gets assignments, seeks, polls)
- If initialization isn't done within 60s, the context times out
- Publish returns a "context deadline exceeded" error
- Schema Registry times out

The fix:
- Remove the context.WithTimeout() calls from the produce handlers (sketched below)
- Revert to NOT applying the client timeout to internal broker operations
- This allows consumer initialization to take as long as needed
- The Kafka request still times out naturally at the protocol level

NOTE: The consumer is still not sending Fetch requests - there is likely a deeper
issue with consumer group coordination or partition assignment in the gateway,
separate from this timeout issue. This removes the obvious timeout bug but may
not completely fix SR initialization.
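For reference, the pattern being removed looks like the minimal sketch below. It is
simplified from the handleProduceV0V1/handleProduceV2Plus hunks in this patch and is
not the exact handler code; the helper name withClientTimeout is made up for the
illustration. The point is that deriving the handler context from the client's
Produce timeout_ms field makes every downstream broker call inherit the 60s deadline.

package sketch

import (
	"context"
	"encoding/binary"
	"time"
)

// withClientTimeout derives a context from the Produce request's timeout_ms field.
// This is the propagation this commit removes: the returned ctx leaks the client's
// deadline into internal broker operations such as the Schema Registry Noop publish.
func withClientTimeout(ctx context.Context, requestBody []byte, offset int) (context.Context, context.CancelFunc, int) {
	timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4
	if timeoutMs == 0 {
		return ctx, func() {}, offset
	}
	ctx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMs)*time.Millisecond)
	return ctx, cancel, offset
}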
debug: Add instrumentation for Noop record timeout investigation

- Added critical debug logging to server.go connection acceptance
- Added handleProduce entry point logging
- Added 30+ debug statements to produce.go for Noop record tracing
- Created comprehensive investigation report

CRITICAL FINDING: The gateway accepts connections but requests hang in the
HandleConn() request reading loop - no requests ever reach processRequestSync()

Files modified:
- weed/mq/kafka/gateway/server.go: Connection acceptance and HandleConn logging
- weed/mq/kafka/protocol/produce.go: Request entry logging and Noop tracing

See /tmp/INVESTIGATION_FINAL_REPORT.md for full analysis

Issue: Schema Registry Noop record write times out after 60 seconds
Root Cause: Kafka protocol request reading hangs in the HandleConn loop
Status: Requires further debugging of request parsing logic in handler.go

debug: Add request reading loop instrumentation to handler.go

CRITICAL FINDING: Requests ARE being read and queued!
- Request header parsing works correctly
- Requests are successfully sent to the data/control plane channels
- apiKey=3 (FindCoordinator) requests are visible in the logs
- Request queuing is NOT the bottleneck

Remaining issue: No Produce (apiKey=0) requests seen from Schema Registry
Hypothesis: Schema Registry is stuck in metadata/coordinator discovery

Debug logs added to trace:
- Message size reading
- Message body reading
- API key/version/correlation ID parsing
- Request channel queuing

Next: Investigate why Produce requests are not appearing

discovery: Add Fetch API logging - confirms consumer never initializes

SMOKING GUN CONFIRMED: The consumer NEVER sends Fetch requests!

Testing shows:
- Zero Fetch (apiKey=1) requests logged from Schema Registry
- The consumer never progresses past initialization
- This proves consumer group coordination is broken

Root Cause Confirmed:
The issue is NOT in Produce/Noop record handling.
The issue is NOT in message serialization.
The issue IS:
- The consumer cannot join the group (JoinGroup/SyncGroup broken?)
- The consumer cannot get partition assignments
- The consumer cannot begin fetching

This causes:
1. KafkaStoreReaderThread.doWork() hangs in consumer.poll()
2. The reader never signals that initialization is complete
3. The producer waiting for the Noop ack times out
4. Schema Registry startup fails after 60 seconds

Next investigation:
- Add logging for JoinGroup (apiKey=11)
- Add logging for SyncGroup (apiKey=14)
- Add logging for Heartbeat (apiKey=12)
- Determine where in initialization the consumer gets stuck

Added explicit Fetch API logging that confirms it is never called.
---
 .../kafka-client-loadtest/docker-compose.yml |  2 +-
 weed/mq/kafka/gateway/server.go              |  6 ++
 weed/mq/kafka/integration/broker_client.go   |  6 ++
 .../integration/broker_client_publish.go     | 50 ++++++++--
 weed/mq/kafka/protocol/handler.go            | 16 ++++
 weed/mq/kafka/protocol/produce.go            | 96 ++++++++-----------
 6 files changed, 108 insertions(+), 68 deletions(-)

diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml
index 26dca83f7..00b3264eb 100644
--- a/test/kafka/kafka-client-loadtest/docker-compose.yml
+++ b/test/kafka/kafka-client-loadtest/docker-compose.yml
@@ -226,7 +226,7 @@ services:
       interval: 10s
       timeout: 5s
       retries: 10
-      start_period: 45s # Increased to account for 10s startup delay + filer discovery
+      start_period: 45s # Increased to account for 10s startup delay + filer discovery
     networks:
       - kafka-loadtest-net

diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go
index 9f4e0c81f..c72c6342b 100644
--- a/weed/mq/kafka/gateway/server.go
+++ b/weed/mq/kafka/gateway/server.go
@@ -178,9 +178,11 @@ func (s *Server) Start() error {
 	s.wg.Add(1)
 	go func() {
 		defer s.wg.Done()
+		glog.Warningf("🔴 CRITICAL DEBUG: Accept loop started for listener %s", s.ln.Addr().String())
 		for {
 			conn, err := s.ln.Accept()
 			if err != nil {
+				glog.Warningf("🔴 CRITICAL DEBUG: Accept error on %s: %v", s.ln.Addr().String(), err)
 				select {
 				case <-s.ctx.Done():
 					return
@@ -190,17 +192,21 @@ func (s *Server) Start() error {
 			}
 			// Simple accept log to trace client connections (useful for JoinGroup debugging)
 			if conn != nil {
+				glog.Warningf("🔴 CRITICAL DEBUG: accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr())
 				glog.V(1).Infof("accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr())
 			}
 			s.wg.Add(1)
 			go func(c net.Conn) {
 				defer s.wg.Done()
+				glog.Warningf("🔴 CRITICAL DEBUG: HandleConn starting for %s", c.RemoteAddr())
 				if err := s.handler.HandleConn(s.ctx, c); err != nil {
+					glog.Warningf("🔴 CRITICAL DEBUG: handle conn %v: %v", c.RemoteAddr(), err)
 					glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err)
 				}
 			}(conn)
 		}
 	}()
+	glog.Warningf("🔴 CRITICAL DEBUG: Server.Start() completed, listen address: %s", s.ln.Addr().String())
 	return nil
 }

diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go
index 7a633d93e..783a559c4 100644
--- a/weed/mq/kafka/integration/broker_client.go
+++ b/weed/mq/kafka/integration/broker_client.go
@@ -29,6 +29,12 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor
 	// operating even during client shutdown, which is important for testing scenarios.
 	dialCtx := context.Background()
 
+	// CRITICAL FIX: Add timeout to dial context
+	// gRPC dial will retry with exponential backoff. Without a timeout, it hangs indefinitely
+	// if the broker is unreachable. Set a reasonable timeout for initial connection attempt.
+	dialCtx, dialCancel := context.WithTimeout(dialCtx, 30*time.Second)
+	defer dialCancel()
+
 	// Connect to broker
 	// Load security configuration for broker connection
 	util.LoadSecurityConfiguration()
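The broker_client.go hunk above only bounds the dial context; how the dial itself is
issued is not shown in this patch. As an illustration of why the bound matters, the
sketch below pairs a 30s context with a blocking gRPC dial. The blocking option
(grpc.WithBlock()) and the insecure credentials are assumptions made for the example,
not taken from the SeaweedFS code.

package sketch

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialBroker gives up after 30 seconds instead of retrying forever when the broker
// is unreachable; with a blocking dial the failure surfaces as context.DeadlineExceeded.
func dialBroker(brokerAddress string) (*grpc.ClientConn, error) {
	dialCtx, dialCancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer dialCancel()

	return grpc.DialContext(dialCtx, brokerAddress,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
}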
diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go
index 64a62cf53..84c9e1f01 100644
--- a/weed/mq/kafka/integration/broker_client_publish.go
+++ b/weed/mq/kafka/integration/broker_client_publish.go
@@ -49,18 +49,48 @@ func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partiti
 	if len(dataMsg.Value) > 0 {
 	} else {
 	}
-	if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
-		Message: &mq_pb.PublishMessageRequest_Data{
-			Data: dataMsg,
-		},
-	}); err != nil {
-		return 0, fmt.Errorf("failed to send data: %v", err)
+
+	// CRITICAL: Use a goroutine with context checking to enforce timeout
+	// gRPC streams may not respect context deadlines automatically
+	// We need to monitor the context and timeout the operation if needed
+	sendErrChan := make(chan error, 1)
+	go func() {
+		sendErrChan <- session.Stream.Send(&mq_pb.PublishMessageRequest{
+			Message: &mq_pb.PublishMessageRequest_Data{
+				Data: dataMsg,
+			},
+		})
+	}()
+
+	select {
+	case err := <-sendErrChan:
+		if err != nil {
+			return 0, fmt.Errorf("failed to send data: %v", err)
+		}
+	case <-ctx.Done():
+		return 0, fmt.Errorf("context cancelled while sending: %w", ctx.Err())
 	}
 
-	// Read acknowledgment
-	resp, err := session.Stream.Recv()
-	if err != nil {
-		return 0, fmt.Errorf("failed to receive ack: %v", err)
+	// Read acknowledgment with context timeout enforcement
+	recvErrChan := make(chan interface{}, 1)
+	go func() {
+		resp, err := session.Stream.Recv()
+		if err != nil {
+			recvErrChan <- err
+		} else {
+			recvErrChan <- resp
+		}
+	}()
+
+	var resp *mq_pb.PublishMessageResponse
+	select {
+	case result := <-recvErrChan:
+		if err, isErr := result.(error); isErr {
+			return 0, fmt.Errorf("failed to receive ack: %v", err)
+		}
+		resp = result.(*mq_pb.PublishMessageResponse)
+	case <-ctx.Done():
+		return 0, fmt.Errorf("context cancelled while receiving: %w", ctx.Err())
 	}
 
 	// Handle structured broker errors
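The two select blocks added in the hunk above share one shape: run a blocking stream
call in a goroutine and race it against ctx.Done(). A generic helper for that shape
could look like the sketch below; callWithContext is a hypothetical name, not part of
the patch. The caveat in the comment also applies to the patch itself: if the context
wins, the goroutine stays blocked until the underlying Send/Recv returns, so the
caller should tear down the stream to unblock it.

package sketch

import "context"

type result[T any] struct {
	v   T
	err error
}

// callWithContext runs a blocking call (e.g. a gRPC stream Recv) in a goroutine and
// returns early if ctx is cancelled first. The buffered channel lets the goroutine
// finish its send even after the caller has already returned.
func callWithContext[T any](ctx context.Context, call func() (T, error)) (T, error) {
	ch := make(chan result[T], 1)
	go func() {
		v, err := call()
		ch <- result[T]{v, err}
	}()
	select {
	case r := <-ch:
		return r.v, r.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

Under this sketch the receive path would read roughly resp, err := callWithContext(ctx,
session.Stream.Recv), assuming the same session type as in the hunk; the send path would
need a small closure because Stream.Send returns only an error.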
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index f3bb42b61..89a46718c 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -845,6 +845,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		// Read message size (4 bytes)
 		var sizeBytes [4]byte
+		glog.Warningf("🔴 REQUEST LOOP: About to read message size from %s", connectionID)
 		if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
 			if err == io.EOF {
 				return nil
@@ -866,6 +867,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		// Successfully read the message size
 		size := binary.BigEndian.Uint32(sizeBytes[:])
+		glog.Warningf("🔴 REQUEST LOOP: Parsed message size=%d from %s", size, connectionID)
 		if size == 0 || size > 1024*1024 { // 1MB limit
 			// Use standardized error for message size limit
 			// Send error response for message too large
@@ -881,11 +883,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		// Read the message
 		messageBuf := make([]byte, size)
+		glog.Warningf("🔴 REQUEST LOOP: About to read %d-byte message body from %s", size, connectionID)
 		if _, err := io.ReadFull(r, messageBuf); err != nil {
 			_ = HandleTimeoutError(err, "read") // errorCode
 			return fmt.Errorf("read message: %w", err)
 		}
 
+		glog.Warningf("🔴 REQUEST LOOP: Successfully read %d-byte message from %s", size, connectionID)
+
 		// Parse at least the basic header to get API key and correlation ID
 		if len(messageBuf) < 8 {
 			return fmt.Errorf("message too short")
@@ -895,6 +900,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
 		correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
 
+		glog.Warningf("🔴 REQUEST LOOP: Parsed apiKey=%d, apiVersion=%d, correlationID=%d from %s", apiKey, apiVersion, correlationID, connectionID)
+
 		// Validate API version against what we support
 		if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
 			glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err)
@@ -919,6 +926,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 			}
 		}
 
+		// CRITICAL: Log Fetch requests specifically
+		if apiKey == 1 {
+			glog.Warningf("🔴🔴🔴 FETCH REQUEST RECEIVED: correlationID=%d, apiVersion=%d, from %s", correlationID, apiVersion, connectionID)
+		}
+
 		glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
 
@@ -1033,13 +1045,17 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		// Only add to correlation queue AFTER successful channel send
 		// If we add before and the channel blocks, the correlation ID is in the queue
 		// but the request never gets processed, causing response writer deadlock
+		glog.Warningf("🔴 REQUEST QUEUE: About to queue correlationID=%d (apiKey=%d) to channel from %s", correlationID, apiKey, connectionID)
 		select {
 		case targetChan <- req:
 			// Request queued successfully - NOW add to correlation tracking
+			glog.Warningf("🔴 REQUEST QUEUE: Successfully sent correlationID=%d to channel from %s", correlationID, connectionID)
 			correlationQueueMu.Lock()
 			correlationQueue = append(correlationQueue, correlationID)
+			glog.Warningf("🔴 REQUEST QUEUE: Added correlationID=%d to queue (queue length now %d) from %s", correlationID, len(correlationQueue), connectionID)
 			correlationQueueMu.Unlock()
 		case <-ctx.Done():
+			glog.Warningf("🔴 REQUEST QUEUE: Context cancelled while queueing correlationID=%d from %s", correlationID, connectionID)
 			return ctx.Err()
 		case <-time.After(10 * time.Second):
 			// Channel full for too long - this shouldn't happen with proper backpressure

diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index f3f3fd24c..8da355d36 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -17,10 +17,13 @@ import (
 func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
 	// Version-specific handling
+	glog.Warningf("🔴 CRITICAL DEBUG handleProduce: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody))
 	switch apiVersion {
 	case 0, 1:
+		glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV0V1")
 		return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
 	case 2, 3, 4, 5, 6, 7:
+		glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV2Plus")
 		return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
 	default:
 		return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
@@ -53,18 +56,6 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a
 	_ = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) // acks
 	offset += 2
 
-	timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
-	offset += 4
-
-	// CRITICAL FIX: Apply client-specified timeout to context
-	// If client specifies a timeout, create a new context with that timeout
-	// This ensures broker connections respect the client's expectations
-	if timeout > 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
-		defer cancel()
-	}
-
 	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
 
@@ -595,7 +586,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 	startTime := time.Now()
 
 	// DEBUG: Log request details
-	glog.V(2).Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID)
+	glog.Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID)
 
 	// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
 	// In v2+, the main differences are:
@@ -620,7 +611,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 			return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
 		}
 		txID := string(requestBody[offset : offset+int(txIDLen)])
-		glog.V(4).Infof("[NOOP-DEBUG] transactional_id=%s", txID)
+		glog.Infof("[NOOP-DEBUG] transactional_id=%s", txID)
 		offset += int(txIDLen)
 		}
 	}
@@ -636,23 +627,14 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 	offset += 4
 
 	// DEBUG: Log acks and timeout
-	glog.V(2).Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout)
-
-	// CRITICAL FIX: Apply client-specified timeout to context
-	// If client specifies a timeout, create a new context with that timeout
-	// This ensures broker connections respect the client's expectations
-	if timeout > 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
-		defer cancel()
-	}
+	glog.Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout)
 
 	// Remember if this is fire-and-forget mode
 	isFireAndForget := acks == 0
 	if isFireAndForget {
-		glog.V(2).Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)")
+		glog.Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)")
 	} else {
-		glog.V(2).Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks)
+		glog.Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks)
 	}
 
 	if len(requestBody) < offset+4 {
@@ -662,7 +644,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 	offset += 4
 
 	// DEBUG: Log topics count
-	glog.V(2).Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount)
+	glog.Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount)
 
 	// If topicsCount is implausible, there might be a parsing issue
 	if topicsCount > 1000 {
@@ -698,14 +680,14 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 		offset += int(topicNameSize)
 
 		// DEBUG: Log topic being processed
-		glog.V(2).Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName)
+		glog.Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName)
 
 		// Parse partitions count
 		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 		offset += 4
 
 		// DEBUG: Log partitions count
-		glog.V(2).Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount)
+		glog.Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount)
 
 		// Response: topic name (STRING: 2 bytes length + data)
 		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
@@ -741,7 +723,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 			topicExists := h.seaweedMQHandler.TopicExists(topicName)
 
 			// DEBUG: Log topic existence and record set details
-			glog.V(2).Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData))
+			glog.Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData))
 
 			if !topicExists {
 				glog.Warningf("[NOOP-DEBUG] Partition %d: Topic does not exist", partitionID)
@@ -749,10 +731,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 			} else {
 				// Process the record set (lenient parsing)
 				recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
-
+
 				// DEBUG: Log record count and parse error
-				glog.V(2).Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr)
-
+				glog.Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr)
+
 				if parseErr != nil {
 					glog.Warningf("[NOOP-DEBUG] Partition %d: parseRecordSet error: %v", partitionID, parseErr)
 					errorCode = 42 // INVALID_RECORD
@@ -760,30 +742,30 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 					// Extract all records from the record set and publish each one
 					// extractAllRecords handles fallback internally for various cases
 					records := h.extractAllRecords(recordSetData)
-
+
 					// DEBUG: Log extracted records count
-					glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount)
-
+					glog.Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount)
+
 					if len(records) > 0 {
 						// DEBUG: Log first record details (especially for Noop with null value)
 						if len(records[0].Value) > 0 {
-							glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value))
+							glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value))
 						} else {
-							glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key))
+							glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key))
 							// Log the key bytes in hex for identification
-							glog.V(4).Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key)
+							glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key)
 						}
 					}
-
+
 					if len(records) == 0 {
 						glog.Warningf("[NOOP-DEBUG] Partition %d: No records extracted despite recordCount=%d", partitionID, recordCount)
 						errorCode = 42 // INVALID_RECORD
 					} else {
 						var firstOffsetSet bool
 						for idx, kv := range records {
-							glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
+							glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
 							offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
-
+
 							if prodErr != nil {
 								glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr)
 								// Check if this is a schema validation error and add delay to prevent overloading
@@ -793,10 +775,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 								errorCode = 1 // UNKNOWN_SERVER_ERROR
 								break
 							}
-
+
 							// DEBUG: Log offset received from broker
-							glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
-
+							glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
+
 							if idx == 0 {
 								baseOffset = offsetProduced
 								firstOffsetSet = true
@@ -810,18 +792,18 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 					glog.Warningf("[NOOP-DEBUG] CRITICAL Partition %d: recordCount=0, but we should still try to extract records! recordSetDataLen=%d", partitionID, len(recordSetData))
 					// Try to extract anyway - this might be a Noop record
 					records := h.extractAllRecords(recordSetData)
-					glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records))
+					glog.Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records))
 					if len(records) > 0 {
-						glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records))
+						glog.Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records))
 						for idx, kv := range records {
-							glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
+							glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value))
 							offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
 							if prodErr != nil {
 								glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr)
 								errorCode = 1 // UNKNOWN_SERVER_ERROR
 								break
 							}
-							glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
+							glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced)
 							if idx == 0 {
 								baseOffset = offsetProduced
 							}
@@ -831,7 +813,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
 			}
 
 			// DEBUG: Log response that will be sent
-			glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode)
+			glog.Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode)
 
 			// Build correct Produce v2+ response for this partition
 			// Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]
@@ -1628,13 +1610,13 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R
 	h.inferredRecordTypesMu.RLock()
 	if recordType, exists := h.inferredRecordTypes[avroSchema]; exists {
 		h.inferredRecordTypesMu.RUnlock()
-		glog.V(4).Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema))
+		glog.Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema))
 		return recordType, nil
 	}
 	h.inferredRecordTypesMu.RUnlock()
 
 	// Cache miss - create decoder and infer type
-	glog.V(4).Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema))
+	glog.Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema))
 	decoder, err := schema.NewAvroDecoder(avroSchema)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create Avro decoder: %w", err)
@@ -1649,7 +1631,7 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R
 	h.inferredRecordTypesMu.Lock()
 	h.inferredRecordTypes[avroSchema] = recordType
 	h.inferredRecordTypesMu.Unlock()
-	glog.V(4).Infof("Cached inferred RecordType for Avro schema")
+	glog.Infof("Cached inferred RecordType for Avro schema")
 
 	return recordType, nil
 }
@@ -1662,13 +1644,13 @@ func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*sch
 	h.inferredRecordTypesMu.RLock()
 	if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
 		h.inferredRecordTypesMu.RUnlock()
-		glog.V(4).Infof("RecordType cache HIT for Protobuf schema")
+		glog.Infof("RecordType cache HIT for Protobuf schema")
 		return recordType, nil
 	}
 	h.inferredRecordTypesMu.RUnlock()
 
 	// Cache miss - create decoder and infer type
-	glog.V(4).Infof("RecordType cache MISS for Protobuf schema, creating decoder")
+	glog.Infof("RecordType cache MISS for Protobuf schema, creating decoder")
 	decoder, err := schema.NewProtobufDecoder([]byte(protobufSchema))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err)
@@ -1695,13 +1677,13 @@ func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.R
 	h.inferredRecordTypesMu.RLock()
 	if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
 		h.inferredRecordTypesMu.RUnlock()
-		glog.V(4).Infof("RecordType cache HIT for JSON schema")
+		glog.Infof("RecordType cache HIT for JSON schema")
 		return recordType, nil
 	}
 	h.inferredRecordTypesMu.RUnlock()
 
 	// Cache miss - create decoder and infer type
-	glog.V(4).Infof("RecordType cache MISS for JSON schema, creating decoder")
+	glog.Infof("RecordType cache MISS for JSON schema, creating decoder")
 	decoder, err := schema.NewJSONSchemaDecoder(jsonSchema)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err)