diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 2b820261c..a0c46cf3b 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -8,6 +8,8 @@ import ( "io" "strconv" "strings" + "sync" + "sync/atomic" "time" "github.com/parquet-go/parquet-go" @@ -1365,6 +1367,8 @@ type StreamingFlushedDataSource struct { doneChan chan struct{} started bool finished bool + closed int32 // atomic flag to prevent double close + mu sync.RWMutex } func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource { @@ -1390,9 +1394,14 @@ func (s *StreamingFlushedDataSource) startStreaming() { s.started = true go func() { - defer close(s.resultChan) - defer close(s.errorChan) - defer close(s.doneChan) + defer func() { + // Use atomic flag to ensure channels are only closed once + if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { + close(s.resultChan) + close(s.errorChan) + close(s.doneChan) + } + }() // Set up time range for scanning startTime := time.Unix(0, s.options.StartTimeNs) @@ -1497,8 +1506,11 @@ func (s *StreamingFlushedDataSource) HasMore() bool { } func (s *StreamingFlushedDataSource) Close() error { - if !s.finished { + // Use atomic flag to ensure channels are only closed once + if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { close(s.doneChan) + close(s.resultChan) + close(s.errorChan) } return nil }