From 3b75e50b04a4efe5f8d91e9efd07dc3a29e227c0 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 15 Oct 2025 10:50:54 -0700
Subject: [PATCH] removing the unnecessary restart logic and relying on the
 seek mechanism we already implemented

---
 .../mq/kafka/integration/seaweedmq_handler.go | 22 -------------------
 1 file changed, 22 deletions(-)

diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go
index 7689d0612..554acb9f3 100644
--- a/weed/mq/kafka/integration/seaweedmq_handler.go
+++ b/weed/mq/kafka/integration/seaweedmq_handler.go
@@ -94,28 +94,6 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p
 		glog.Errorf("[FETCH] ReadRecords failed: %v", err)
 		return nil, fmt.Errorf("failed to read records: %v", err)
 	}
-	// CRITICAL FIX: If ReadRecords returns 0 but HWM indicates data exists on disk, force a disk read
-	// This handles the case where subscriber advanced past data that was already on disk
-	// Only do this ONCE per fetch request to avoid subscriber churn
-	if len(seaweedRecords) == 0 {
-		hwm, hwmErr := brokerClient.GetHighWaterMark(topic, partition)
-		if hwmErr == nil && fromOffset < hwm {
-			// Restart the existing subscriber at the requested offset for disk read
-			// This is more efficient than closing and recreating
-			consumerGroup := "kafka-gateway"
-			consumerID := fmt.Sprintf("kafka-gateway-%s-%d", topic, partition)
-
-			if err := brokerClient.RestartSubscriber(brokerSubscriber, fromOffset, consumerGroup, consumerID); err != nil {
-				return nil, fmt.Errorf("failed to restart subscriber: %v", err)
-			}
-
-			// Try reading again from restarted subscriber (will do disk read)
-			seaweedRecords, err = brokerClient.ReadRecordsFromOffset(ctx, brokerSubscriber, fromOffset, maxRecords)
-			if err != nil {
-				return nil, fmt.Errorf("failed to read after restart: %v", err)
-			}
-		}
-	}
 	glog.V(2).Infof("[FETCH] ReadRecords returned %d records", len(seaweedRecords))
 
 	//
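
Note: the removed branch forced a subscriber restart (with an extra GetHighWaterMark
round trip) whenever ReadRecords returned nothing while the high-water mark showed
data still on disk. The commit message says the existing seek mechanism already
covers that case. Below is a minimal sketch of such a seek-on-read pattern, not the
actual SeaweedFS code: the Record and Subscriber types and the CurrentOffset, Seek,
and Read methods are hypothetical stand-ins, and the real logic lives inside
brokerClient.ReadRecordsFromOffset and may differ.

package integration

import (
	"context"
	"fmt"
)

// Record and Subscriber are hypothetical stand-ins for illustration only;
// the real types in weed/mq/kafka/integration differ.
type Record struct {
	Key, Value []byte
	Offset     int64
}

type Subscriber interface {
	CurrentOffset() int64
	// Seek repositions the stream, falling back to a disk read when the
	// requested offset is no longer buffered in memory.
	Seek(offset int64) error
	Read(ctx context.Context, maxRecords int) ([]Record, error)
}

// readRecordsFromOffset sketches the seek-on-read pattern the commit relies
// on: when a fetch targets an offset the subscriber has already passed, seek
// the existing stream once instead of tearing down and restarting it.
func readRecordsFromOffset(ctx context.Context, sub Subscriber, fromOffset int64, maxRecords int) ([]Record, error) {
	if sub.CurrentOffset() != fromOffset {
		if err := sub.Seek(fromOffset); err != nil {
			return nil, fmt.Errorf("seek to offset %d: %v", fromOffset, err)
		}
	}
	return sub.Read(ctx, maxRecords)
}

Handling the reposition inside the read path keeps the fetch handler to a single
code path and avoids the consumer-group churn the removed restart branch was
explicitly trying to limit to once per fetch.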