|
|
@ -94,28 +94,6 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p |
|
|
|
glog.Errorf("[FETCH] ReadRecords failed: %v", err) |
|
|
|
return nil, fmt.Errorf("failed to read records: %v", err) |
|
|
|
} |
|
|
|
// CRITICAL FIX: If ReadRecords returns 0 but HWM indicates data exists on disk, force a disk read
|
|
|
|
// This handles the case where subscriber advanced past data that was already on disk
|
|
|
|
// Only do this ONCE per fetch request to avoid subscriber churn
|
|
|
|
if len(seaweedRecords) == 0 { |
|
|
|
hwm, hwmErr := brokerClient.GetHighWaterMark(topic, partition) |
|
|
|
if hwmErr == nil && fromOffset < hwm { |
|
|
|
// Restart the existing subscriber at the requested offset for disk read
|
|
|
|
// This is more efficient than closing and recreating
|
|
|
|
consumerGroup := "kafka-gateway" |
|
|
|
consumerID := fmt.Sprintf("kafka-gateway-%s-%d", topic, partition) |
|
|
|
|
|
|
|
if err := brokerClient.RestartSubscriber(brokerSubscriber, fromOffset, consumerGroup, consumerID); err != nil { |
|
|
|
return nil, fmt.Errorf("failed to restart subscriber: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Try reading again from restarted subscriber (will do disk read)
|
|
|
|
seaweedRecords, err = brokerClient.ReadRecordsFromOffset(ctx, brokerSubscriber, fromOffset, maxRecords) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to read after restart: %v", err) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
glog.V(2).Infof("[FETCH] ReadRecords returned %d records", len(seaweedRecords)) |
|
|
|
//
|
|
|
|