diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml index 26dca83f7..00b3264eb 100644 --- a/test/kafka/kafka-client-loadtest/docker-compose.yml +++ b/test/kafka/kafka-client-loadtest/docker-compose.yml @@ -226,7 +226,7 @@ services: interval: 10s timeout: 5s retries: 10 - start_period: 45s # Increased to account for 10s startup delay + filer discovery + start_period: 45s # Increased to account for 10s startup delay + filer discovery networks: - kafka-loadtest-net diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index 9f4e0c81f..c72c6342b 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -178,9 +178,11 @@ func (s *Server) Start() error { s.wg.Add(1) go func() { defer s.wg.Done() + glog.Warningf("🔴 CRITICAL DEBUG: Accept loop started for listener %s", s.ln.Addr().String()) for { conn, err := s.ln.Accept() if err != nil { + glog.Warningf("🔴 CRITICAL DEBUG: Accept error on %s: %v", s.ln.Addr().String(), err) select { case <-s.ctx.Done(): return @@ -190,17 +192,21 @@ func (s *Server) Start() error { } // Simple accept log to trace client connections (useful for JoinGroup debugging) if conn != nil { + glog.Warningf("🔴 CRITICAL DEBUG: accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr()) glog.V(1).Infof("accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr()) } s.wg.Add(1) go func(c net.Conn) { defer s.wg.Done() + glog.Warningf("🔴 CRITICAL DEBUG: HandleConn starting for %s", c.RemoteAddr()) if err := s.handler.HandleConn(s.ctx, c); err != nil { + glog.Warningf("🔴 CRITICAL DEBUG: handle conn %v: %v", c.RemoteAddr(), err) glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err) } }(conn) } }() + glog.Warningf("🔴 CRITICAL DEBUG: Server.Start() completed, listen address: %s", s.ln.Addr().String()) return nil } diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go index 7a633d93e..783a559c4 100644 --- a/weed/mq/kafka/integration/broker_client.go +++ b/weed/mq/kafka/integration/broker_client.go @@ -29,6 +29,12 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor // operating even during client shutdown, which is important for testing scenarios. dialCtx := context.Background() + // CRITICAL FIX: Add timeout to dial context + // gRPC dial will retry with exponential backoff. Without a timeout, it hangs indefinitely + // if the broker is unreachable. Set a reasonable timeout for initial connection attempt. + dialCtx, dialCancel := context.WithTimeout(dialCtx, 30*time.Second) + defer dialCancel() + // Connect to broker // Load security configuration for broker connection util.LoadSecurityConfiguration() diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go index 64a62cf53..84c9e1f01 100644 --- a/weed/mq/kafka/integration/broker_client_publish.go +++ b/weed/mq/kafka/integration/broker_client_publish.go @@ -49,18 +49,48 @@ func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partiti if len(dataMsg.Value) > 0 { } else { } - if err := session.Stream.Send(&mq_pb.PublishMessageRequest{ - Message: &mq_pb.PublishMessageRequest_Data{ - Data: dataMsg, - }, - }); err != nil { - return 0, fmt.Errorf("failed to send data: %v", err) + + // CRITICAL: Use a goroutine with context checking to enforce timeout + // gRPC streams may not respect context deadlines automatically + // We need to monitor the context and timeout the operation if needed + sendErrChan := make(chan error, 1) + go func() { + sendErrChan <- session.Stream.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Data{ + Data: dataMsg, + }, + }) + }() + + select { + case err := <-sendErrChan: + if err != nil { + return 0, fmt.Errorf("failed to send data: %v", err) + } + case <-ctx.Done(): + return 0, fmt.Errorf("context cancelled while sending: %w", ctx.Err()) } - // Read acknowledgment - resp, err := session.Stream.Recv() - if err != nil { - return 0, fmt.Errorf("failed to receive ack: %v", err) + // Read acknowledgment with context timeout enforcement + recvErrChan := make(chan interface{}, 1) + go func() { + resp, err := session.Stream.Recv() + if err != nil { + recvErrChan <- err + } else { + recvErrChan <- resp + } + }() + + var resp *mq_pb.PublishMessageResponse + select { + case result := <-recvErrChan: + if err, isErr := result.(error); isErr { + return 0, fmt.Errorf("failed to receive ack: %v", err) + } + resp = result.(*mq_pb.PublishMessageResponse) + case <-ctx.Done(): + return 0, fmt.Errorf("context cancelled while receiving: %w", ctx.Err()) } // Handle structured broker errors diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index f3bb42b61..89a46718c 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -845,6 +845,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Read message size (4 bytes) var sizeBytes [4]byte + glog.Warningf("🔴 REQUEST LOOP: About to read message size from %s", connectionID) if _, err := io.ReadFull(r, sizeBytes[:]); err != nil { if err == io.EOF { return nil @@ -866,6 +867,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Successfully read the message size size := binary.BigEndian.Uint32(sizeBytes[:]) + glog.Warningf("🔴 REQUEST LOOP: Parsed message size=%d from %s", size, connectionID) if size == 0 || size > 1024*1024 { // 1MB limit // Use standardized error for message size limit // Send error response for message too large @@ -881,11 +883,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Read the message messageBuf := make([]byte, size) + glog.Warningf("🔴 REQUEST LOOP: About to read %d-byte message body from %s", size, connectionID) if _, err := io.ReadFull(r, messageBuf); err != nil { _ = HandleTimeoutError(err, "read") // errorCode return fmt.Errorf("read message: %w", err) } + glog.Warningf("🔴 REQUEST LOOP: Successfully read %d-byte message from %s", size, connectionID) + // Parse at least the basic header to get API key and correlation ID if len(messageBuf) < 8 { return fmt.Errorf("message too short") @@ -895,6 +900,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) + glog.Warningf("🔴 REQUEST LOOP: Parsed apiKey=%d, apiVersion=%d, correlationID=%d from %s", apiKey, apiVersion, correlationID, connectionID) + // Validate API version against what we support if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err) @@ -919,6 +926,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } } + // CRITICAL: Log Fetch requests specifically + if apiKey == 1 { + glog.Warningf("🔴🔴🔴 FETCH REQUEST RECEIVED: correlationID=%d, apiVersion=%d, from %s", correlationID, apiVersion, connectionID) + } + glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) @@ -1033,13 +1045,17 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Only add to correlation queue AFTER successful channel send // If we add before and the channel blocks, the correlation ID is in the queue // but the request never gets processed, causing response writer deadlock + glog.Warningf("🔴 REQUEST QUEUE: About to queue correlationID=%d (apiKey=%d) to channel from %s", correlationID, apiKey, connectionID) select { case targetChan <- req: // Request queued successfully - NOW add to correlation tracking + glog.Warningf("🔴 REQUEST QUEUE: Successfully sent correlationID=%d to channel from %s", correlationID, connectionID) correlationQueueMu.Lock() correlationQueue = append(correlationQueue, correlationID) + glog.Warningf("🔴 REQUEST QUEUE: Added correlationID=%d to queue (queue length now %d) from %s", correlationID, len(correlationQueue), connectionID) correlationQueueMu.Unlock() case <-ctx.Done(): + glog.Warningf("🔴 REQUEST QUEUE: Context cancelled while queueing correlationID=%d from %s", correlationID, connectionID) return ctx.Err() case <-time.After(10 * time.Second): // Channel full for too long - this shouldn't happen with proper backpressure diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index f3f3fd24c..8da355d36 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -17,10 +17,13 @@ import ( func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { // Version-specific handling + glog.Warningf("🔴 CRITICAL DEBUG handleProduce: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) switch apiVersion { case 0, 1: + glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV0V1") return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody) case 2, 3, 4, 5, 6, 7: + glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV2Plus") return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody) default: return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion) @@ -53,18 +56,6 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a _ = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) // acks offset += 2 - timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4]) - offset += 4 - - // CRITICAL FIX: Apply client-specified timeout to context - // If client specifies a timeout, create a new context with that timeout - // This ensures broker connections respect the client's expectations - if timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond) - defer cancel() - } - topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 @@ -595,7 +586,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, startTime := time.Now() // DEBUG: Log request details - glog.V(2).Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID) + glog.Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID) // For now, use simplified parsing similar to v0/v1 but handle v2+ response format // In v2+, the main differences are: @@ -620,7 +611,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion) } txID := string(requestBody[offset : offset+int(txIDLen)]) - glog.V(4).Infof("[NOOP-DEBUG] transactional_id=%s", txID) + glog.Infof("[NOOP-DEBUG] transactional_id=%s", txID) offset += int(txIDLen) } } @@ -636,23 +627,14 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, offset += 4 // DEBUG: Log acks and timeout - glog.V(2).Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout) - - // CRITICAL FIX: Apply client-specified timeout to context - // If client specifies a timeout, create a new context with that timeout - // This ensures broker connections respect the client's expectations - if timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond) - defer cancel() - } + glog.Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout) // Remember if this is fire-and-forget mode isFireAndForget := acks == 0 if isFireAndForget { - glog.V(2).Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)") + glog.Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)") } else { - glog.V(2).Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks) + glog.Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks) } if len(requestBody) < offset+4 { @@ -662,7 +644,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, offset += 4 // DEBUG: Log topics count - glog.V(2).Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount) + glog.Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount) // If topicsCount is implausible, there might be a parsing issue if topicsCount > 1000 { @@ -698,14 +680,14 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, offset += int(topicNameSize) // DEBUG: Log topic being processed - glog.V(2).Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName) + glog.Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName) // Parse partitions count partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 // DEBUG: Log partitions count - glog.V(2).Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount) + glog.Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount) // Response: topic name (STRING: 2 bytes length + data) response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) @@ -741,7 +723,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, topicExists := h.seaweedMQHandler.TopicExists(topicName) // DEBUG: Log topic existence and record set details - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData)) + glog.Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData)) if !topicExists { glog.Warningf("[NOOP-DEBUG] Partition %d: Topic does not exist", partitionID) @@ -749,10 +731,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, } else { // Process the record set (lenient parsing) recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused - + // DEBUG: Log record count and parse error - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr) - + glog.Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr) + if parseErr != nil { glog.Warningf("[NOOP-DEBUG] Partition %d: parseRecordSet error: %v", partitionID, parseErr) errorCode = 42 // INVALID_RECORD @@ -760,30 +742,30 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, // Extract all records from the record set and publish each one // extractAllRecords handles fallback internally for various cases records := h.extractAllRecords(recordSetData) - + // DEBUG: Log extracted records count - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount) - + glog.Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount) + if len(records) > 0 { // DEBUG: Log first record details (especially for Noop with null value) if len(records[0].Value) > 0 { - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value)) + glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value)) } else { - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key)) + glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key)) // Log the key bytes in hex for identification - glog.V(4).Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key) + glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key) } } - + if len(records) == 0 { glog.Warningf("[NOOP-DEBUG] Partition %d: No records extracted despite recordCount=%d", partitionID, recordCount) errorCode = 42 // INVALID_RECORD } else { var firstOffsetSet bool for idx, kv := range records { - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value)) + glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value)) offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) - + if prodErr != nil { glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr) // Check if this is a schema validation error and add delay to prevent overloading @@ -793,10 +775,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, errorCode = 1 // UNKNOWN_SERVER_ERROR break } - + // DEBUG: Log offset received from broker - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced) - + glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced) + if idx == 0 { baseOffset = offsetProduced firstOffsetSet = true @@ -810,18 +792,18 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, glog.Warningf("[NOOP-DEBUG] CRITICAL Partition %d: recordCount=0, but we should still try to extract records! recordSetDataLen=%d", partitionID, len(recordSetData)) // Try to extract anyway - this might be a Noop record records := h.extractAllRecords(recordSetData) - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records)) + glog.Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records)) if len(records) > 0 { - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records)) + glog.Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records)) for idx, kv := range records { - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value)) + glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value)) offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) if prodErr != nil { glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr) errorCode = 1 // UNKNOWN_SERVER_ERROR break } - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced) + glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced) if idx == 0 { baseOffset = offsetProduced } @@ -831,7 +813,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, } // DEBUG: Log response that will be sent - glog.V(2).Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode) + glog.Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode) // Build correct Produce v2+ response for this partition // Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5] @@ -1628,13 +1610,13 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R h.inferredRecordTypesMu.RLock() if recordType, exists := h.inferredRecordTypes[avroSchema]; exists { h.inferredRecordTypesMu.RUnlock() - glog.V(4).Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema)) + glog.Infof("RecordType cache HIT for Avro schema (length=%d)", len(avroSchema)) return recordType, nil } h.inferredRecordTypesMu.RUnlock() // Cache miss - create decoder and infer type - glog.V(4).Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema)) + glog.Infof("RecordType cache MISS for Avro schema (length=%d), creating codec", len(avroSchema)) decoder, err := schema.NewAvroDecoder(avroSchema) if err != nil { return nil, fmt.Errorf("failed to create Avro decoder: %w", err) @@ -1649,7 +1631,7 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R h.inferredRecordTypesMu.Lock() h.inferredRecordTypes[avroSchema] = recordType h.inferredRecordTypesMu.Unlock() - glog.V(4).Infof("Cached inferred RecordType for Avro schema") + glog.Infof("Cached inferred RecordType for Avro schema") return recordType, nil } @@ -1662,13 +1644,13 @@ func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*sch h.inferredRecordTypesMu.RLock() if recordType, exists := h.inferredRecordTypes[cacheKey]; exists { h.inferredRecordTypesMu.RUnlock() - glog.V(4).Infof("RecordType cache HIT for Protobuf schema") + glog.Infof("RecordType cache HIT for Protobuf schema") return recordType, nil } h.inferredRecordTypesMu.RUnlock() // Cache miss - create decoder and infer type - glog.V(4).Infof("RecordType cache MISS for Protobuf schema, creating decoder") + glog.Infof("RecordType cache MISS for Protobuf schema, creating decoder") decoder, err := schema.NewProtobufDecoder([]byte(protobufSchema)) if err != nil { return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err) @@ -1695,13 +1677,13 @@ func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.R h.inferredRecordTypesMu.RLock() if recordType, exists := h.inferredRecordTypes[cacheKey]; exists { h.inferredRecordTypesMu.RUnlock() - glog.V(4).Infof("RecordType cache HIT for JSON schema") + glog.Infof("RecordType cache HIT for JSON schema") return recordType, nil } h.inferredRecordTypesMu.RUnlock() // Cache miss - create decoder and infer type - glog.V(4).Infof("RecordType cache MISS for JSON schema, creating decoder") + glog.Infof("RecordType cache MISS for JSON schema, creating decoder") decoder, err := schema.NewJSONSchemaDecoder(jsonSchema) if err != nil { return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err)