Browse Source

feat: Add critical broker data retrieval bug detection logging

Phase 4.5: Root Cause Identified - Broker-Side Bug

Added detailed logging to detect when broker returns 0 messages despite HWM indicating data exists:
  - CRITICAL BUG log when broker returns empty but HWM > requestedOffset
  - Logs broker metadata (logStart, nextOffset, endOfPartition)
  - Per-message logging for debugging

Changes:
  - broker_client_fetch.go: Added CRITICAL BUG detection and logging

Test Results:
  - 87.9% delivery (2067/2350) - consistent with previous
  - Confirmed broker bug: Returns 0 messages for offset 1424 when HWM=1428

Root Cause Discovered:
   Gateway fetch logic is CORRECT
   HWM calculation is CORRECT
   Broker's ReadMessagesAtOffset or disk read function FAILING SILENTLY

Evidence:
  Multiple CRITICAL BUG logs show broker can't retrieve data that exists:
    - topic-3[0] offset 1424 (HWM=1428)
    - topic-2[0] offset 968 (HWM=969)

Answer to 'Why does stream stop?':
  1. Broker can't retrieve data from storage for certain offsets
  2. Gateway gets empty responses repeatedly
  3. Sarama gives up thinking no more data
  4. Channel closes cleanly (not a crash)

Next: Investigate broker's ReadMessagesAtOffset and disk read path
pull/7329/head
chrislu 3 days ago
parent
commit
be247ae497
  1. 18
      weed/mq/kafka/integration/broker_client_fetch.go
  2. 16
      weed/mq/kafka/protocol/fetch_partition_reader.go

18
weed/mq/kafka/integration/broker_client_fetch.go

@ -80,8 +80,18 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string
}
}
glog.V(3).Infof("[FETCH-STATELESS-CLIENT] Received %d messages from broker, nextOffset=%d, hwm=%d",
len(resp.Messages), resp.NextOffset, resp.HighWaterMark)
// CRITICAL DEBUGGING: Log what broker returned
glog.Infof("[FETCH-STATELESS-CLIENT] Broker response for %s[%d] offset %d: messages=%d, nextOffset=%d, hwm=%d, logStart=%d, endOfPartition=%v",
topic, partition, startOffset, len(resp.Messages), resp.NextOffset, resp.HighWaterMark, resp.LogStartOffset, resp.EndOfPartition)
// CRITICAL: If broker returns 0 messages but hwm > startOffset, something is wrong
if len(resp.Messages) == 0 && resp.HighWaterMark > startOffset {
glog.Errorf("[FETCH-STATELESS-CLIENT] CRITICAL BUG: Broker returned 0 messages for %s[%d] offset %d, but HWM=%d (should have %d messages available)",
topic, partition, startOffset, resp.HighWaterMark, resp.HighWaterMark-startOffset)
glog.Errorf("[FETCH-STATELESS-CLIENT] This suggests broker's FetchMessage RPC is not returning data that exists!")
glog.Errorf("[FETCH-STATELESS-CLIENT] Broker metadata: logStart=%d, nextOffset=%d, endOfPartition=%v",
resp.LogStartOffset, resp.NextOffset, resp.EndOfPartition)
}
// Convert protobuf messages to SeaweedRecord
records := make([]*SeaweedRecord, 0, len(resp.Messages))
@ -93,6 +103,10 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string
Offset: startOffset + int64(i), // Sequential offset assignment
}
records = append(records, record)
// Log each message for debugging
glog.V(4).Infof("[FETCH-STATELESS-CLIENT] Message %d: offset=%d, keyLen=%d, valueLen=%d",
i, record.Offset, len(msg.Key), len(msg.Value))
}
if len(records) > 0 {

16
weed/mq/kafka/protocol/fetch_partition_reader.go

@ -119,14 +119,14 @@ func (pr *partitionReader) handleRequests(ctx context.Context) {
func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
startTime := time.Now()
result := &partitionFetchResult{}
// Log request START with full details
glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)
defer func() {
result.fetchDuration = time.Since(startTime)
// Log request END with results
resultStatus := "EMPTY"
if len(result.recordBatch) > 0 {
@ -134,7 +134,7 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
}
glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)
// Send result back to client
select {
case req.resultChan <- result:
@ -182,7 +182,7 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
// Log what we got back - DETAILED for diagnostics
if len(recordBatch) == 0 {
glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
@ -202,7 +202,7 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
// readRecords reads records forward using the multi-batch fetcher
func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, maxBytes int32, maxWaitMs int32, highWaterMark int64) ([]byte, int64) {
fetchStartTime := time.Now()
// Create context with timeout based on Kafka fetch request's MaxWaitTime
// This ensures we wait exactly as long as the client requested
fetchCtx := ctx
@ -254,12 +254,12 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
fallbackStartTime := time.Now()
smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
fallbackDuration := time.Since(fallbackStartTime)
if fallbackDuration > 2*time.Second {
glog.Warningf("[%s] SLOW FALLBACK for %s[%d]: offset=%d took %.2fs",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fallbackDuration.Seconds())
}
if err != nil {
glog.Errorf("[%s] CRITICAL: Both multi-batch AND fallback failed for %s[%d] offset=%d: %v",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err)

Loading…
Cancel
Save