@@ -2,6 +2,7 @@ package protocol
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
@@ -42,6 +43,7 @@ type partitionFetchRequest struct {
 	resultChan    chan *partitionFetchResult
 	isSchematized bool
 	apiVersion    uint16
+	correlationID int32 // Added for correlation tracking
 }
 
 // newPartitionReader creates and starts a new partition reader with pre-fetch buffering
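
Note: the new correlationID field exists purely so the reader's logs can be tied back to a specific Kafka request (see the FETCH_START/FETCH_END lines added below). A minimal, hypothetical sketch of a caller populating it when enqueuing a request — the types are simplified stand-ins for the real handler plumbing:

// Hypothetical sketch: threading the Kafka correlation ID into a fetch
// request so the reader's logs can be matched to the originating request.
// The struct mirrors the diff; the enqueue site is illustrative.
package main

import "fmt"

type partitionFetchResult struct {
	recordBatch   []byte
	highWaterMark int64
}

type partitionFetchRequest struct {
	requestedOffset int64
	maxBytes        int32
	maxWaitMs       int32
	resultChan      chan *partitionFetchResult
	isSchematized   bool
	apiVersion      uint16
	correlationID   int32 // copied from the Fetch request header
}

func main() {
	req := &partitionFetchRequest{
		requestedOffset: 100,
		maxBytes:        1 << 20,
		maxWaitMs:       500,
		resultChan:      make(chan *partitionFetchResult, 1),
		correlationID:   7, // from the client's request header
	}
	fmt.Printf("fetch offset=%d correlationID=%d\n", req.requestedOffset, req.correlationID)
}
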
@@ -117,13 +119,31 @@ func (pr *partitionReader) handleRequests(ctx context.Context) {
 func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
 	startTime := time.Now()
 	result := &partitionFetchResult{}
+
+	// Log request START with full details
+	glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)
+
 	defer func() {
 		result.fetchDuration = time.Since(startTime)
+
+		// Log request END with results
+		resultStatus := "EMPTY"
+		if len(result.recordBatch) > 0 {
+			resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch))
+		}
+		glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)
+
+		// Send result back to client
 		select {
 		case req.resultChan <- result:
+			// Successfully sent
 		case <-ctx.Done():
+			glog.Warningf("[%s] Context cancelled while sending result for %s[%d]",
+				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		case <-time.After(50 * time.Millisecond):
-			glog.Warningf("[%s] Timeout sending result for %s[%d]",
+			glog.Warningf("[%s] Timeout sending result for %s[%d] - CLIENT MAY HAVE DISCONNECTED",
 				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		}
 	}()
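
The three-way select in the defer above is the load-bearing part of this hunk: the reader goroutine must never block indefinitely handing a result to a client that has disconnected. A self-contained sketch of that bounded-send pattern, with illustrative names in place of the real handler types:

// Self-contained sketch of the bounded-send pattern from the defer above:
// deliver a result without letting a dead or stalled client block the
// reader goroutine forever. Names here are illustrative, not the real
// handler types.
package main

import (
	"context"
	"fmt"
	"time"
)

func deliver(ctx context.Context, ch chan<- string, result string) {
	select {
	case ch <- result:
		// receiver took the result
	case <-ctx.Done():
		fmt.Println("context cancelled before result was delivered")
	case <-time.After(50 * time.Millisecond):
		// receiver is gone; drop the result rather than leak this
		// goroutine on a blocked channel send
		fmt.Println("timed out delivering result")
	}
}

func main() {
	ctx := context.Background()
	ch := make(chan string) // unbuffered, like a per-request result channel

	go deliver(ctx, ch, "record batch")
	fmt.Println(<-ch) // receiver present: send succeeds

	deliver(ctx, ch, "orphaned batch") // no receiver: returns after 50ms
}
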
@@ -131,22 +151,23 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	// Get high water mark
 	hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
 	if hwmErr != nil {
-		glog.Warningf("[%s] Failed to get high water mark for %s[%d]: %v",
+		glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
 		result.recordBatch = []byte{}
+		result.highWaterMark = 0
 		return
 	}
 	result.highWaterMark = hwm
 
-	glog.V(2).Infof("[%s] FETCH %s[%d]: requestedOffset=%d hwm=%d",
-		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+	glog.V(2).Infof("[%s] HWM for %s[%d]: %d (requested: %d)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwm, req.requestedOffset)
 
 	// If requested offset >= HWM, return immediately with empty result
 	// This prevents overwhelming the broker with futile read attempts when no data is available
 	if req.requestedOffset >= hwm {
 		result.recordBatch = []byte{}
-		glog.V(2).Infof("[%s] FETCH %s[%d]: EMPTY (offset %d >= hwm %d)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+		glog.V(3).Infof("[%s] Requested offset %d >= HWM %d, returning empty",
+			pr.connCtx.ConnectionID, req.requestedOffset, hwm)
 		return
 	}
 
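
The offset >= hwm early return follows standard Kafka fetch semantics: the high water mark is one past the last readable record, so a fully caught-up consumer requests exactly hwm and should receive an empty response rather than trigger a broker read. A tiny sketch of the guard (shouldReadFromBroker is an illustrative name, not a real function):

// Minimal sketch of the high-water-mark guard: a fetch at or beyond the
// HWM returns empty immediately instead of issuing a futile broker read.
package main

import "fmt"

func shouldReadFromBroker(requestedOffset, hwm int64) bool {
	// The HWM is one past the last committed record, so a fully
	// caught-up consumer requests exactly hwm and gets nothing back.
	return requestedOffset < hwm
}

func main() {
	fmt.Println(shouldReadFromBroker(41, 42)) // true: one record remains
	fmt.Println(shouldReadFromBroker(42, 42)) // false: caught up
	fmt.Println(shouldReadFromBroker(50, 42)) // false: ahead of the HWM
}
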
@@ -162,13 +183,14 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
 	recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
 
-	// Log what we got back
+	// Log what we got back - DETAILED for diagnostics
 	if len(recordBatch) == 0 {
 		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
 		result.recordBatch = []byte{}
 	} else {
-		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned data (offset=%d->%d, hwm=%d, bytes=%d)",
+		// Log successful fetch with details
+		glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
 		result.recordBatch = recordBatch
 		pr.bufferMu.Lock()
@@ -239,22 +261,21 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 	}
 
 	if err != nil {
-		glog.Warningf("[%s] Single-batch fetch also failed for %s[%d] offset=%d: %v (total duration: %v)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err, time.Since(fetchStartTime))
+		glog.Errorf("[%s] CRITICAL: Both multi-batch AND fallback failed for %s[%d] offset=%d: %v",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err)
 		return []byte{}, fromOffset
 	}
 
 	if len(smqRecords) > 0 {
 		recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
 		nextOffset := fromOffset + int64(len(smqRecords))
-		glog.V(4).Infof("[%s] Single-batch fallback for %s[%d]: %d records, %d bytes, offset %d -> %d (total duration: %v)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			len(smqRecords), len(recordBatch), fromOffset, nextOffset, time.Since(fetchStartTime))
+		glog.V(3).Infof("[%s] Fallback succeeded: got %d records for %s[%d] offset %d -> %d (total: %v)",
+			pr.connCtx.ConnectionID, len(smqRecords), pr.topicName, pr.partitionID, fromOffset, nextOffset, time.Since(fetchStartTime))
 		return recordBatch, nextOffset
 	}
 
 	// No records available
-	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d (total duration: %v)",
+	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d after multi-batch and fallback (total: %v)",
 		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(fetchStartTime))
 	return []byte{}, fromOffset
 }
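
For orientation, this hunk is the tail end of readRecords' two-stage read: a multi-batch fetch is attempted first, then a single-batch fallback, and only when both fail does it return an empty batch with the offset unchanged so the consumer retries instead of skipping data. A simplified sketch of that control flow, assuming hypothetical fetch helpers:

// Simplified sketch of readRecords' two-stage read: try multi-batch,
// fall back to single-batch, and return an empty batch with the offset
// unchanged if both fail. fetchMultiBatch and fetchSingleBatch are
// stand-ins; the offset advance is condensed to +1 per call.
package main

import (
	"errors"
	"fmt"
)

func fetchMultiBatch(offset int64) ([]byte, error)  { return nil, errors.New("multi-batch failed") }
func fetchSingleBatch(offset int64) ([]byte, error) { return []byte("batch"), nil }

func readRecords(fromOffset int64) ([]byte, int64) {
	if batch, err := fetchMultiBatch(fromOffset); err == nil && len(batch) > 0 {
		return batch, fromOffset + 1
	}
	batch, err := fetchSingleBatch(fromOffset)
	if err != nil {
		// Both stages failed: return an empty batch and the unchanged
		// offset so the caller retries rather than skipping data.
		return []byte{}, fromOffset
	}
	return batch, fromOffset + 1
}

func main() {
	batch, next := readRecords(42)
	fmt.Printf("got %q, next offset %d\n", batch, next)
}
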