|
|
@ -8,6 +8,8 @@ import ( |
|
|
|
"io" |
|
|
|
"strconv" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"sync/atomic" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/parquet-go/parquet-go" |
|
|
@ -1365,6 +1367,8 @@ type StreamingFlushedDataSource struct { |
|
|
|
doneChan chan struct{} |
|
|
|
started bool |
|
|
|
finished bool |
|
|
|
closed int32 // atomic flag to prevent double close
|
|
|
|
mu sync.RWMutex |
|
|
|
} |
|
|
|
|
|
|
|
func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource { |
|
|
@ -1390,9 +1394,14 @@ func (s *StreamingFlushedDataSource) startStreaming() { |
|
|
|
s.started = true |
|
|
|
|
|
|
|
go func() { |
|
|
|
defer close(s.resultChan) |
|
|
|
defer close(s.errorChan) |
|
|
|
defer close(s.doneChan) |
|
|
|
defer func() { |
|
|
|
// Use atomic flag to ensure channels are only closed once
|
|
|
|
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { |
|
|
|
close(s.resultChan) |
|
|
|
close(s.errorChan) |
|
|
|
close(s.doneChan) |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
// Set up time range for scanning
|
|
|
|
startTime := time.Unix(0, s.options.StartTimeNs) |
|
|
@ -1497,8 +1506,11 @@ func (s *StreamingFlushedDataSource) HasMore() bool { |
|
|
|
} |
|
|
|
|
|
|
|
func (s *StreamingFlushedDataSource) Close() error { |
|
|
|
if !s.finished { |
|
|
|
// Use atomic flag to ensure channels are only closed once
|
|
|
|
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { |
|
|
|
close(s.doneChan) |
|
|
|
close(s.resultChan) |
|
|
|
close(s.errorChan) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|