diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index f2e3d1a23..345863c53 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -1,6 +1,7 @@ package engine import ( + "container/heap" "context" "encoding/json" "fmt" @@ -139,6 +140,44 @@ type ParquetFileStats struct { ColumnStats map[string]*ParquetColumnStats } +// StreamingDataSource provides a streaming interface for reading scan results +type StreamingDataSource interface { + Next() (*HybridScanResult, error) // Returns next result or nil when done + HasMore() bool // Returns true if more data available + Close() error // Clean up resources +} + +// StreamingMergeItem represents an item in the priority queue for streaming merge +type StreamingMergeItem struct { + Result *HybridScanResult + SourceID int + DataSource StreamingDataSource +} + +// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp +type StreamingMergeHeap []*StreamingMergeItem + +func (h StreamingMergeHeap) Len() int { return len(h) } + +func (h StreamingMergeHeap) Less(i, j int) bool { + // Sort by timestamp (ascending order) + return h[i].Result.Timestamp < h[j].Result.Timestamp +} + +func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *StreamingMergeHeap) Push(x interface{}) { + *h = append(*h, x.(*StreamingMergeItem)) +} + +func (h *StreamingMergeHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + return item +} + // Scan reads messages from both live logs and archived Parquet files // Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration // Assumptions: @@ -398,9 +437,12 @@ func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partit return results, err } -// scanPartitionHybridWithStats scans a specific partition and returns statistics +// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency +// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting +// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records +// - Scalable: Can handle large topics without LIMIT clauses efficiently +// - Streaming: Processes data as it arrives rather than buffering everything func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { - var results []HybridScanResult stats := &HybridScanStats{} // STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME) @@ -410,20 +452,12 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex if !isDebugMode(ctx) { fmt.Printf("Warning: Failed to scan unflushed data from broker: %v\n", err) } - } else { - results = append(results, unflushedResults...) - if unflushedStats != nil { - stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried - stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages - stats.BufferStartIndex = unflushedStats.BufferStartIndex - } + } else if unflushedStats != nil { + stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried + stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages + stats.BufferStartIndex = unflushedStats.BufferStartIndex } - // STEP 2: Scan flushed data from disk (live logs + Parquet files) - // Create the hybrid read function that combines live logs + Parquet files - // This uses SeaweedFS MQ's own merged reading logic - mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition) - // Count live log files for statistics liveLogCount, err := hms.countLiveLogFiles(partition) if err != nil { @@ -433,99 +467,33 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex } stats.LiveLogFilesScanned = liveLogCount - // Set up time range for scanning - startTime := time.Unix(0, options.StartTimeNs) - if options.StartTimeNs == 0 { - startTime = time.Unix(0, 0) // Start from beginning if not specified - } - - stopTsNs := options.StopTimeNs - if stopTsNs == 0 { - stopTsNs = time.Now().UnixNano() // Stop at current time if not specified - } - - // Message processing function - eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { - // Convert log entry to schema_pb.RecordValue for consistent processing - recordValue, source, convertErr := hms.convertLogEntryToRecordValue(logEntry) - if convertErr != nil { - return false, fmt.Errorf("failed to convert log entry: %v", convertErr) - } - - // Apply predicate filtering (WHERE clause) - if options.Predicate != nil && !options.Predicate(recordValue) { - return false, nil // Skip this message - } - - // Extract system columns - timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value() - key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() - - // Apply column projection - values := make(map[string]*schema_pb.Value) - if len(options.Columns) == 0 { - // Select all columns (excluding system columns from user view) - for name, value := range recordValue.Fields { - if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY { - values[name] = value - } - } - } else { - // Select specified columns only - for _, columnName := range options.Columns { - if value, exists := recordValue.Fields[columnName]; exists { - values[columnName] = value - } - } - } - - results = append(results, HybridScanResult{ - Values: values, - Timestamp: timestamp, - Key: key, - Source: source, - }) + // STEP 2: Create streaming data sources for memory-efficient merge + var dataSources []StreamingDataSource - // Apply row limit - if options.Limit > 0 && len(results) >= options.Limit { - return true, nil // Stop processing + // Add unflushed data source (if we have unflushed results) + if len(unflushedResults) > 0 { + // Sort unflushed results by timestamp before creating stream + if len(unflushedResults) > 1 { + hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1) } - - return false, nil + dataSources = append(dataSources, NewSliceDataSource(unflushedResults)) } - // Only scan flushed data if we haven't reached the limit from unflushed data - if options.Limit == 0 || len(results) < options.Limit { - // Adjust limit for remaining capacity - remainingLimit := options.Limit - len(results) - if remainingLimit > 0 { - // Create a copy of options with adjusted limit for flushed data - flushedOptions := options - flushedOptions.Limit = remainingLimit - } - - // Start scanning from the specified position - startPosition := log_buffer.MessagePosition{Time: startTime} - _, _, err = mergedReadFn(startPosition, stopTsNs, eachLogEntryFn) + // Add streaming flushed data source (live logs + Parquet files) + flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options) + dataSources = append(dataSources, flushedDataSource) + // STEP 3: Use streaming merge for memory-efficient chronological ordering + var results []HybridScanResult + if len(dataSources) > 0 { + mergedResults, err := hms.streamingMerge(dataSources, options.Limit) if err != nil { - return nil, stats, fmt.Errorf("flushed data scan failed: %v", err) + return nil, stats, fmt.Errorf("streaming merge failed: %v", err) } + results = mergedResults } - // STEP 3: Sort results chronologically (unflushed + flushed data) - // This ensures proper time ordering across all data sources - if len(results) > 1 { - // Use efficient merge sort for better performance with large datasets - hms.mergeSort(results, 0, len(results)-1) - } - - // Apply final limit after merging and sorting - if options.Limit > 0 && len(results) > options.Limit { - results = results[:options.Limit] - } - - // If no results found, generate sample data for testing environments + // STEP 4: Fallback to sample data if no results found if len(results) == 0 { sampleResults := hms.generateSampleHybridData(options) results = append(results, sampleResults...) @@ -1267,6 +1235,253 @@ func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int { return 0 } +// streamingMerge merges multiple sorted data sources using a heap-based approach +// This provides memory-efficient merging without loading all data into memory +func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) { + if len(dataSources) == 0 { + return nil, nil + } + + var results []HybridScanResult + mergeHeap := &StreamingMergeHeap{} + heap.Init(mergeHeap) + + // Initialize heap with first item from each data source + for i, source := range dataSources { + if source.HasMore() { + result, err := source.Next() + if err != nil { + // Close all sources and return error + for _, s := range dataSources { + s.Close() + } + return nil, fmt.Errorf("failed to read from data source %d: %v", i, err) + } + if result != nil { + heap.Push(mergeHeap, &StreamingMergeItem{ + Result: result, + SourceID: i, + DataSource: source, + }) + } + } + } + + // Process results in chronological order + for mergeHeap.Len() > 0 { + // Get next chronologically ordered result + item := heap.Pop(mergeHeap).(*StreamingMergeItem) + results = append(results, *item.Result) + + // Check limit + if limit > 0 && len(results) >= limit { + break + } + + // Try to get next item from the same data source + if item.DataSource.HasMore() { + nextResult, err := item.DataSource.Next() + if err != nil { + // Log error but continue with other sources + fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err) + } else if nextResult != nil { + heap.Push(mergeHeap, &StreamingMergeItem{ + Result: nextResult, + SourceID: item.SourceID, + DataSource: item.DataSource, + }) + } + } + } + + // Close all data sources + for _, source := range dataSources { + source.Close() + } + + return results, nil +} + +// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource +// This is used for unflushed data that is already loaded into memory +type SliceDataSource struct { + results []HybridScanResult + index int +} + +func NewSliceDataSource(results []HybridScanResult) *SliceDataSource { + return &SliceDataSource{ + results: results, + index: 0, + } +} + +func (s *SliceDataSource) Next() (*HybridScanResult, error) { + if s.index >= len(s.results) { + return nil, nil + } + result := &s.results[s.index] + s.index++ + return result, nil +} + +func (s *SliceDataSource) HasMore() bool { + return s.index < len(s.results) +} + +func (s *SliceDataSource) Close() error { + return nil // Nothing to clean up for slice-based source +} + +// StreamingFlushedDataSource provides streaming access to flushed data +type StreamingFlushedDataSource struct { + hms *HybridMessageScanner + partition topic.Partition + options HybridScanOptions + mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) + resultChan chan *HybridScanResult + errorChan chan error + doneChan chan struct{} + started bool + finished bool +} + +func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource { + mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition) + + return &StreamingFlushedDataSource{ + hms: hms, + partition: partition, + options: options, + mergedReadFn: mergedReadFn, + resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance + errorChan: make(chan error, 1), + doneChan: make(chan struct{}), + started: false, + finished: false, + } +} + +func (s *StreamingFlushedDataSource) startStreaming() { + if s.started { + return + } + s.started = true + + go func() { + defer close(s.resultChan) + defer close(s.errorChan) + defer close(s.doneChan) + + // Set up time range for scanning + startTime := time.Unix(0, s.options.StartTimeNs) + if s.options.StartTimeNs == 0 { + startTime = time.Unix(0, 0) + } + + stopTsNs := s.options.StopTimeNs + if stopTsNs == 0 { + stopTsNs = time.Now().UnixNano() + } + + // Message processing function + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Convert log entry to schema_pb.RecordValue for consistent processing + recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry) + if convertErr != nil { + return false, fmt.Errorf("failed to convert log entry: %v", convertErr) + } + + // Apply predicate filtering (WHERE clause) + if s.options.Predicate != nil && !s.options.Predicate(recordValue) { + return false, nil // Skip this message + } + + // Extract system columns + timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value() + key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() + + // Apply column projection + values := make(map[string]*schema_pb.Value) + if len(s.options.Columns) == 0 { + // Select all columns (excluding system columns from user view) + for name, value := range recordValue.Fields { + if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY { + values[name] = value + } + } + } else { + // Select specified columns only + for _, columnName := range s.options.Columns { + if value, exists := recordValue.Fields[columnName]; exists { + values[columnName] = value + } + } + } + + result := &HybridScanResult{ + Values: values, + Timestamp: timestamp, + Key: key, + Source: source, + } + + // Send result to channel + select { + case s.resultChan <- result: + return false, nil + case <-s.doneChan: + return true, nil // Stop processing if closed + } + } + + // Start scanning from the specified position + startPosition := log_buffer.MessagePosition{Time: startTime} + _, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn) + + if err != nil { + select { + case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err): + case <-s.doneChan: + } + } + + s.finished = true + }() +} + +func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) { + if !s.started { + s.startStreaming() + } + + select { + case result, ok := <-s.resultChan: + if !ok { + return nil, nil // No more results + } + return result, nil + case err := <-s.errorChan: + return nil, err + case <-s.doneChan: + return nil, nil + } +} + +func (s *StreamingFlushedDataSource) HasMore() bool { + if !s.started { + return true // Haven't started yet, so potentially has data + } + return !s.finished || len(s.resultChan) > 0 +} + +func (s *StreamingFlushedDataSource) Close() error { + if !s.finished { + close(s.doneChan) + } + return nil +} + // mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) { if left < right {