diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
index de0ff15f9..b73d05c64 100644
--- a/weed/mq/kafka/protocol/fetch_partition_reader.go
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -2,6 +2,7 @@ package protocol
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -42,6 +43,7 @@ type partitionFetchRequest struct {
 	resultChan    chan *partitionFetchResult
 	isSchematized bool
 	apiVersion    uint16
+	correlationID int32 // Added for correlation tracking
 }
 
 // newPartitionReader creates and starts a new partition reader with pre-fetch buffering
@@ -117,13 +119,31 @@ func (pr *partitionReader) handleRequests(ctx context.Context) {
 func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
 	startTime := time.Now()
 	result := &partitionFetchResult{}
+
+	// Log request START with full details
+	glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)
+
 	defer func() {
 		result.fetchDuration = time.Since(startTime)
+
+		// Log request END with results
+		resultStatus := "EMPTY"
+		if len(result.recordBatch) > 0 {
+			resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch))
+		}
+		glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)
+
+		// Send result back to client
 		select {
 		case req.resultChan <- result:
+			// Successfully sent
 		case <-ctx.Done():
+			glog.Warningf("[%s] Context cancelled while sending result for %s[%d]",
+				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		case <-time.After(50 * time.Millisecond):
-			glog.Warningf("[%s] Timeout sending result for %s[%d]",
+			glog.Warningf("[%s] Timeout sending result for %s[%d] - CLIENT MAY HAVE DISCONNECTED",
 				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		}
 	}()
@@ -131,22 +151,23 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	// Get high water mark
 	hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
 	if hwmErr != nil {
-		glog.Warningf("[%s] Failed to get high water mark for %s[%d]: %v",
+		glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
 		result.recordBatch = []byte{}
+		result.highWaterMark = 0
 		return
 	}
 	result.highWaterMark = hwm
 
-	glog.V(2).Infof("[%s] FETCH %s[%d]: requestedOffset=%d hwm=%d",
-		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+	glog.V(2).Infof("[%s] HWM for %s[%d]: %d (requested: %d)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwm, req.requestedOffset)
 
 	// If requested offset >= HWM, return immediately with empty result
 	// This prevents overwhelming the broker with futile read attempts when no data is available
 	if req.requestedOffset >= hwm {
 		result.recordBatch = []byte{}
-		glog.V(2).Infof("[%s] FETCH %s[%d]: EMPTY (offset %d >= hwm %d)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+		glog.V(3).Infof("[%s] Requested offset %d >= HWM %d, returning empty",
+			pr.connCtx.ConnectionID, req.requestedOffset, hwm)
 		return
 	}
 
@@ -162,13 +183,14 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
 	recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
 
-	// Log what we got back
+	// Log what we got back - DETAILED for diagnostics
 	if len(recordBatch) == 0 {
 		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
 		result.recordBatch = []byte{}
 	} else {
-		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned data (offset=%d->%d, hwm=%d, bytes=%d)",
+		// Log successful fetch with details
+		glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
 		result.recordBatch = recordBatch
 		pr.bufferMu.Lock()
@@ -239,22 +261,21 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 	}
 
 	if err != nil {
-		glog.Warningf("[%s] Single-batch fetch also failed for %s[%d] offset=%d: %v (total duration: %v)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err, time.Since(fetchStartTime))
+		glog.Errorf("[%s] CRITICAL: Both multi-batch AND fallback failed for %s[%d] offset=%d: %v",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err)
 		return []byte{}, fromOffset
 	}
 
 	if len(smqRecords) > 0 {
 		recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
 		nextOffset := fromOffset + int64(len(smqRecords))
-		glog.V(4).Infof("[%s] Single-batch fallback for %s[%d]: %d records, %d bytes, offset %d -> %d (total duration: %v)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			len(smqRecords), len(recordBatch), fromOffset, nextOffset, time.Since(fetchStartTime))
+		glog.V(3).Infof("[%s] Fallback succeeded: got %d records for %s[%d] offset %d -> %d (total: %v)",
+			pr.connCtx.ConnectionID, len(smqRecords), pr.topicName, pr.partitionID, fromOffset, nextOffset, time.Since(fetchStartTime))
 		return recordBatch, nextOffset
 	}
 
 	// No records available
-	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d (total duration: %v)",
+	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d after multi-batch and fallback (total: %v)",
 		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(fetchStartTime))
 	return []byte{}, fromOffset
 }
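Note for reviewers: the deferred result delivery above hinges on a bounded, cancellable channel send, so a request goroutine cannot block forever if the client disconnects before reading its result. Below is a minimal standalone sketch of that pattern; the `fetchResult` type and `deliver` function are illustrative names, not part of the SeaweedFS codebase.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchResult stands in for partitionFetchResult; illustrative only.
type fetchResult struct {
	recordBatch []byte
}

// deliver mirrors the defer block's send: hand the result to the waiting
// requester, but give up if the context is cancelled or the requester
// stops reading (e.g. the client disconnected).
func deliver(ctx context.Context, resultChan chan<- *fetchResult, result *fetchResult) {
	select {
	case resultChan <- result:
		// Requester received the result.
	case <-ctx.Done():
		fmt.Println("context cancelled before result could be sent")
	case <-time.After(50 * time.Millisecond):
		// Bounded wait: without this case, a vanished reader on an
		// unbuffered channel would leak this goroutine.
		fmt.Println("timeout sending result - client may have disconnected")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Unbuffered, like a per-request reply channel.
	resultChan := make(chan *fetchResult)

	go deliver(ctx, resultChan, &fetchResult{recordBatch: []byte("batch")})

	r := <-resultChan
	fmt.Printf("got %d bytes\n", len(r.recordBatch))
}
```

The 50ms timeout trades a small risk of dropping a slow-but-alive reader's result against never leaking goroutines; the patch keeps that bound and only upgrades the log message when it fires.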