diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 5e37fff6f..07896518e 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -27,6 +27,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/security"
 	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/seaweedfs/seaweedfs/weed/util/mem"
 )
 
 // GetAdvertisedAddress returns the host:port that should be advertised to clients
@@ -597,8 +598,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		glog.V(4).Infof("[%s] Response writer started", connectionID)
-		defer glog.V(4).Infof("[%s] Response writer exiting", connectionID)
+		glog.V(2).Infof("[%s] Response writer started", connectionID)
+		defer glog.V(2).Infof("[%s] Response writer exiting", connectionID)
 
 		pendingResponses := make(map[uint32]*kafkaResponse)
 		nextToSend := 0 // Index in correlationQueue
@@ -609,7 +610,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					// responseChan closed, exit
 					return
 				}
-				glog.V(4).Infof("[%s] Response writer received correlation=%d from responseChan", connectionID, resp.correlationID)
+				// Only log at V(3) for debugging, not V(4) in hot path
+				glog.V(3).Infof("[%s] Response writer received correlation=%d", connectionID, resp.correlationID)
 
 				correlationQueueMu.Lock()
 				pendingResponses[resp.correlationID] = resp
@@ -619,7 +621,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					readyResp, exists := pendingResponses[expectedID]
 					if !exists {
 						// Response not ready yet, stop sending
-						glog.V(3).Infof("[%s] Response writer: waiting for correlation=%d (nextToSend=%d, queueLen=%d)", connectionID, expectedID, nextToSend, len(correlationQueue))
 						break
 					}
 
@@ -627,14 +628,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					if readyResp.err != nil {
 						glog.Errorf("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err)
 					} else {
-						glog.V(4).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response))
 						if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil {
-							glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr)
-							glog.Errorf("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr)
+							glog.Errorf("[%s] Response writer WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr)
 							correlationQueueMu.Unlock()
 							return
 						}
-						glog.V(4).Infof("[%s] Response writer: successfully wrote correlation=%d", connectionID, readyResp.correlationID)
 					}
 
 					// Remove from pending and advance
@@ -644,7 +642,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 				correlationQueueMu.Unlock()
 			case <-ctx.Done():
 				// Context cancelled, exit immediately to prevent deadlock
-				glog.V(4).Infof("[%s] Response writer: context cancelled, exiting", connectionID)
+				glog.V(2).Infof("[%s] Response writer: context cancelled, exiting", connectionID)
 				return
 			}
 		}
@@ -661,10 +659,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					// Channel closed, exit
 					return
 				}
-				if req.apiKey == 2 { // ListOffsets
-				}
-				glog.V(4).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey)
-
+				// Removed V(4) logging from hot path - only log errors and important events
+
 				// Wrap request processing with panic recovery to prevent deadlocks
 				// If processRequestSync panics, we MUST still send a response to avoid blocking the response writer
 				var response []byte
@@ -676,14 +672,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 							err = fmt.Errorf("internal server error: panic in request handler: %v", r)
 						}
 					}()
-					if req.apiKey == 2 { // ListOffsets
-					}
 					response, err = h.processRequestSync(req)
-					if req.apiKey == 2 { // ListOffsets
-					}
 				}()
-				glog.V(4).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID)
 
 				select {
 				case responseChan <- &kafkaResponse{
 					correlationID: req.correlationID,
@@ -692,15 +683,16 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					response: response,
 					err: err,
 				}:
-					glog.V(4).Infof("[%s] Control plane sent correlation=%d to responseChan", connectionID, req.correlationID)
+					// Response sent successfully - no logging here
 				case <-ctx.Done():
 					// Connection closed, stop processing
 					return
 				case <-time.After(5 * time.Second):
+					glog.Warningf("[%s] Control plane: timeout sending response correlation=%d", connectionID, req.correlationID)
 				}
 			case <-ctx.Done():
 				// Context cancelled, drain remaining requests before exiting
-				glog.V(4).Infof("[%s] Control plane: context cancelled, draining remaining requests", connectionID)
+				glog.V(2).Infof("[%s] Control plane: context cancelled, draining remaining requests", connectionID)
 				for {
 					select {
 					case req, ok := <-controlChan:
@@ -744,7 +736,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					// Channel closed, exit
 					return
 				}
-				glog.V(4).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey)
+				// Removed V(4) logging from hot path - only log errors and important events
 
 				// Wrap request processing with panic recovery to prevent deadlocks
 				// If processRequestSync panics, we MUST still send a response to avoid blocking the response writer
@@ -760,7 +752,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					response, err = h.processRequestSync(req)
 				}()
-				glog.V(4).Infof("[%s] Data plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID)
 
 				// Use select with context to avoid sending on closed channel
 				select {
 				case responseChan <- &kafkaResponse{
@@ -770,15 +761,16 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 					response: response,
 					err: err,
 				}:
-					glog.V(4).Infof("[%s] Data plane sent correlation=%d to responseChan", connectionID, req.correlationID)
+					// Response sent successfully - no logging here
 				case <-ctx.Done():
 					// Connection closed, stop processing
 					return
 				case <-time.After(5 * time.Second):
+					glog.Warningf("[%s] Data plane: timeout sending response correlation=%d", connectionID, req.correlationID)
 				}
 			case <-ctx.Done():
 				// Context cancelled, drain remaining requests before exiting
-				glog.V(4).Infof("[%s] Data plane: context cancelled, draining remaining requests", connectionID)
+				glog.V(2).Infof("[%s] Data plane: context cancelled, draining remaining requests", connectionID)
 				for {
 					select {
 					case req, ok := <-dataChan:
@@ -786,7 +778,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 							return
 						}
 						// Process remaining requests with a short timeout
-						glog.V(3).Infof("[%s] Data plane: processing drained request correlation=%d", connectionID, req.correlationID)
 						response, err := h.processRequestSync(req)
 						select {
 						case responseChan <- &kafkaResponse{
@@ -796,14 +787,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 							response: response,
 							err: err,
 						}:
-							glog.V(3).Infof("[%s] Data plane: sent drained response correlation=%d", connectionID, req.correlationID)
+							// Response sent - no logging
 						case <-time.After(1 * time.Second):
 							glog.Warningf("[%s] Data plane: timeout sending drained response correlation=%d, discarding", connectionID, req.correlationID)
 							return
 						}
 					default:
 						// Channel empty, safe to exit
-						glog.V(4).Infof("[%s] Data plane: drain complete, exiting", connectionID)
+						glog.V(2).Infof("[%s] Data plane: drain complete, exiting", connectionID)
 						return
 					}
 				}
@@ -823,48 +814,27 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 	}()
 
 	for {
-		// Check if context is cancelled
+		// OPTIMIZATION: Consolidated context/deadline check - avoid redundant select statements
+		// Check context once at the beginning of the loop
 		select {
 		case <-ctx.Done():
			return ctx.Err()
 		default:
 		}
 
-		// Set a read deadline for the connection based on context or default timeout
+		// Set read deadline based on context or default timeout
+		// OPTIMIZATION: Calculate deadline once per iteration, not multiple times
 		var readDeadline time.Time
-		var timeoutDuration time.Duration
-
 		if deadline, ok := ctx.Deadline(); ok {
 			readDeadline = deadline
-			timeoutDuration = time.Until(deadline)
 		} else {
-			// Use configurable read timeout instead of hardcoded 5 seconds
-			timeoutDuration = timeoutConfig.ReadTimeout
-			readDeadline = time.Now().Add(timeoutDuration)
+			readDeadline = time.Now().Add(timeoutConfig.ReadTimeout)
 		}
 
 		if err := conn.SetReadDeadline(readDeadline); err != nil {
 			return fmt.Errorf("set read deadline: %w", err)
 		}
 
-		// Check context before reading
-		select {
-		case <-ctx.Done():
-			// Give a small delay to ensure proper cleanup
-			time.Sleep(100 * time.Millisecond)
-			return ctx.Err()
-		default:
-			// If context is close to being cancelled, set a very short timeout
-			if deadline, ok := ctx.Deadline(); ok {
-				timeUntilDeadline := time.Until(deadline)
-				if timeUntilDeadline < 2*time.Second && timeUntilDeadline > 0 {
-					shortDeadline := time.Now().Add(500 * time.Millisecond)
-					if err := conn.SetReadDeadline(shortDeadline); err == nil {
-					}
-				}
-			}
-		}
-
 		// Read message size (4 bytes)
 		var sizeBytes [4]byte
 		if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
@@ -902,7 +872,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		}
 
 		// Read the message
-		messageBuf := make([]byte, size)
+		// OPTIMIZATION: Use buffer pool to reduce GC pressure (was 1MB/sec at 1000 req/s)
+		messageBuf := mem.Allocate(int(size))
+		defer mem.Free(messageBuf)
 		if _, err := io.ReadFull(r, messageBuf); err != nil {
 			_ = HandleTimeoutError(err, "read") // errorCode
 			return fmt.Errorf("read message: %w", err)
 		}
@@ -918,12 +890,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 		apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
 		correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
 
-		// LOG EVERY SINGLE REQUEST REGARDLESS OF apiKey
-		maxLen := len(messageBuf)
-		if maxLen > 16 {
-			maxLen = 16
-		}
-
 		// Validate API version against what we support
 		if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
 			glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err)
@@ -948,21 +914,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
 			}
 		}
 
-		// CRITICAL: Log Fetch requests specifically
-		if apiKey == 1 {
-		}
-
-		glog.V(4).Infof("API version validated: Key=%d (%s), Version=%d, Correlation=%d",
-			apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
-
-		// Log consumer group coordination requests
-		switch apiKey {
-		case 9:
-		case 11:
-		case 12:
-		case 14:
-		}
-
 		// Extract request body - special handling for ApiVersions requests
 		var requestBody []byte
 		if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 {
@@ -1099,8 +1050,11 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
 	apiName := getAPIName(APIKey(req.apiKey))
 
-	glog.V(4).Infof("[API] %s (key=%d, ver=%d, corr=%d)",
-		apiName, req.apiKey, req.apiVersion, req.correlationID)
+	// Only log high-volume requests at V(2), not V(4)
+	if glog.V(2) {
+		glog.V(2).Infof("[API] %s (key=%d, ver=%d, corr=%d)",
+			apiName, req.apiKey, req.apiVersion, req.correlationID)
+	}
 
 	var response []byte
 	var err error
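
Note (not part of the patch): below is a minimal sketch of the pooled-read pattern the message-read hunk switches to, assuming the mem.Allocate/mem.Free helpers from weed/util/mem behave as shown in the diff (pool-backed []byte out, same slice handed back to the pool). readFrame and handle are illustrative names only, not identifiers from this PR.

package example

import (
	"encoding/binary"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/util/mem"
)

// readFrame reads one 4-byte length-prefixed Kafka frame into a pooled
// buffer, hands it to handle, and returns the buffer to the pool before
// returning. handle must not retain the slice after it returns.
func readFrame(r io.Reader, handle func([]byte) error) error {
	var sizeBytes [4]byte
	if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
		return err
	}
	size := binary.BigEndian.Uint32(sizeBytes[:])

	buf := mem.Allocate(int(size)) // pooled allocation instead of make([]byte, size)
	defer mem.Free(buf)            // slice goes back to the pool once this frame is handled

	if _, err := io.ReadFull(r, buf); err != nil {
		return err
	}
	return handle(buf)
}

Scoping the allocate/free pair to a per-frame helper like this returns each buffer as soon as its message has been handled, rather than keeping every buffer deferred until the connection-handling function returns.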