From 28c9516ecd2425ed19ffecf0406626ec9e2a0754 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 16 Oct 2025 19:05:09 -0700
Subject: [PATCH] cleanup: Remove all emoji logs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Removed all logging statements containing emoji characters:
- 🔴 red circle (debug logs)
- 🔥 fire (critical debug markers)
- 🟢 green circle (info logs)
- Other emoji symbols

Also removed unused replicaID variable that was only used for debug logging.

Code is now clean with production-quality logging.
---
 weed/mq/kafka/protocol/handler.go | 31 -------------------------------
 weed/mq/kafka/protocol/produce.go |  3 ---
 2 files changed, 34 deletions(-)

diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 7b5c8fb73..f4795ae14 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -869,7 +869,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 
 		// Read message size (4 bytes)
 		var sizeBytes [4]byte
-		glog.Warningf("🔴 REQUEST LOOP: About to read message size from %s", connectionID)
 		if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
 			if err == io.EOF {
 				return nil
@@ -891,7 +890,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 
 		// Successfully read the message size
 		size := binary.BigEndian.Uint32(sizeBytes[:])
-		glog.Warningf("🔴 REQUEST LOOP: Parsed message size=%d from %s", size, connectionID)
 		if size == 0 || size > 1024*1024 { // 1MB limit
 			// Use standardized error for message size limit
 			// Send error response for message too large
@@ -907,13 +905,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 
 		// Read the message
 		messageBuf := make([]byte, size)
-		glog.Warningf("🔴 REQUEST LOOP: About to read %d-byte message body from %s", size, connectionID)
 		if _, err := io.ReadFull(r, messageBuf); err != nil {
 			_ = HandleTimeoutError(err, "read") // errorCode
 			return fmt.Errorf("read message: %w", err)
 		}
 
-		glog.Warningf("🔴 REQUEST LOOP: Successfully read %d-byte message from %s", size, connectionID)
 
 		// Parse at least the basic header to get API key and correlation ID
 		if len(messageBuf) < 8 {
@@ -929,7 +925,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		if maxLen > 16 {
 			maxLen = 16
 		}
-		glog.Warningf("🔴 RAW REQUEST: correlationID=%d, firstTwoBytes=[%02x %02x] = apiKey=%d, first16bytes=%v", correlationID, messageBuf[0], messageBuf[1], apiKey, messageBuf[:maxLen])
 
 		// Validate API version against what we support
 		if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
@@ -957,7 +952,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 
 		// CRITICAL: Log Fetch requests specifically
 		if apiKey == 1 {
-			glog.Warningf("🔴🔴🔴 FETCH REQUEST RECEIVED: correlationID=%d, apiVersion=%d, from %s", correlationID, apiVersion, connectionID)
 		}
 
 		glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d",
@@ -1074,30 +1068,23 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		// Route to appropriate channel based on API key
 		var targetChan chan *kafkaRequest
 		if apiKey == 2 { // ListOffsets
-			glog.Warningf("🔴 BEFORE ROUTING: ListOffsets request ready to route - correlationID=%d, requestBodyLen=%d", correlationID, len(requestBody))
 		}
 
 		if isDataPlaneAPI(apiKey) {
 			targetChan = dataChan
-			glog.Warningf("🔴 REQUEST ROUTING: apiKey=%d routed to DATA plane", apiKey)
 		} else {
 			targetChan = controlChan
-			glog.Warningf("🔴 REQUEST ROUTING: apiKey=%d routed to CONTROL plane", apiKey)
 		}
 		// Only add to correlation queue AFTER successful channel send
 		// If we add before and the channel blocks, the correlation ID is in the queue
 		// but the request never gets processed, causing response writer deadlock
-		glog.Warningf("🔴 REQUEST QUEUE: About to queue correlationID=%d (apiKey=%d) to channel from %s", correlationID, apiKey, connectionID)
 		select {
 		case targetChan <- req:
 			// Request queued successfully - NOW add to correlation tracking
-			glog.Warningf("🔴 REQUEST QUEUE: Successfully sent correlationID=%d to channel from %s", correlationID, connectionID)
 			correlationQueueMu.Lock()
 			correlationQueue = append(correlationQueue, correlationID)
-			glog.Warningf("🔴 REQUEST QUEUE: Added correlationID=%d to queue (queue length now %d) from %s", correlationID, len(correlationQueue), connectionID)
 			correlationQueueMu.Unlock()
 		case <-ctx.Done():
-			glog.Warningf("🔴 REQUEST QUEUE: Context cancelled while queueing correlationID=%d from %s", correlationID, connectionID)
 			return ctx.Err()
 		case <-time.After(10 * time.Second):
 			// Channel full for too long - this shouldn't happen with proper backpressure
@@ -1122,15 +1109,12 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
 
 	switch APIKey(req.apiKey) {
 	case APIKeyApiVersions:
-		glog.Warningf("🔥 SWITCH MATCHED: APIKeyApiVersions")
 		response, err = h.handleApiVersions(req.correlationID, req.apiVersion)
 
 	case APIKeyMetadata:
-		glog.Warningf("🔥 SWITCH MATCHED: APIKeyMetadata")
 		response, err = h.handleMetadata(req.correlationID, req.apiVersion, req.requestBody)
 
 	case APIKeyListOffsets:
-		glog.Warningf("🔥🔥🔥 SWITCH MATCHED: APIKeyListOffsets (apiKey=2)!!!")
 		response, err = h.handleListOffsets(req.correlationID, req.apiVersion, req.requestBody)
 
 	case APIKeyCreateTopics:
@@ -1968,20 +1952,12 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
 
 	// Parse minimal request to understand what's being asked (header already stripped)
 	offset := 0
-	glog.Warningf("🔥🔥🔥 ListOffsets HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody))
 
 	maxBytes := len(requestBody)
 	if maxBytes > 64 {
 		maxBytes = 64
 	}
-	glog.Warningf("🔥🔥🔥 ListOffsets first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes])
 
-	// Log the specific replica ID and topics being requested
-	if len(requestBody) >= 4 {
-		replicaID := int32(binary.BigEndian.Uint32(requestBody[0:4]))
-		glog.Warningf("🔥🔥🔥 ListOffsets replica_id=%d", replicaID)
-	}
-
 	// v1+ has replica_id(4)
 	if apiVersion >= 1 {
 		if len(requestBody) < offset+4 {
@@ -2129,16 +2105,13 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
 	if actualTopicsCount != topicsCount {
 		binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount)
 	} else {
-		glog.Infof("🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d", topicsCount, actualTopicsCount, len(response))
 	}
 
-	glog.Warningf("🔥🔥🔥 ListOffsets HANDLER RETURNING: correlationID=%d, responseLen=%d", correlationID, len(response))
 	if len(response) > 0 {
 		respPreview := len(response)
 		if respPreview > 32 {
 			respPreview = 32
 		}
-		glog.Warningf("🔥🔥🔥 ListOffsets response first %d bytes (hex): %x", respPreview, response[:respPreview])
 	}
 
 	return response, nil
@@ -3915,13 +3888,11 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
 
 	// v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16)
 	// v4+: Uses flexible format with tagged fields
-	glog.Warningf("🔴🔴🔴 InitProducerId HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody))
 
 	maxBytes := len(requestBody)
 	if maxBytes > 64 {
 		maxBytes = 64
 	}
-	glog.Warningf("🔴🔴🔴 InitProducerId request first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes])
 
 	offset := 0
 
@@ -4028,12 +3999,10 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
 		response = append(response, 0x00) // Empty response body tagged fields
 	}
 
-	glog.Warningf("🔴🔴🔴 InitProducerId HANDLER RETURNING: correlationID=%d, responseLen=%d bytes", correlationID, len(response))
 	respPreview := len(response)
 	if respPreview > 32 {
 		respPreview = 32
 	}
-	glog.Warningf("🔴🔴🔴 InitProducerId response (hex): %x", response[:respPreview])
 	return response, nil
 }
 
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index 8da355d36..a51bb9ef2 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -17,13 +17,10 @@ import (
 
 func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
 	// Version-specific handling
-	glog.Warningf("🔴 CRITICAL DEBUG handleProduce: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody))
 	switch apiVersion {
 	case 0, 1:
-		glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV0V1")
 		return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
 	case 2, 3, 4, 5, 6, 7:
-		glog.Warningf("🔴 CRITICAL DEBUG handleProduce: Using handleProduceV2Plus")
 		return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
 	default:
 		return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)