From b96563946b2b969c9ff827a67dc2a003cf0d08e1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 22:51:42 -0700 Subject: [PATCH] remove _schemas debug --- .../integration/broker_client_publish.go | 5 ----- weed/mq/kafka/protocol/handler.go | 4 +--- weed/util/log_buffer/log_buffer.go | 20 ------------------- 3 files changed, 1 insertion(+), 28 deletions(-) diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go index 33ce6329c..388e22d9f 100644 --- a/weed/mq/kafka/integration/broker_client_publish.go +++ b/weed/mq/kafka/integration/broker_client_publish.go @@ -61,11 +61,6 @@ func (bc *BrokerClient) PublishRecord(ctx context.Context, topic string, partiti return 0, fmt.Errorf("failed to receive ack: %v", err) } - if topic == "_schemas" { - glog.V(3).Infof("[GATEWAY RECV] topic=%s partition=%d resp.AssignedOffset=%d resp.AckTsNs=%d", - topic, partition, resp.AssignedOffset, resp.AckTsNs) - } - // Handle structured broker errors if kafkaErrorCode, errorMsg, handleErr := HandleBrokerResponse(resp); handleErr != nil { return 0, handleErr diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 0af158dd8..c224d6473 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1964,9 +1964,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req responseOffset = earliestOffset } responseTimestamp = 0 // No specific timestamp for earliest - if strings.HasPrefix(string(topicName), "_schemas") { - glog.Infof("SCHEMA REGISTRY LISTOFFSETS EARLIEST: topic=%s partition=%d returning offset=%d", string(topicName), partitionID, responseOffset) - } + case -1: // latest offset // Get the actual latest offset from SMQ if h.seaweedMQHandler == nil { diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 89b0627a1..4795227fa 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -3,7 +3,6 @@ package log_buffer import ( "bytes" "math" - "strings" "sync" "sync/atomic" "time" @@ -591,12 +590,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if isOffsetBased { requestedOffset := lastReadPosition.Offset - // DEBUG: Log buffer state for _schemas topic - if strings.Contains(logBuffer.name, "_schemas") { - glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d", - requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load()) - } - // Check if the requested offset is in the current buffer range if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset { // If current buffer is empty (pos=0), check if data is on disk or not yet written @@ -614,10 +607,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu // Case 3: try disk read (historical data might exist) if requestedOffset < logBuffer.offset { // Data was in the buffer range but buffer is now empty = flushed to disk - if strings.Contains(logBuffer.name, "_schemas") { - glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)", - requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset) - } return nil, -2, ResumeFromDiskError } // requestedOffset == logBuffer.offset: Current position @@ -625,20 +614,11 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu // (historical data might exist from previous runs) if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 { // Initial state: try disk read before waiting for new data - if strings.Contains(logBuffer.name, "_schemas") { - glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0") - } return nil, -2, ResumeFromDiskError } // Otherwise, wait for new data to arrive - if strings.Contains(logBuffer.name, "_schemas") { - glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset) - } return nil, logBuffer.offset, nil } - if strings.Contains(logBuffer.name, "_schemas") { - glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos) - } return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil }