package engine

import (
	"container/heap"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq"
	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"google.golang.org/protobuf/proto"
)

// HybridMessageScanner scans from ALL data sources:
// Architecture:
// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
// 4. Seamlessly merges data from all sources chronologically
// 5. Provides complete real-time view of all messages in a topic
type HybridMessageScanner struct {
	filerClient   filer_pb.FilerClient
	brokerClient  BrokerClientInterface // For querying unflushed data
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	schemaFormat  string // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
	parquetLevels *schema.ParquetLevels
	engine        *SQLEngine // Reference for system column formatting
}

// NewHybridMessageScanner creates a scanner that reads from all data sources
// This provides complete real-time message coverage including unflushed data
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
	// Check if filerClient is available
	if filerClient == nil {
		return nil, fmt.Errorf("filerClient is required but not available")
	}

	// Create topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Get flat schema from broker client
	recordType, _, schemaFormat, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
	if err != nil {
		return nil, fmt.Errorf("failed to get topic record type: %v", err)
	}

	if recordType == nil || len(recordType.Fields) == 0 {
		// For topics without schema, create a minimal schema with system fields and _value
		recordType = schema.RecordTypeBegin().
			WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
			WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
			WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
			RecordTypeEnd()
	} else {
		// Create a copy of the recordType to avoid modifying the original
		recordTypeCopy := &schema_pb.RecordType{
			Fields: make([]*schema_pb.Field, len(recordType.Fields)),
		}
		copy(recordTypeCopy.Fields, recordType.Fields)

		// Add system columns that MQ adds to all records
		recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
			WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
			WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
			RecordTypeEnd()
	}

	// Convert to Parquet levels for efficient reading
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
	}

	return &HybridMessageScanner{
		filerClient:   filerClient,
		brokerClient:  brokerClient,
		topic:         t,
		recordSchema:  recordType,
		schemaFormat:  schemaFormat,
		parquetLevels: parquetLevels,
		engine:        engine,
	}, nil
}

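// Illustrative usage sketch (placeholders only; the filer client, broker client,
// namespace/topic names, and SQL engine are assumed to be supplied by the caller):
//
//	scanner, err := NewHybridMessageScanner(filerClient, brokerClient, "default", "user_events", engine)
//	if err != nil {
//		// handle error
//	}
//	results, err := scanner.Scan(ctx, HybridScanOptions{Limit: 10})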

// HybridScanOptions configures how the scanner reads from both live and archived data
type HybridScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit: -1 means no limit, 0 means LIMIT 0 (empty result), >0 limits to N rows
	Limit int

	// Row offset - 0 means no offset
	Offset int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}

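// Example options literal (illustrative; the column names and predicate are
// hypothetical and depend on the topic's schema):
//
//	opts := HybridScanOptions{
//		StartTimeNs: time.Now().Add(-time.Hour).UnixNano(), // only the last hour
//		Columns:     []string{"user_id", "event_type"},
//		Limit:       100,
//		Predicate: func(r *schema_pb.RecordValue) bool {
//			return r.Fields["event_type"].GetStringValue() == "click"
//		},
//	}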

// HybridScanResult represents a message from either live logs or Parquet files
type HybridScanResult struct {
	Values    map[string]*schema_pb.Value // Column name -> value
	Timestamp int64                       // Message timestamp (_ts_ns)
	Key       []byte                      // Message key (_key)
	Source    string                      // "live_log" or "parquet_archive" or "in_memory_broker"
}

// HybridScanStats contains statistics about data sources scanned
type HybridScanStats struct {
	BrokerBufferQueried  bool
	BrokerBufferMessages int
	BufferStartIndex     int64
	PartitionsScanned    int
	LiveLogFilesScanned  int // Number of live log files processed
}

// ParquetColumnStats holds statistics for a single column from parquet metadata
type ParquetColumnStats struct {
	ColumnName string
	MinValue   *schema_pb.Value
	MaxValue   *schema_pb.Value
	NullCount  int64
	RowCount   int64
}

// ParquetFileStats holds aggregated statistics for a parquet file
type ParquetFileStats struct {
	FileName    string
	RowCount    int64
	ColumnStats map[string]*ParquetColumnStats
	// Optional file-level timestamp range from filer extended attributes
	MinTimestampNs int64
	MaxTimestampNs int64
}

// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
	if fileStats == nil {
		return 0, 0, false
	}
	// Prefer column stats for _ts_ns if present
	if len(fileStats.ColumnStats) > 0 {
		if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
			if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
				if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
					return minNs, maxNs, true
				}
			}
		}
	}
	// Fallback to file-level range if present in filer extended metadata
	if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
		return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
	}
	return 0, 0, false
}

// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
	if v == nil {
		return 0, false
	}
	switch k := v.Kind.(type) {
	case *schema_pb.Value_Int64Value:
		return k.Int64Value, true
	case *schema_pb.Value_Int32Value:
		return int64(k.Int32Value), true
	default:
		return 0, false
	}
}

// StreamingDataSource provides a streaming interface for reading scan results
type StreamingDataSource interface {
	Next() (*HybridScanResult, error) // Returns next result or nil when done
	HasMore() bool                    // Returns true if more data available
	Close() error                     // Clean up resources
}

// StreamingMergeItem represents an item in the priority queue for streaming merge
type StreamingMergeItem struct {
	Result     *HybridScanResult
	SourceID   int
	DataSource StreamingDataSource
}

// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
type StreamingMergeHeap []*StreamingMergeItem

func (h StreamingMergeHeap) Len() int { return len(h) }

func (h StreamingMergeHeap) Less(i, j int) bool {
	// Sort by timestamp (ascending order)
	return h[i].Result.Timestamp < h[j].Result.Timestamp
}

func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *StreamingMergeHeap) Push(x interface{}) {
	*h = append(*h, x.(*StreamingMergeItem))
}

func (h *StreamingMergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[0 : n-1]
	return item
}

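// Sketch of how this heap is typically driven (illustrative only; the actual merge
// logic lives in streamingMerge): seed the heap with the first record from each
// source, then repeatedly pop the earliest timestamp and refill from that source.
//
//	h := &StreamingMergeHeap{}
//	for id, src := range sources {
//		if r, err := src.Next(); err == nil && r != nil {
//			heap.Push(h, &StreamingMergeItem{Result: r, SourceID: id, DataSource: src})
//		}
//	}
//	for h.Len() > 0 {
//		item := heap.Pop(h).(*StreamingMergeItem)
//		// emit item.Result, then pull the next record from item.DataSource and push it back
//	}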

// Scan reads messages from both live logs and archived Parquet files
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration
// Behavior:
// 1. Chronologically merges live and archived data
// 2. Applies filtering at the lowest level for efficiency
// 3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.ScanWithStats(ctx, options)
	return results, err
}

// ScanWithStats reads messages and returns scan statistics for execution plans
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	var results []HybridScanResult
	stats := &HybridScanStats{}

	// Get all partitions for this topic via MQ broker discovery
	partitions, err := hms.discoverTopicPartitions(ctx)
	if err != nil {
		return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
	}

	stats.PartitionsScanned = len(partitions)

	for _, partition := range partitions {
		partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
		if err != nil {
			return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}

		results = append(results, partitionResults...)

		// Aggregate broker buffer stats
		if partitionStats != nil {
			if partitionStats.BrokerBufferQueried {
				stats.BrokerBufferQueried = true
			}
			stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
			if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
				stats.BufferStartIndex = partitionStats.BufferStartIndex
			}
		}

		// Apply global limit (without offset) across all partitions
		// When OFFSET is used, collect more data to ensure we have enough after skipping
		// Note: OFFSET will be applied at the end to avoid double-application
		if options.Limit > 0 {
			// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
			minRequired := options.Limit + options.Offset
			// Small buffer only when needed to handle edge cases in distributed scanning
			if options.Offset > 0 && minRequired < 10 {
				minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
			}
			if len(results) >= minRequired {
				break
			}
		}
	}

	// Apply final OFFSET and LIMIT processing (done once at the end)
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
	if options.Offset > 0 || options.Limit >= 0 {
		// Handle LIMIT 0 special case first
		if options.Limit == 0 {
			return []HybridScanResult{}, stats, nil
		}

		// Apply OFFSET first
		if options.Offset > 0 {
			if options.Offset >= len(results) {
				results = []HybridScanResult{}
			} else {
				results = results[options.Offset:]
			}
		}

		// Apply LIMIT after OFFSET (only if limit > 0)
		if options.Limit > 0 && len(results) > options.Limit {
			results = results[:options.Limit]
		}
	}

	return results, stats, nil
}

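// Worked example of the LIMIT/OFFSET handling above (illustrative): for
// "LIMIT 10 OFFSET 5" the partition loop keeps collecting until at least
// 10 + 5 = 15 rows are gathered; the final step then drops the first 5 rows
// and truncates to 10, yielding rows 6..15 in scan order.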

// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
	return results, err
}

// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	var results []HybridScanResult
	stats := &HybridScanStats{}

	// Skip if no broker client available
	if hms.brokerClient == nil {
		return results, stats, nil
	}

	// Mark that we attempted to query broker buffer
	stats.BrokerBufferQueried = true

	// Step 1: Get unflushed data from broker using buffer_start-based method
	// This method uses buffer_start metadata so that data already flushed to disk is not double-counted
	unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
	if err != nil {
		// Log error but don't fail the query - continue with disk data only
		// Reset queried flag on error
		stats.BrokerBufferQueried = false
		return results, stats, nil
	}

	// Capture stats for EXPLAIN
	stats.BrokerBufferMessages = len(unflushedEntries)

	// Step 2: Process unflushed entries (already deduplicated by broker)
	for _, logEntry := range unflushedEntries {
		// Pre-decode DataMessage for reuse in both control check and conversion
		var dataMessage *mq_pb.DataMessage
		if len(logEntry.Data) > 0 {
			dataMessage = &mq_pb.DataMessage{}
			if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
				dataMessage = nil // Failed to decode, treat as raw data
			}
		}

		// Skip control entries without actual data
		if hms.isControlEntryWithDecoded(logEntry, dataMessage) {
			continue // Skip this entry
		}

		// Skip messages outside time range
		if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
			continue
		}
		if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
			continue
		}

		// Convert LogEntry to RecordValue format (same as disk data)
		recordValue, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
		if err != nil {
			continue // Skip malformed messages
		}

		// Apply predicate filter if provided
		if options.Predicate != nil && !options.Predicate(recordValue) {
			continue
		}

		// Extract system columns for result
		timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
		key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

		// Apply column projection
		values := make(map[string]*schema_pb.Value)
		if len(options.Columns) == 0 {
			// Select all columns (excluding system columns from user view)
			for name, value := range recordValue.Fields {
				if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
					values[name] = value
				}
			}
		} else {
			// Select specified columns only
			for _, columnName := range options.Columns {
				if value, exists := recordValue.Fields[columnName]; exists {
					values[columnName] = value
				}
			}
		}

		// Create result with proper source tagging
		result := HybridScanResult{
			Values:    values,
			Timestamp: timestamp,
			Key:       key,
			Source:    "live_log", // Data from broker's unflushed messages
		}

		results = append(results, result)

		// Apply limit (accounting for offset) - collect exact amount needed
		if options.Limit > 0 {
			// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
			minRequired := options.Limit + options.Offset
			// Small buffer only when needed to handle edge cases in message streaming
			if options.Offset > 0 && minRequired < 10 {
				minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
			}
			if len(results) >= minRequired {
				break
			}
		}
	}

	return results, stats, nil
}

// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
	// Parse the message data as RecordValue
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
		return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
	}

	// Add system columns
	if recordValue.Fields == nil {
		recordValue.Fields = make(map[string]*schema_pb.Value)
	}

	// Add timestamp
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
	}

	return recordValue, string(msg.Key), nil
}

// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
	if hms.filerClient == nil {
		return nil, fmt.Errorf("filerClient not available for partition discovery")
	}

	var allPartitions []topic.Partition
	var err error

	// Scan the topic directory for actual partition versions (timestamped directories)
	// List all version directories in the topic directory
	err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
		if !versionEntry.IsDirectory {
			return nil // Skip non-directories
		}

		// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
		versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
		if parseErr != nil {
			// Skip directories that don't match the version format
			return nil
		}

		// Scan partition directories within this version
		versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
		return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
			if !partitionEntry.IsDirectory {
				return nil // Skip non-directories
			}

			// Parse partition boundary from directory name (e.g., "0000-0630")
			rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
			if rangeStart == rangeStop {
				return nil // Skip invalid partition names
			}

			// Create partition object
			partition := topic.Partition{
				RangeStart: rangeStart,
				RangeStop:  rangeStop,
				RingSize:   topic.PartitionCount,
				UnixTimeNs: versionTime.UnixNano(),
			}

			allPartitions = append(allPartitions, partition)
			return nil
		})
	})

	if err != nil {
		return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
	}

	// If no partitions found, return empty slice (valid for newly created or empty topics)
	if len(allPartitions) == 0 {
		fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
		return []topic.Partition{}, nil
	}

	fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
	return allPartitions, nil
}

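// Illustrative on-disk layout walked by the discovery above (directory names are
// examples in the format described, not actual data):
//
//	<topic dir>/v2025-09-01-07-16-34/0000-0630/   -> partition with RangeStart=0, RangeStop=630
//	<topic dir>/v2025-09-01-07-16-34/0630-1260/   -> partition with RangeStart=630, RangeStop=1260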

// scanPartitionHybrid scans a specific partition using the hybrid approach
// This is where the magic happens - seamlessly reading ALL data sources:
// 1. Unflushed in-memory data from brokers (REAL-TIME)
// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
	return results, err
}

// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency
// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting
// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
// - Scalable: Can handle large topics without LIMIT clauses efficiently
// - Streaming: Processes data as it arrives rather than buffering everything
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	stats := &HybridScanStats{}

	// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
	unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
	if err != nil {
		// Don't fail the query if broker scanning fails, but provide clear warning to user
		// This ensures users are aware that results may not include the most recent data
		fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
		fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
	} else if unflushedStats != nil {
		stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
		stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
		stats.BufferStartIndex = unflushedStats.BufferStartIndex
	}

	// Count live log files for statistics
	liveLogCount, err := hms.countLiveLogFiles(partition)
	if err != nil {
		// Don't fail the query, just log warning
		fmt.Printf("Warning: Failed to count live log files: %v\n", err)
		liveLogCount = 0
	}
	stats.LiveLogFilesScanned = liveLogCount

	// STEP 2: Create streaming data sources for memory-efficient merge
	var dataSources []StreamingDataSource

	// Add unflushed data source (if we have unflushed results)
	if len(unflushedResults) > 0 {
		// Sort unflushed results by timestamp before creating stream
		if len(unflushedResults) > 1 {
			hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
		}
		dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
	}

	// Add streaming flushed data source (live logs + Parquet files)
	flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
	dataSources = append(dataSources, flushedDataSource)

	// STEP 3: Use streaming merge for memory-efficient chronological ordering
	var results []HybridScanResult
	if len(dataSources) > 0 {
		// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
		// For LIMIT N OFFSET M, we need to collect at least N+M rows
		scanLimit := options.Limit
		if options.Limit > 0 && options.Offset > 0 {
			scanLimit = options.Limit + options.Offset
		}

		mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
		if err != nil {
			return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
		}
		results = mergedResults
	}

	return results, stats, nil
}

// countLiveLogFiles counts the number of live log files in a partition for statistics
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
	partitionDir := topic.PartitionDir(hms.topic, partition)

	var fileCount int
	err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// List all files in partition directory
		request := &filer_pb.ListEntriesRequest{
			Directory:          partitionDir,
			Prefix:             "",
			StartFromFileName:  "",
			InclusiveStartFrom: true,
			Limit:              10000, // reasonable limit for counting
		}

		stream, err := client.ListEntries(context.Background(), request)
		if err != nil {
			return err
		}

		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}

			// Count files that are not .parquet files (live log files)
			// Live log files typically have timestamps or are named like log files
			fileName := resp.Entry.Name
			if !strings.HasSuffix(fileName, ".parquet") &&
				!strings.HasSuffix(fileName, ".offset") &&
				len(resp.Entry.Chunks) > 0 { // Has actual content
				fileCount++
			}
		}

		return nil
	})

	if err != nil {
		return 0, err
	}
	return fileCount, nil
}

// isControlEntry checks if a log entry is a control entry without actual data
// Based on MQ system analysis, control entries are:
// 1. DataMessages with populated Ctrl field (publisher close signals)
// 2. Entries with empty keys (as filtered by subscriber)
// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
	// Pre-decode DataMessage if needed
	var dataMessage *mq_pb.DataMessage
	if len(logEntry.Data) > 0 {
		dataMessage = &mq_pb.DataMessage{}
		if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
			dataMessage = nil // Failed to decode, treat as raw data
		}
	}
	return hms.isControlEntryWithDecoded(logEntry, dataMessage)
}

// isControlEntryWithDecoded checks if a log entry is a control entry using pre-decoded DataMessage
// This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded
func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool {
	// Skip entries with empty keys (same logic as subscriber)
	if len(logEntry.Key) == 0 {
		return true
	}

	// Check if this is a DataMessage with control field populated
	if dataMessage != nil && dataMessage.Ctrl != nil {
		return true
	}

	// Messages with valid keys (even if data is empty) are legitimate messages
	// Examples: NOOP messages from Schema Registry
	return false
}

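// Summary of the control-entry rules above (descriptive only):
//
//	empty key                     -> control entry, skipped
//	DataMessage with Ctrl set     -> control entry, skipped
//	valid key, empty data (NOOP)  -> real message, kept
//	valid key, regular payload    -> real message, kept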

// isNullOrEmpty checks if a schema_pb.Value is null or empty
func isNullOrEmpty(value *schema_pb.Value) bool {
	if value == nil {
		return true
	}

	switch v := value.Kind.(type) {
	case *schema_pb.Value_StringValue:
		return v.StringValue == ""
	case *schema_pb.Value_BytesValue:
		return len(v.BytesValue) == 0
	case *schema_pb.Value_ListValue:
		return v.ListValue == nil || len(v.ListValue.Values) == 0
	case nil:
		return true // No kind set means null
	default:
		return false
	}
}

// isSchemaless checks if the scanner is configured for a schema-less topic
// Schema-less topics only have system fields: _ts_ns, _key, and _value
func (hms *HybridMessageScanner) isSchemaless() bool {
	// Schema-less topics only have system fields: _ts_ns, _key, and _value
	// System topics like _schemas are NOT schema-less - they have structured data
	// We just need to map their fields during read

	if hms.recordSchema == nil {
		return false
	}

	// Count only non-system data fields (exclude _ts_ns and _key which are always present)
	// Schema-less topics should only have _value as the data field
	hasValue := false
	dataFieldCount := 0

	for _, field := range hms.recordSchema.Fields {
		switch field.Name {
		case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY:
			// System fields - ignore
			continue
		case SW_COLUMN_NAME_VALUE:
			hasValue = true
			dataFieldCount++
		default:
			// Any other field means it's not schema-less
			dataFieldCount++
		}
	}

	// Schema-less = only has _value field as the data field (plus system fields)
	return hasValue && dataFieldCount == 1
}

// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
// This handles both:
// 1. Live log entries (raw message format)
// 2. Parquet entries (already in schema_pb.RecordValue format)
// 3. Schema-less topics (raw bytes in _value field)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	// For schema-less topics, put raw data directly into _value field
	if hms.isSchemaless() {
		recordValue := &schema_pb.RecordValue{
			Fields: make(map[string]*schema_pb.Value),
		}
		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
		}
		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
		}
		recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
		}
		return recordValue, "live_log", nil
	}

	// Try to unmarshal as RecordValue first (Parquet format)
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
		// This is an archived message from Parquet files
		// FIX: Add system columns from LogEntry to RecordValue
		if recordValue.Fields == nil {
			recordValue.Fields = make(map[string]*schema_pb.Value)
		}

		// Add system columns from LogEntry
		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
		}
		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
		}

		return recordValue, "parquet_archive", nil
	}

	// If not a RecordValue, this is raw live message data - parse with schema
	return hms.parseRawMessageWithSchema(logEntry)
}

// min returns the minimum of two integers
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// parseRawMessageWithSchema parses raw live message data using the topic's schema
// This provides proper type conversion and field mapping instead of treating everything as strings
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	recordValue := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Add system columns (always present)
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
	}
	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
	}

	// Parse message data based on schema
	if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
		// Fallback: No schema available, use "_value" for schema-less topics only
		if hms.isSchemaless() {
			recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
				Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
			}
		}
		return recordValue, "live_log", nil
	}

	// Use schema format to directly choose the right decoder
	// This avoids trying multiple decoders and improves performance
	var parsedRecord *schema_pb.RecordValue
	var err error

	switch hms.schemaFormat {
	case "AVRO":
		// AVRO format - use Avro decoder
		// Note: Avro decoding requires schema registry integration
		// For now, fall through to JSON as many Avro messages are also valid JSON
		parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
	case "PROTOBUF":
		// PROTOBUF format - use protobuf decoder
		parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
	case "JSON_SCHEMA", "":
		// JSON_SCHEMA format or empty (default to JSON)
		// JSON is the most common format for schema registry
		parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
		if err != nil {
			// Try protobuf as fallback
			parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
		}
	default:
		// Unknown format - try JSON first, then protobuf as fallback
		parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
		if err != nil {
			parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
		}
	}

	if err == nil && parsedRecord != nil {
		// Successfully parsed, merge with system columns
		for fieldName, fieldValue := range parsedRecord.Fields {
			recordValue.Fields[fieldName] = fieldValue
		}
		return recordValue, "live_log", nil
	}

	// Fallback: If schema has a single field, map the raw data to it with type conversion
	if len(hms.recordSchema.Fields) == 1 {
		field := hms.recordSchema.Fields[0]
		convertedValue, convErr := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
		if convErr == nil {
			recordValue.Fields[field.Name] = convertedValue
			return recordValue, "live_log", nil
		}
	}

	// Final fallback: treat as bytes field for schema-less topics only
	if hms.isSchemaless() {
		recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
		}
	}

	return recordValue, "live_log", nil
}

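// Decoder selection in parseRawMessageWithSchema, summarized (descriptive only):
//
//	schemaFormat "AVRO"           -> JSON decoder (Avro decoding pending schema registry integration)
//	schemaFormat "PROTOBUF"       -> protobuf decoder
//	schemaFormat "JSON_SCHEMA"/"" -> JSON decoder, protobuf decoder as fallback
//	any other value               -> JSON decoder, protobuf decoder as fallback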
								// convertLogEntryToRecordValueWithDecoded converts a filer_pb.LogEntry to schema_pb.RecordValue
							 | 
						|
								// using a pre-decoded DataMessage to avoid duplicate protobuf unmarshaling
							 | 
						|
								func (hms *HybridMessageScanner) convertLogEntryToRecordValueWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
							 | 
						|
									// IMPORTANT: Check for schema-less topics FIRST
							 | 
						|
									// Schema-less topics (like _schemas) should store raw data directly in _value field
							 | 
						|
									if hms.isSchemaless() {
							 | 
						|
										recordValue := &schema_pb.RecordValue{
							 | 
						|
											Fields: make(map[string]*schema_pb.Value),
							 | 
						|
										}
							 | 
						|
										recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
							 | 
						|
											Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
							 | 
						|
										}
							 | 
						|
										recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
							 | 
						|
											Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
							 | 
						|
										}
							 | 
						|
										recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
							 | 
						|
											Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
							 | 
						|
										}
							 | 
						|
										return recordValue, "live_log", nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// CRITICAL: The broker stores DataMessage.Value directly in LogEntry.Data
							 | 
						|
									// So we need to try unmarshaling LogEntry.Data as RecordValue first
							 | 
						|
									var recordValueBytes []byte
							 | 
						|
								
							 | 
						|
									if dataMessage != nil && len(dataMessage.Value) > 0 {
							 | 
						|
										// DataMessage has a Value field - use it
							 | 
						|
										recordValueBytes = dataMessage.Value
							 | 
						|
									} else {
							 | 
						|
										// DataMessage doesn't have Value, use LogEntry.Data directly
							 | 
						|
										// This is the normal case when broker stores messages
							 | 
						|
										recordValueBytes = logEntry.Data
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Try to unmarshal as RecordValue
							 | 
						|
									if len(recordValueBytes) > 0 {
							 | 
						|
										recordValue := &schema_pb.RecordValue{}
							 | 
						|
										if err := proto.Unmarshal(recordValueBytes, recordValue); err == nil {
							 | 
						|
											// Successfully unmarshaled as RecordValue
							 | 
						|
								
							 | 
						|
											// Ensure Fields map exists
							 | 
						|
											if recordValue.Fields == nil {
							 | 
						|
												recordValue.Fields = make(map[string]*schema_pb.Value)
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Add system columns from LogEntry
							 | 
						|
											recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
							 | 
						|
												Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
							 | 
						|
											}
							 | 
						|
											recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
							 | 
						|
												Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											return recordValue, "live_log", nil
							 | 
						|
										}
							 | 
						|
										// If unmarshaling as RecordValue fails, fall back to schema-aware parsing
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// For cases where protobuf unmarshaling failed or data is empty,
							 | 
						|
									// attempt schema-aware parsing to try JSON, protobuf, and other formats
							 | 
						|
									return hms.parseRawMessageWithSchema(logEntry)
							 | 
						|
								}

// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
	// Try to parse as JSON
	var jsonData map[string]interface{}
	if err := json.Unmarshal(data, &jsonData); err != nil {
		return nil, fmt.Errorf("not valid JSON: %v", err)
	}

	recordValue := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Map JSON fields to schema fields
	for _, schemaField := range hms.recordSchema.Fields {
		fieldName := schemaField.Name
		if jsonValue, exists := jsonData[fieldName]; exists {
			schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
			if err != nil {
				// Log conversion error but continue with other fields
				continue
			}
			recordValue.Fields[fieldName] = schemaValue
		}
	}

	return recordValue, nil
}

// parseProtobufMessage attempts to parse raw data as protobuf RecordValue
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
	// This might be a raw protobuf message that didn't parse correctly the first time
	// Try alternative protobuf unmarshaling approaches
	recordValue := &schema_pb.RecordValue{}

	// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
	if err := proto.Unmarshal(data, recordValue); err == nil {
		return recordValue, nil
	}

	// Strategy 2: Check if it's a different protobuf message type
	// For now, return error as we need more specific knowledge of MQ message formats
	return nil, fmt.Errorf("could not parse as protobuf RecordValue")
}

// convertRawDataToSchemaValue converts raw bytes to a specific schema type
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
	dataStr := string(data)

	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		scalarType := fieldType.GetScalarType()
		switch scalarType {
		case schema_pb.ScalarType_STRING:
			return &schema_pb.Value{
				Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
			}, nil
		case schema_pb.ScalarType_INT32:
			if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
				}, nil
			}
		case schema_pb.ScalarType_INT64:
			if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int64Value{Int64Value: val},
				}, nil
			}
		case schema_pb.ScalarType_FLOAT:
			if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
				}, nil
			}
		case schema_pb.ScalarType_DOUBLE:
			if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
				}, nil
			}
		case schema_pb.ScalarType_BOOL:
			lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
			if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: true},
				}, nil
			} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: false},
				}, nil
			}
		case schema_pb.ScalarType_BYTES:
			return &schema_pb.Value{
				Kind: &schema_pb.Value_BytesValue{BytesValue: data},
			}, nil
		}
	}

	return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
}

// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		scalarType := fieldType.GetScalarType()
		switch scalarType {
		case schema_pb.ScalarType_STRING:
			if str, ok := jsonValue.(string); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_StringValue{StringValue: str},
				}, nil
			}
			// Convert other types to string
			return &schema_pb.Value{
				Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
			}, nil
		case schema_pb.ScalarType_INT32:
			if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
				}, nil
			}
		case schema_pb.ScalarType_INT64:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
				}, nil
			}
		case schema_pb.ScalarType_FLOAT:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
				}, nil
			}
		case schema_pb.ScalarType_DOUBLE:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
				}, nil
			}
		case schema_pb.ScalarType_BOOL:
			if boolVal, ok := jsonValue.(bool); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
				}, nil
			}
		case schema_pb.ScalarType_BYTES:
			if str, ok := jsonValue.(string); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
				}, nil
			}
		}
	}

	return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
}

// ConvertToSQLResult converts HybridScanResults to SQL query results
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
	if len(results) == 0 {
		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Determine columns if not specified
	if len(columns) == 0 {
		columnSet := make(map[string]bool)
		for _, result := range results {
			for columnName := range result.Values {
				columnSet[columnName] = true
			}
		}

		columns = make([]string, 0, len(columnSet))
		for columnName := range columnSet {
			columns = append(columns, columnName)
		}

		// If no data columns were found, include system columns so we have something to display
		if len(columns) == 0 {
			columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
		}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			switch columnName {
			case SW_COLUMN_NAME_SOURCE:
				row[j] = sqltypes.NewVarChar(result.Source)
			case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
				// Format timestamp as proper timestamp type instead of raw nanoseconds
				row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
			case SW_COLUMN_NAME_KEY:
				row[j] = sqltypes.NewVarBinary(string(result.Key))
			default:
				if value, exists := result.Values[columnName]; exists {
					row[j] = convertSchemaValueToSQL(value)
				} else {
					row[j] = sqltypes.NULL
				}
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}
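
// Illustrative usage (hypothetical variable names): after a hybrid scan,
//
//	result := hms.ConvertToSQLResult(scanResults, nil)
//
// passes nil columns so the column list is auto-discovered from the scanned
// values; the timestamp/key system columns are used only when no data columns
// exist, and missing values in a row become SQL NULL.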

// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
// Combines auto-discovered columns (from *) with explicitly requested columns
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
	if len(results) == 0 {
		// For empty results, combine auto-discovered columns with explicit ones
		columnSet := make(map[string]bool)

		// Add explicit columns first
		for _, col := range explicitColumns {
			columnSet[col] = true
		}

		// Build final column list
		columns := make([]string, 0, len(columnSet))
		for col := range columnSet {
			columns = append(columns, col)
		}

		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Auto-discover columns from data (like SELECT *)
	autoColumns := make(map[string]bool)
	for _, result := range results {
		for columnName := range result.Values {
			autoColumns[columnName] = true
		}
	}

	// Combine auto-discovered and explicit columns
	columnSet := make(map[string]bool)

	// Add auto-discovered columns first (regular data columns)
	for col := range autoColumns {
		columnSet[col] = true
	}

	// Add explicit columns (may include system columns like _source)
	for _, col := range explicitColumns {
		columnSet[col] = true
	}

	// Build final column list
	columns := make([]string, 0, len(columnSet))
	for col := range columnSet {
		columns = append(columns, col)
	}

	// If no data columns were found and no explicit columns specified, include system columns
	if len(columns) == 0 {
		columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			switch columnName {
			case SW_COLUMN_NAME_TIMESTAMP:
				row[j] = sqltypes.NewInt64(result.Timestamp)
			case SW_COLUMN_NAME_KEY:
				row[j] = sqltypes.NewVarBinary(string(result.Key))
			case SW_COLUMN_NAME_SOURCE:
				row[j] = sqltypes.NewVarChar(result.Source)
			default:
				// Regular data column
				if value, exists := result.Values[columnName]; exists {
					row[j] = convertSchemaValueToSQL(value)
				} else {
					row[j] = sqltypes.NULL
				}
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}

// ReadParquetStatistics efficiently reads column statistics from parquet files
// without scanning the full file content - uses parquet's built-in metadata
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
	var fileStats []*ParquetFileStats

	// Use the same chunk cache as the logstore package
	chunkCache := chunk_cache.NewChunkCacheInMemory(256)
	lookupFileIdFn := filer.LookupFn(h.filerClient)

	err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Only process parquet files
		if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}

		// Extract statistics from this parquet file
		stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
		if err != nil {
			// Log error but continue processing other files
			fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
			return nil
		}

		if stats != nil {
			fileStats = append(fileStats, stats)
		}
		return nil
	})

	return fileStats, err
}
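
// Illustrative usage (a sketch; the partition path shown is a made-up example,
// not a documented layout):
//
//	stats, err := hms.ReadParquetStatistics("/topics/my-namespace/my-topic/partition-0000")
//
// returns one ParquetFileStats per .parquet file in that directory. Per-column
// min/max and null counts come from parquet column-index metadata, and the
// file-level timestamp range comes from filer extended attributes, so no row
// data is scanned.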

// extractParquetFileStats extracts column statistics from a single parquet file
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
	// Create reader for the parquet file
	fileSize := filer.FileSize(entry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
	readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
	readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))

	// Create parquet reader - this only reads metadata, not data
	parquetReader := parquet.NewReader(readerAt)
	defer parquetReader.Close()

	fileView := parquetReader.File()

	fileStats := &ParquetFileStats{
		FileName:    entry.Name,
		RowCount:    fileView.NumRows(),
		ColumnStats: make(map[string]*ParquetColumnStats),
	}
	// Populate optional min/max from filer extended attributes (writer stores ns timestamps)
	if entry != nil && entry.Extended != nil {
		if minBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMin]; ok && len(minBytes) == 8 {
			fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
		}
		if maxBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMax]; ok && len(maxBytes) == 8 {
			fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
		}
	}

	// Get schema information
	schema := fileView.Schema()

	// Process each row group
	rowGroups := fileView.RowGroups()
	for _, rowGroup := range rowGroups {
		columnChunks := rowGroup.ColumnChunks()

		// Process each column chunk
		for i, chunk := range columnChunks {
			// Get column name from schema
			columnName := h.getColumnNameFromSchema(schema, i)
			if columnName == "" {
				continue
			}

			// Try to get column statistics
			columnIndex, err := chunk.ColumnIndex()
			if err != nil {
				// No column index available - skip this column
				continue
			}

			// Extract min/max values from the first page (for simplicity)
			// In a more sophisticated implementation, we could aggregate across all pages
			numPages := columnIndex.NumPages()
			if numPages == 0 {
				continue
			}

			minParquetValue := columnIndex.MinValue(0)
			maxParquetValue := columnIndex.MaxValue(numPages - 1)
			nullCount := int64(0)

			// Aggregate null counts across all pages
			for pageIdx := 0; pageIdx < numPages; pageIdx++ {
				nullCount += columnIndex.NullCount(pageIdx)
			}

			// Convert parquet values to schema_pb.Value
			minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
			if err != nil {
				continue
			}

			maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
			if err != nil {
				continue
			}

			// Store column statistics (aggregate across row groups if column already exists)
			if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
				// Update existing statistics
				if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
					existingStats.MinValue = minValue
				}
				if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
					existingStats.MaxValue = maxValue
				}
				existingStats.NullCount += nullCount
			} else {
				// Create new column statistics
				fileStats.ColumnStats[columnName] = &ParquetColumnStats{
					ColumnName: columnName,
					MinValue:   minValue,
					MaxValue:   maxValue,
					NullCount:  nullCount,
					RowCount:   rowGroup.NumRows(),
				}
			}
		}
	}

	return fileStats, nil
}

// getColumnNameFromSchema extracts column name from parquet schema by index
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
	// Get the leaf columns in order
	var columnNames []string
	h.collectColumnNames(schema.Fields(), &columnNames)

	if columnIndex >= 0 && columnIndex < len(columnNames) {
		return columnNames[columnIndex]
	}
	return ""
}

// collectColumnNames recursively collects leaf column names from schema
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
	for _, field := range fields {
		if len(field.Fields()) == 0 {
			// This is a leaf field (no sub-fields)
			*names = append(*names, field.Name())
		} else {
			// This is a group - recurse
			h.collectColumnNames(field.Fields(), names)
		}
	}
}

// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
	switch pv.Kind() {
	case parquet.Boolean:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
	case parquet.Int32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
	case parquet.Int64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
	case parquet.Float:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
	case parquet.Double:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
	case parquet.ByteArray:
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
	default:
		return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
	}
}

// compareSchemaValues compares two schema_pb.Value objects
func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
	if v1 == nil && v2 == nil {
		return 0
	}
	if v1 == nil {
		return -1
	}
	if v2 == nil {
		return 1
	}

	// Extract raw values and compare
	raw1 := h.extractRawValueFromSchema(v1)
	raw2 := h.extractRawValueFromSchema(v2)

	return h.compareRawValues(raw1, raw2)
}

// extractRawValueFromSchema extracts the raw value from schema_pb.Value
func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
	switch v := value.Kind.(type) {
	case *schema_pb.Value_BoolValue:
		return v.BoolValue
	case *schema_pb.Value_Int32Value:
		return v.Int32Value
	case *schema_pb.Value_Int64Value:
		return v.Int64Value
	case *schema_pb.Value_FloatValue:
		return v.FloatValue
	case *schema_pb.Value_DoubleValue:
		return v.DoubleValue
	case *schema_pb.Value_BytesValue:
		return string(v.BytesValue) // Convert to string for comparison
	case *schema_pb.Value_StringValue:
		return v.StringValue
	}
	return nil
}

// compareRawValues compares two raw values
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
	// Handle nil cases
	if v1 == nil && v2 == nil {
		return 0
	}
	if v1 == nil {
		return -1
	}
	if v2 == nil {
		return 1
	}

	// Compare based on type
	switch val1 := v1.(type) {
	case bool:
		if val2, ok := v2.(bool); ok {
			if val1 == val2 {
				return 0
			}
			if val1 {
				return 1
			}
			return -1
		}
	case int32:
		if val2, ok := v2.(int32); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case int64:
		if val2, ok := v2.(int64); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case float32:
		if val2, ok := v2.(float32); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case float64:
		if val2, ok := v2.(float64); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case string:
		if val2, ok := v2.(string); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	}

	// Default: try string comparison
	str1 := fmt.Sprintf("%v", v1)
	str2 := fmt.Sprintf("%v", v2)
	if str1 < str2 {
		return -1
	} else if str1 > str2 {
		return 1
	}
	return 0
}
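
// Illustrative behavior (hypothetical values): compareSchemaValues orders
// Int64Value(3) before Int64Value(5) and returns 0 for equal values; bytes
// values are compared as strings. When the two sides carry different underlying
// Go types, the comparison falls back to the fmt.Sprintf("%v") string forms, so
// mixed-type comparisons are lexical rather than numeric.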

// streamingMerge merges multiple sorted data sources using a heap-based approach
// This provides memory-efficient merging without loading all data into memory
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
	if len(dataSources) == 0 {
		return nil, nil
	}

	var results []HybridScanResult
	mergeHeap := &StreamingMergeHeap{}
	heap.Init(mergeHeap)

	// Initialize heap with first item from each data source
	for i, source := range dataSources {
		if source.HasMore() {
			result, err := source.Next()
			if err != nil {
				// Close all sources and return error
				for _, s := range dataSources {
					s.Close()
				}
				return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
			}
			if result != nil {
				heap.Push(mergeHeap, &StreamingMergeItem{
					Result:     result,
					SourceID:   i,
					DataSource: source,
				})
			}
		}
	}

	// Process results in chronological order
	for mergeHeap.Len() > 0 {
		// Get next chronologically ordered result
		item := heap.Pop(mergeHeap).(*StreamingMergeItem)
		results = append(results, *item.Result)

		// Check limit
		if limit > 0 && len(results) >= limit {
			break
		}

		// Try to get next item from the same data source
		if item.DataSource.HasMore() {
			nextResult, err := item.DataSource.Next()
			if err != nil {
				// Log error but continue with other sources
				fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
			} else if nextResult != nil {
				heap.Push(mergeHeap, &StreamingMergeItem{
					Result:     nextResult,
					SourceID:   item.SourceID,
					DataSource: item.DataSource,
				})
			}
		}
	}

	// Close all data sources
	for _, source := range dataSources {
		source.Close()
	}

	return results, nil
}
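
// Illustrative usage (a sketch; the variable names are made up): merging
// already-sorted in-memory results with a flushed-data stream, capped at 100 rows:
//
//	sources := []StreamingDataSource{
//		NewSliceDataSource(unflushedResults),
//		NewStreamingFlushedDataSource(hms, partition, options),
//	}
//	merged, err := hms.streamingMerge(sources, 100)
//
// Each source must yield its results in ascending Timestamp order for the heap
// merge to produce a globally sorted output.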

// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
// This is used for unflushed data that is already loaded into memory
type SliceDataSource struct {
	results []HybridScanResult
	index   int
}

func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
	return &SliceDataSource{
		results: results,
		index:   0,
	}
}

func (s *SliceDataSource) Next() (*HybridScanResult, error) {
	if s.index >= len(s.results) {
		return nil, nil
	}
	result := &s.results[s.index]
	s.index++
	return result, nil
}

func (s *SliceDataSource) HasMore() bool {
	return s.index < len(s.results)
}

func (s *SliceDataSource) Close() error {
	return nil // Nothing to clean up for slice-based source
}

// StreamingFlushedDataSource provides streaming access to flushed data
type StreamingFlushedDataSource struct {
	hms          *HybridMessageScanner
	partition    topic.Partition
	options      HybridScanOptions
	mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
	resultChan   chan *HybridScanResult
	errorChan    chan error
	doneChan     chan struct{}
	started      bool
	finished     bool
	closed       int32 // atomic flag to prevent double close
	mu           sync.RWMutex
}

func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
	mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)

	return &StreamingFlushedDataSource{
		hms:          hms,
		partition:    partition,
		options:      options,
		mergedReadFn: mergedReadFn,
		resultChan:   make(chan *HybridScanResult, 100), // Buffer for better performance
		errorChan:    make(chan error, 1),
		doneChan:     make(chan struct{}),
		started:      false,
		finished:     false,
	}
}

func (s *StreamingFlushedDataSource) startStreaming() {
	if s.started {
		return
	}
	s.started = true

	go func() {
		defer func() {
			// Use atomic flag to ensure channels are only closed once
			if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
				close(s.resultChan)
				close(s.errorChan)
				close(s.doneChan)
			}
		}()

		// Set up time range for scanning
		startTime := time.Unix(0, s.options.StartTimeNs)
		if s.options.StartTimeNs == 0 {
			startTime = time.Unix(0, 0)
		}

		stopTsNs := s.options.StopTimeNs
		// For SQL queries, stopTsNs = 0 means "no stop time restriction"
		// This is different from message queue consumers which want to stop at "now"
		// We detect SQL context by checking if we have a predicate function
		if stopTsNs == 0 && s.options.Predicate == nil {
			// Only set to current time for non-SQL queries (message queue consumers)
			stopTsNs = time.Now().UnixNano()
		}
		// If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning

		// Message processing function
		eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
			// Pre-decode DataMessage for reuse in both control check and conversion
			var dataMessage *mq_pb.DataMessage
			if len(logEntry.Data) > 0 {
				dataMessage = &mq_pb.DataMessage{}
				if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
					dataMessage = nil // Failed to decode, treat as raw data
				}
			}

			// Skip control entries without actual data
			if s.hms.isControlEntryWithDecoded(logEntry, dataMessage) {
				return false, nil // Skip this entry
			}

			// Convert log entry to schema_pb.RecordValue for consistent processing
			recordValue, source, convertErr := s.hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
			if convertErr != nil {
				return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
			}

			// Apply predicate filtering (WHERE clause)
			if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
				return false, nil // Skip this message
			}

			// Extract system columns
			timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

			// Apply column projection
			values := make(map[string]*schema_pb.Value)
			if len(s.options.Columns) == 0 {
				// Select all columns (excluding system columns from user view)
				for name, value := range recordValue.Fields {
					if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
						values[name] = value
					}
				}
			} else {
				// Select specified columns only
				for _, columnName := range s.options.Columns {
					if value, exists := recordValue.Fields[columnName]; exists {
						values[columnName] = value
					}
				}
			}

			result := &HybridScanResult{
				Values:    values,
				Timestamp: timestamp,
				Key:       key,
				Source:    source,
			}

			// Check if already closed before trying to send
			if atomic.LoadInt32(&s.closed) != 0 {
				return true, nil // Stop processing if closed
			}

			// Send result to channel with proper handling of closed channels
			select {
			case s.resultChan <- result:
				return false, nil
			case <-s.doneChan:
				return true, nil // Stop processing if closed
			default:
				// Check again if closed (in case it was closed between the atomic check and select)
				if atomic.LoadInt32(&s.closed) != 0 {
					return true, nil
				}
				// If not closed, try sending again with blocking select
				select {
				case s.resultChan <- result:
					return false, nil
				case <-s.doneChan:
					return true, nil
				}
			}
		}

		// Start scanning from the specified position
		startPosition := log_buffer.MessagePosition{Time: startTime}
		_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)

		if err != nil {
			// Only try to send error if not already closed
			if atomic.LoadInt32(&s.closed) == 0 {
				select {
				case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
				case <-s.doneChan:
				default:
					// Channel might be full or closed, ignore
				}
			}
		}

		s.finished = true
	}()
}

func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
	if !s.started {
		s.startStreaming()
	}

	select {
	case result, ok := <-s.resultChan:
		if !ok {
			return nil, nil // No more results
		}
		return result, nil
	case err := <-s.errorChan:
		return nil, err
	case <-s.doneChan:
		return nil, nil
	}
}

func (s *StreamingFlushedDataSource) HasMore() bool {
	if !s.started {
		return true // Haven't started yet, so potentially has data
	}
	return !s.finished || len(s.resultChan) > 0
}

func (s *StreamingFlushedDataSource) Close() error {
	// Use atomic flag to ensure channels are only closed once
	if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
		close(s.doneChan)
		close(s.resultChan)
		close(s.errorChan)
	}
	return nil
}

// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
	if left < right {
		mid := left + (right-left)/2

		// Recursively sort both halves
		hms.mergeSort(results, left, mid)
		hms.mergeSort(results, mid+1, right)

		// Merge the sorted halves
		hms.merge(results, left, mid, right)
	}
}

// merge combines two sorted subarrays into a single sorted array
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
	// Create temporary arrays for the two subarrays
	leftArray := make([]HybridScanResult, mid-left+1)
	rightArray := make([]HybridScanResult, right-mid)

	// Copy data to temporary arrays
	copy(leftArray, results[left:mid+1])
	copy(rightArray, results[mid+1:right+1])

	// Merge the temporary arrays back into results[left..right]
	i, j, k := 0, 0, left

	for i < len(leftArray) && j < len(rightArray) {
		if leftArray[i].Timestamp <= rightArray[j].Timestamp {
			results[k] = leftArray[i]
			i++
		} else {
			results[k] = rightArray[j]
			j++
		}
		k++
	}

	// Copy remaining elements of leftArray, if any
	for i < len(leftArray) {
		results[k] = leftArray[i]
		i++
		k++
	}

	// Copy remaining elements of rightArray, if any
	for j < len(rightArray) {
		results[k] = rightArray[j]
		j++
		k++
	}
}
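
// Illustrative usage (a sketch): to sort a full result slice in place by
// ascending Timestamp, call
//
//	hms.mergeSort(results, 0, len(results)-1)
//
// The merge step compares with <=, so results carrying equal timestamps keep
// their original relative order (the sort is stable).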