package engine

import (
	"container/heap"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"google.golang.org/protobuf/proto"
)

// HybridMessageScanner scans ALL data sources for a topic.
// Architecture:
//  1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
//  2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
//  3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
//  4. Seamlessly merges data from all sources chronologically
//  5. Provides a complete real-time view of all messages in a topic
type HybridMessageScanner struct {
	filerClient   filer_pb.FilerClient
	brokerClient  BrokerClientInterface // For querying unflushed data
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	parquetLevels *schema.ParquetLevels
	engine        *SQLEngine // Reference for system column formatting
}

// NewHybridMessageScanner creates a scanner that reads from all data sources.
// This provides complete real-time message coverage, including unflushed data.
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
	// Check if filerClient is available
	if filerClient == nil {
		return nil, fmt.Errorf("filerClient is required but not available")
	}

	// Create the topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Get the topic schema from the broker client (works with both real and mock clients)
	recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
	if err != nil {
		return nil, fmt.Errorf("failed to get topic schema: %v", err)
	}
	if recordType == nil {
		return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
	}

	// Create a copy of the recordType to avoid modifying the original
	recordTypeCopy := &schema_pb.RecordType{
		Fields: make([]*schema_pb.Field, len(recordType.Fields)),
	}
	copy(recordTypeCopy.Fields, recordType.Fields)

	// Add the system columns that MQ adds to all records
	recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	// Convert to Parquet levels for efficient reading
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
	}

	return &HybridMessageScanner{
		filerClient:   filerClient,
		brokerClient:  brokerClient,
		topic:         t,
		recordSchema:  recordType,
		parquetLevels: parquetLevels,
		engine:        engine,
	}, nil
}

// HybridScanOptions configures how the scanner reads live and archived data
type HybridScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit: -1 = no limit, 0 = LIMIT 0 (empty result), >0 = at most N rows
	Limit int

	// Row offset - 0 means no offset
	Offset int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}

// HybridScanResult represents a message from either live logs or Parquet files
type HybridScanResult struct {
	Values    map[string]*schema_pb.Value // Column name -> value
	Timestamp int64                       // Message timestamp (_ts_ns)
	Key       []byte                      // Message key (_key)
	Source    string                      // "live_log", "parquet_archive", or "in_memory_broker"
}

// HybridScanStats contains statistics about the data sources scanned
type HybridScanStats struct {
	BrokerBufferQueried  bool
	BrokerBufferMessages int
	BufferStartIndex     int64
	PartitionsScanned    int
	LiveLogFilesScanned  int // Number of live log files processed
}

// ParquetColumnStats holds statistics for a single column from parquet metadata
type ParquetColumnStats struct {
	ColumnName string
	MinValue   *schema_pb.Value
	MaxValue   *schema_pb.Value
	NullCount  int64
	RowCount   int64
}

// ParquetFileStats holds aggregated statistics for a parquet file
type ParquetFileStats struct {
	FileName    string
	RowCount    int64
	ColumnStats map[string]*ParquetColumnStats
}

// StreamingDataSource provides a streaming interface for reading scan results
type StreamingDataSource interface {
	Next() (*HybridScanResult, error) // Returns the next result, or nil when done
	HasMore() bool                    // Returns true if more data is available
	Close() error                     // Cleans up resources
}

// StreamingMergeItem represents an item in the priority queue for the streaming merge
type StreamingMergeItem struct {
	Result     *HybridScanResult
	SourceID   int
	DataSource StreamingDataSource
}

// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
type StreamingMergeHeap []*StreamingMergeItem

func (h StreamingMergeHeap) Len() int { return len(h) }

func (h StreamingMergeHeap) Less(i, j int) bool {
	// Sort by timestamp (ascending order)
	return h[i].Result.Timestamp < h[j].Result.Timestamp
}

func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *StreamingMergeHeap) Push(x interface{}) {
	*h = append(*h, x.(*StreamingMergeItem))
}

func (h *StreamingMergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[0 : n-1]
	return item
}
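
// Illustrative sketch (hypothetical values, not part of the production path):
// the heap always yields the item with the smallest timestamp first, which is
// what gives the streaming merge its chronological ordering.
//
//	h := &StreamingMergeHeap{}
//	heap.Init(h)
//	heap.Push(h, &StreamingMergeItem{Result: &HybridScanResult{Timestamp: 20}})
//	heap.Push(h, &StreamingMergeItem{Result: &HybridScanResult{Timestamp: 10}})
//	first := heap.Pop(h).(*StreamingMergeItem) // first.Result.Timestamp == 10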

// Scan reads messages from both live logs and archived Parquet files.
// It uses SeaweedFS MQ's GenMergedReadFunc for seamless integration.
// Assumptions:
//  1. Chronologically merges live and archived data
//  2. Applies filtering at the lowest level for efficiency
//  3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.ScanWithStats(ctx, options)
	return results, err
}

// ScanWithStats reads messages and returns scan statistics for execution plans
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	var results []HybridScanResult
	stats := &HybridScanStats{}

	// Get all partitions for this topic via MQ broker discovery
	partitions, err := hms.discoverTopicPartitions(ctx)
	if err != nil {
		return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
	}
	stats.PartitionsScanned = len(partitions)

	for _, partition := range partitions {
		partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
		if err != nil {
			return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}
		results = append(results, partitionResults...)

		// Aggregate broker buffer stats
		if partitionStats != nil {
			if partitionStats.BrokerBufferQueried {
				stats.BrokerBufferQueried = true
			}
			stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
			if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
				stats.BufferStartIndex = partitionStats.BufferStartIndex
			}
		}

		// Apply the global limit (without offset) across all partitions.
		// When OFFSET is used, collect more data to ensure we have enough after skipping.
		// Note: OFFSET is applied at the end to avoid double-application.
		if options.Limit > 0 {
			// Collect the exact amount needed: LIMIT + OFFSET (no excessive doubling)
			minRequired := options.Limit + options.Offset
			// Small buffer only when needed, to handle edge cases in distributed scanning
			if options.Offset > 0 && minRequired < 10 {
				minRequired = minRequired + 1 // Add 1 extra row of buffer, not doubling
			}
			if len(results) >= minRequired {
				break
			}
		}
	}

	// Apply the final OFFSET and LIMIT processing (done once at the end).
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
	if options.Offset > 0 || options.Limit >= 0 {
		// Handle the LIMIT 0 special case first
		if options.Limit == 0 {
			return []HybridScanResult{}, stats, nil
		}

		// Apply OFFSET first
		if options.Offset > 0 {
			if options.Offset >= len(results) {
				results = []HybridScanResult{}
			} else {
				results = results[options.Offset:]
			}
		}

		// Apply LIMIT after OFFSET (only if limit > 0)
		if options.Limit > 0 && len(results) > options.Limit {
			results = results[:options.Limit]
		}
	}

	return results, stats, nil
}

// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
	return results, err
}
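
// Illustrative sketch of the final OFFSET/LIMIT slicing in ScanWithStats,
// with hypothetical values: 10 collected rows, OFFSET 3, LIMIT 5 keeps rows 3..7.
//
//	rows := make([]HybridScanResult, 10)
//	offset, limit := 3, 5
//	rows = rows[offset:]        // drop the first 3 rows
//	if len(rows) > limit {
//		rows = rows[:limit] // keep at most 5 rows
//	}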

// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	var results []HybridScanResult
	stats := &HybridScanStats{}

	// Skip if no broker client is available
	if hms.brokerClient == nil {
		return results, stats, nil
	}

	// Mark that we attempted to query the broker buffer
	stats.BrokerBufferQueried = true

	// Step 1: Get unflushed data from the broker using the buffer_start-based method.
	// This method uses buffer_start metadata to avoid double-counting, with exact precision.
	unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
	if err != nil {
		// Log the error but don't fail the query - continue with disk data only
		if isDebugMode(ctx) {
			fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
		}
		// Reset the queried flag on error
		stats.BrokerBufferQueried = false
		return results, stats, nil
	}

	// Capture stats for EXPLAIN
	stats.BrokerBufferMessages = len(unflushedEntries)

	// Debug logging for EXPLAIN mode
	if isDebugMode(ctx) {
		fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
		if len(unflushedEntries) > 0 {
			fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
		}
	}

	// Step 2: Process the unflushed entries (already deduplicated by the broker)
	for _, logEntry := range unflushedEntries {
		// Skip control entries without actual data
		if hms.isControlEntry(logEntry) {
			continue // Skip this entry
		}

		// Skip messages outside the time range
		if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
			continue
		}
		if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
			continue
		}

		// Convert the LogEntry to RecordValue format (same as disk data)
		recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
		if err != nil {
			if isDebugMode(ctx) {
				fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
			}
			continue // Skip malformed messages
		}

		// Apply the predicate filter if provided
		if options.Predicate != nil && !options.Predicate(recordValue) {
			continue
		}

		// Extract the system columns for the result
		timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
		key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

		// Apply column projection
		values := make(map[string]*schema_pb.Value)
		if len(options.Columns) == 0 {
			// Select all columns (excluding system columns from the user view)
			for name, value := range recordValue.Fields {
				if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
					values[name] = value
				}
			}
		} else {
			// Select the specified columns only
			for _, columnName := range options.Columns {
				if value, exists := recordValue.Fields[columnName]; exists {
					values[columnName] = value
				}
			}
		}

		// Create the result with proper source tagging
		result := HybridScanResult{
			Values:    values,
			Timestamp: timestamp,
			Key:       key,
			Source:    "live_log", // Data from the broker's unflushed messages
		}

		results = append(results, result)

		// Apply the limit (accounting for offset) - collect the exact amount needed
		if options.Limit > 0 {
			// Collect the exact amount needed: LIMIT + OFFSET (no excessive doubling)
			minRequired := options.Limit + options.Offset
			// Small buffer only when needed, to handle edge cases in message streaming
			if options.Offset > 0 && minRequired < 10 {
				minRequired = minRequired + 1 // Add 1 extra row of buffer, not doubling
			}
			if len(results) >= minRequired {
				break
			}
		}
	}

	if isDebugMode(ctx) {
		fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
	}

	return results, stats, nil
}

// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
	// Parse the message data as a RecordValue
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
		return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
	}

	// Add system columns
	if recordValue.Fields == nil {
		recordValue.Fields = make(map[string]*schema_pb.Value)
	}

	// Add the timestamp
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
	}

	return recordValue, string(msg.Key), nil
}

// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem.
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
	if hms.filerClient == nil {
		return nil, fmt.Errorf("filerClient not available for partition discovery")
	}

	var allPartitions []topic.Partition
	var err error

	// Scan the topic directory for actual partition versions (timestamped directories):
	// list all version directories in the topic directory.
	err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
		if !versionEntry.IsDirectory {
			return nil // Skip non-directories
		}

		// Parse the version timestamp from the directory name (e.g., "v2025-09-01-07-16-34")
		versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
		if parseErr != nil {
			// Skip directories that don't match the version format
			return nil
		}

		// Scan the partition directories within this version
		versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
		return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
			if !partitionEntry.IsDirectory {
				return nil // Skip non-directories
			}

			// Parse the partition boundary from the directory name (e.g., "0000-0630")
			rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
			if rangeStart == rangeStop {
				return nil // Skip invalid partition names
			}

			// Create the partition object
			partition := topic.Partition{
				RangeStart: rangeStart,
				RangeStop:  rangeStop,
				RingSize:   topic.PartitionCount,
				UnixTimeNs: versionTime.UnixNano(),
			}
			allPartitions = append(allPartitions, partition)
			return nil
		})
	})
	if err != nil {
		return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
	}

	// If no partitions were found, return an empty slice (valid for newly created or empty topics)
	if len(allPartitions) == 0 {
		fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
		return []topic.Partition{}, nil
	}

	fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
	return allPartitions, nil
}
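
// For reference, the on-disk layout that discoverTopicPartitions walks looks
// like this (hypothetical topic generation and ranges):
//
//	<topic dir>/v2025-09-01-07-16-34/0000-0630/   <- version dir / partition ring range
//	<topic dir>/v2025-09-01-07-16-34/0630-1260/
//
// Each "vYYYY-MM-DD-hh-mm-ss" directory is one topic generation; each child
// directory name encodes a partition's ring range, parsed by
// topic.ParsePartitionBoundary above.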

// scanPartitionHybrid scans a specific partition using the hybrid approach.
// This is where the magic happens - seamlessly reading ALL data sources:
//  1. Unflushed in-memory data from brokers (REAL-TIME)
//  2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
	return results, err
}

// scanPartitionHybridWithStats scans a specific partition using a streaming merge for memory efficiency.
// PERFORMANCE IMPROVEMENT: Uses a heap-based streaming merge instead of collecting all data and sorting:
//   - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
//   - Scalable: Can handle large topics without LIMIT clauses efficiently
//   - Streaming: Processes data as it arrives rather than buffering everything
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	stats := &HybridScanStats{}

	// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
	unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
	if err != nil {
		// Don't fail the query if broker scanning fails, but give the user a clear warning.
		// This ensures users are aware that results may not include the most recent data.
		if isDebugMode(ctx) {
			fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
		} else {
			fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
			fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
		}
	} else if unflushedStats != nil {
		stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
		stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
		stats.BufferStartIndex = unflushedStats.BufferStartIndex
	}

	// Count live log files for statistics
	liveLogCount, err := hms.countLiveLogFiles(partition)
	if err != nil {
		// Don't fail the query, just log a warning
		fmt.Printf("Warning: Failed to count live log files: %v\n", err)
		liveLogCount = 0
	}
	stats.LiveLogFilesScanned = liveLogCount

	// STEP 2: Create streaming data sources for a memory-efficient merge
	var dataSources []StreamingDataSource

	// Add the unflushed data source (if we have unflushed results)
	if len(unflushedResults) > 0 {
		// Sort the unflushed results by timestamp before creating the stream
		if len(unflushedResults) > 1 {
			hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
		}
		dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
	}

	// Add the streaming flushed data source (live logs + Parquet files)
	flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
	dataSources = append(dataSources, flushedDataSource)

	// STEP 3: Use a streaming merge for memory-efficient chronological ordering
	var results []HybridScanResult
	if len(dataSources) > 0 {
		// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT).
		// For LIMIT N OFFSET M, we need to collect at least N+M rows.
		scanLimit := options.Limit
		if options.Limit > 0 && options.Offset > 0 {
			scanLimit = options.Limit + options.Offset
		}

		mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
		if err != nil {
			return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
		}
		results = mergedResults
	}

	return results, stats, nil
}
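
// Illustrative sketch (hypothetical data): any additional pre-sorted source
// can participate in the merge by implementing StreamingDataSource, exactly
// as the unflushed results do above.
//
//	sorted := []HybridScanResult{{Timestamp: 1}, {Timestamp: 9}}
//	sources := []StreamingDataSource{NewSliceDataSource(sorted)}
//	merged, err := hms.streamingMerge(sources, 0) // limit <= 0 = unlimited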

// countLiveLogFiles counts the number of live log files in a partition, for statistics
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
	partitionDir := topic.PartitionDir(hms.topic, partition)

	var fileCount int
	err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// List all files in the partition directory
		request := &filer_pb.ListEntriesRequest{
			Directory:          partitionDir,
			Prefix:             "",
			StartFromFileName:  "",
			InclusiveStartFrom: true,
			Limit:              10000, // reasonable limit for counting
		}

		stream, err := client.ListEntries(context.Background(), request)
		if err != nil {
			return err
		}

		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}

			// Count files that are not .parquet files (live log files).
			// Live log files typically have timestamps or are named like log files.
			fileName := resp.Entry.Name
			if !strings.HasSuffix(fileName, ".parquet") &&
				!strings.HasSuffix(fileName, ".offset") &&
				len(resp.Entry.Chunks) > 0 { // Has actual content
				fileCount++
			}
		}
		return nil
	})
	if err != nil {
		return 0, err
	}

	return fileCount, nil
}

// isControlEntry checks if a log entry is a control entry without actual data.
// Based on MQ system analysis, control entries are:
//  1. DataMessages with a populated Ctrl field (publisher close signals)
//  2. Entries with empty keys (as filtered by the subscriber)
//  3. Entries with no data
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
	// Skip entries with no data
	if len(logEntry.Data) == 0 {
		return true
	}

	// Skip entries with empty keys (same logic as the subscriber)
	if len(logEntry.Key) == 0 {
		return true
	}

	// Check if this is a DataMessage with the control field populated
	dataMessage := &mq_pb.DataMessage{}
	if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
		// If it has a control field, it's a control message
		if dataMessage.Ctrl != nil {
			return true
		}
	}

	return false
}

// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue.
// This handles both:
//  1. Live log entries (raw message format)
//  2. Parquet entries (already in schema_pb.RecordValue format)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	// Try to unmarshal as a RecordValue first (Parquet format)
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
		// This is an archived message from Parquet files.
		// FIX: Add system columns from the LogEntry to the RecordValue.
		if recordValue.Fields == nil {
			recordValue.Fields = make(map[string]*schema_pb.Value)
		}

		// Add system columns from the LogEntry
		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
		}
		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
		}

		return recordValue, "parquet_archive", nil
	}

	// If not a RecordValue, this is raw live message data - parse it with the schema
	return hms.parseRawMessageWithSchema(logEntry)
}

// parseRawMessageWithSchema parses raw live message data using the topic's schema.
// This provides proper type conversion and field mapping instead of treating everything as strings.
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	recordValue := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Add system columns (always present)
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
	}
	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
	}

	// Parse the message data based on the schema
	if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
		// Fallback: no schema available, treat as a single "data" field
		recordValue.Fields["data"] = &schema_pb.Value{
			Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
		}
		return recordValue, "live_log", nil
	}

	// Attempt schema-aware parsing.
	// Strategy 1: Try JSON parsing first (most common for live messages)
	if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
		// Successfully parsed as JSON; merge with the system columns
		for fieldName, fieldValue := range parsedRecord.Fields {
			recordValue.Fields[fieldName] = fieldValue
		}
		return recordValue, "live_log", nil
	}

	// Strategy 2: Try protobuf parsing (binary messages)
	if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
		// Successfully parsed as protobuf; merge with the system columns
		for fieldName, fieldValue := range parsedRecord.Fields {
			recordValue.Fields[fieldName] = fieldValue
		}
		return recordValue, "live_log", nil
	}

	// Strategy 3: Fall back to a single field holding the raw data.
	// If the schema has a single field, map the raw data to it with type conversion.
	if len(hms.recordSchema.Fields) == 1 {
		field := hms.recordSchema.Fields[0]
		convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
		if err == nil {
			recordValue.Fields[field.Name] = convertedValue
			return recordValue, "live_log", nil
		}
	}

	// Final fallback: treat as a string data field
	recordValue.Fields["data"] = &schema_pb.Value{
		Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
	}
	return recordValue, "live_log", nil
}

// parseJSONMessage attempts to parse raw data as JSON and map it to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
	// Try to parse as JSON
	var jsonData map[string]interface{}
	if err := json.Unmarshal(data, &jsonData); err != nil {
		return nil, fmt.Errorf("not valid JSON: %v", err)
	}

	recordValue := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Map JSON fields to schema fields
	for _, schemaField := range hms.recordSchema.Fields {
		fieldName := schemaField.Name
		if jsonValue, exists := jsonData[fieldName]; exists {
			schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
			if err != nil {
				// Skip the conversion error but continue with other fields
				continue
			}
			recordValue.Fields[fieldName] = schemaValue
		}
	}

	return recordValue, nil
}

// parseProtobufMessage attempts to parse raw data as a protobuf RecordValue
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
	// This might be a raw protobuf message that didn't parse correctly the first time.
	// Try alternative protobuf unmarshaling approaches.
	recordValue := &schema_pb.RecordValue{}

	// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
	if err := proto.Unmarshal(data, recordValue); err == nil {
		return recordValue, nil
	}

	// Strategy 2: Check if it's a different protobuf message type.
	// For now, return an error, as this requires more specific knowledge of MQ message formats.
	return nil, fmt.Errorf("could not parse as protobuf RecordValue")
}

// convertRawDataToSchemaValue converts raw bytes to a specific schema type
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
	dataStr := string(data)

	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		scalarType := fieldType.GetScalarType()
		switch scalarType {
		case schema_pb.ScalarType_STRING:
			return &schema_pb.Value{
				Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
			}, nil
		case schema_pb.ScalarType_INT32:
			if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
				}, nil
			}
		case schema_pb.ScalarType_INT64:
			if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int64Value{Int64Value: val},
				}, nil
			}
		case schema_pb.ScalarType_FLOAT:
			if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
				}, nil
			}
		case schema_pb.ScalarType_DOUBLE:
			if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
				}, nil
			}
		case schema_pb.ScalarType_BOOL:
			lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
			if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: true},
				}, nil
			} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: false},
				}, nil
			}
		case schema_pb.ScalarType_BYTES:
			return &schema_pb.Value{
				Kind: &schema_pb.Value_BytesValue{BytesValue: data},
			}, nil
		}
	}

	return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
}
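
// Illustrative sketch with hypothetical inputs: the raw bytes "42" against an
// INT64 schema field become a typed Int64Value.
//
//	int64Type := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}
//	v, err := hms.convertRawDataToSchemaValue([]byte("42"), int64Type)
//	// err == nil, v.GetInt64Value() == 42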

// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on the schema type
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		scalarType := fieldType.GetScalarType()
		switch scalarType {
		case schema_pb.ScalarType_STRING:
			if str, ok := jsonValue.(string); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_StringValue{StringValue: str},
				}, nil
			}
			// Convert other types to string
			return &schema_pb.Value{
				Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
			}, nil
		case schema_pb.ScalarType_INT32:
			if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
				}, nil
			}
		case schema_pb.ScalarType_INT64:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
				}, nil
			}
		case schema_pb.ScalarType_FLOAT:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
				}, nil
			}
		case schema_pb.ScalarType_DOUBLE:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
				}, nil
			}
		case schema_pb.ScalarType_BOOL:
			if boolVal, ok := jsonValue.(bool); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
				}, nil
			}
		case schema_pb.ScalarType_BYTES:
			if str, ok := jsonValue.(string); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
				}, nil
			}
		}
	}

	return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
}

// ConvertToSQLResult converts HybridScanResults to SQL query results
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
	if len(results) == 0 {
		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Determine the columns if not specified
	if len(columns) == 0 {
		columnSet := make(map[string]bool)
		for _, result := range results {
			for columnName := range result.Values {
				columnSet[columnName] = true
			}
		}
		columns = make([]string, 0, len(columnSet))
		for columnName := range columnSet {
			columns = append(columns, columnName)
		}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			switch columnName {
			case SW_COLUMN_NAME_SOURCE:
				row[j] = sqltypes.NewVarChar(result.Source)
			case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
				// Format as a proper timestamp type instead of raw nanoseconds
				row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
			case SW_COLUMN_NAME_KEY:
				row[j] = sqltypes.NewVarBinary(string(result.Key))
			default:
				if value, exists := result.Values[columnName]; exists {
					row[j] = convertSchemaValueToSQL(value)
				} else {
					row[j] = sqltypes.NULL
				}
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}
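
// Illustrative usage sketch (hypothetical scan output and column names):
//
//	res := hms.ConvertToSQLResult(results, []string{"user_id", "event"})
//	// res.Rows[i][j] lines up with res.Columns[j]; fields missing from a
//	// result are rendered as sqltypes.NULL.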

// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries.
// It combines auto-discovered columns (from *) with explicitly requested columns.
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
	if len(results) == 0 {
		// For empty results, combine auto-discovered columns with the explicit ones
		columnSet := make(map[string]bool)

		// Add explicit columns first
		for _, col := range explicitColumns {
			columnSet[col] = true
		}

		// Build the final column list
		columns := make([]string, 0, len(columnSet))
		for col := range columnSet {
			columns = append(columns, col)
		}

		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Auto-discover columns from the data (like SELECT *)
	autoColumns := make(map[string]bool)
	for _, result := range results {
		for columnName := range result.Values {
			autoColumns[columnName] = true
		}
	}

	// Combine auto-discovered and explicit columns
	columnSet := make(map[string]bool)

	// Add auto-discovered columns first (regular data columns)
	for col := range autoColumns {
		columnSet[col] = true
	}

	// Add explicit columns (may include system columns like _source)
	for _, col := range explicitColumns {
		columnSet[col] = true
	}

	// Build the final column list
	columns := make([]string, 0, len(columnSet))
	for col := range columnSet {
		columns = append(columns, col)
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			switch columnName {
			case SW_COLUMN_NAME_TIMESTAMP:
				row[j] = sqltypes.NewInt64(result.Timestamp)
			case SW_COLUMN_NAME_KEY:
				row[j] = sqltypes.NewVarBinary(string(result.Key))
			case SW_COLUMN_NAME_SOURCE:
				row[j] = sqltypes.NewVarChar(result.Source)
			default:
				// Regular data column
				if value, exists := result.Values[columnName]; exists {
					row[j] = convertSchemaValueToSQL(value)
				} else {
					row[j] = sqltypes.NULL
				}
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}

// ReadParquetStatistics efficiently reads column statistics from parquet files
// without scanning the full file content - it uses parquet's built-in metadata.
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
	var fileStats []*ParquetFileStats

	// Use the same chunk cache as the logstore package
	chunkCache := chunk_cache.NewChunkCacheInMemory(256)
	lookupFileIdFn := filer.LookupFn(h.filerClient)

	err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Only process parquet files
		if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}

		// Extract statistics from this parquet file
		stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
		if err != nil {
			// Log the error but continue processing other files
			fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
			return nil
		}

		if stats != nil {
			fileStats = append(fileStats, stats)
		}
		return nil
	})

	return fileStats, err
}
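
// Illustrative sketch: the per-file stats are intended for pruning decisions.
// Assuming a hypothetical int64 lower bound minTs on the timestamp column, a
// planner could skip files whose max predates the queried range:
//
//	for _, fs := range fileStats {
//		if s, ok := fs.ColumnStats[SW_COLUMN_NAME_TIMESTAMP]; ok &&
//			s.MaxValue.GetInt64Value() < minTs {
//			continue // the whole file is older than the range; skip it
//		}
//	}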

// extractParquetFileStats extracts column statistics from a single parquet file
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
	// Create a reader for the parquet file
	fileSize := filer.FileSize(entry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
	readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
	readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))

	// Create the parquet reader - this only reads metadata, not data
	parquetReader := parquet.NewReader(readerAt)
	defer parquetReader.Close()

	fileView := parquetReader.File()
	fileStats := &ParquetFileStats{
		FileName:    entry.Name,
		RowCount:    fileView.NumRows(),
		ColumnStats: make(map[string]*ParquetColumnStats),
	}

	// Get schema information
	schema := fileView.Schema()

	// Process each row group
	rowGroups := fileView.RowGroups()
	for _, rowGroup := range rowGroups {
		columnChunks := rowGroup.ColumnChunks()

		// Process each column chunk
		for i, chunk := range columnChunks {
			// Get the column name from the schema
			columnName := h.getColumnNameFromSchema(schema, i)
			if columnName == "" {
				continue
			}

			// Try to get column statistics
			columnIndex, err := chunk.ColumnIndex()
			if err != nil {
				// No column index available - skip this column
				continue
			}

			// Extract the min from the first page and the max from the last page
			// (for simplicity). A more sophisticated implementation could
			// aggregate across all pages.
			numPages := columnIndex.NumPages()
			if numPages == 0 {
				continue
			}

			minParquetValue := columnIndex.MinValue(0)
			maxParquetValue := columnIndex.MaxValue(numPages - 1)
			nullCount := int64(0)

			// Aggregate null counts across all pages
			for pageIdx := 0; pageIdx < numPages; pageIdx++ {
				nullCount += columnIndex.NullCount(pageIdx)
			}

			// Convert the parquet values to schema_pb.Value
			minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
			if err != nil {
				continue
			}
			maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
			if err != nil {
				continue
			}

			// Store the column statistics (aggregate across row groups if the column already exists)
			if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
				// Update existing statistics
				if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
					existingStats.MinValue = minValue
				}
				if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
					existingStats.MaxValue = maxValue
				}
				existingStats.NullCount += nullCount
			} else {
				// Create new column statistics
				fileStats.ColumnStats[columnName] = &ParquetColumnStats{
					ColumnName: columnName,
					MinValue:   minValue,
					MaxValue:   maxValue,
					NullCount:  nullCount,
					RowCount:   rowGroup.NumRows(),
				}
			}
		}
	}

	return fileStats, nil
}

// getColumnNameFromSchema extracts a column name from the parquet schema by index
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
	// Get the leaf columns in order
	var columnNames []string
	h.collectColumnNames(schema.Fields(), &columnNames)

	if columnIndex >= 0 && columnIndex < len(columnNames) {
		return columnNames[columnIndex]
	}
	return ""
}

// collectColumnNames recursively collects leaf column names from the schema
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
	for _, field := range fields {
		if len(field.Fields()) == 0 {
			// This is a leaf field (no sub-fields)
			*names = append(*names, field.Name())
		} else {
			// This is a group - recurse
			h.collectColumnNames(field.Fields(), names)
		}
	}
}

// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
	switch pv.Kind() {
	case parquet.Boolean:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
	case parquet.Int32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
	case parquet.Int64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
	case parquet.Float:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
	case parquet.Double:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
	case parquet.ByteArray:
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
	default:
		return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
	}
}

// compareSchemaValues compares two schema_pb.Value objects
func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
	if v1 == nil && v2 == nil {
		return 0
	}
	if v1 == nil {
		return -1
	}
	if v2 == nil {
		return 1
	}

	// Extract the raw values and compare
	raw1 := h.extractRawValueFromSchema(v1)
	raw2 := h.extractRawValueFromSchema(v2)

	return h.compareRawValues(raw1, raw2)
}

// extractRawValueFromSchema extracts the raw value from schema_pb.Value
func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
	switch v := value.Kind.(type) {
	case *schema_pb.Value_BoolValue:
		return v.BoolValue
	case *schema_pb.Value_Int32Value:
		return v.Int32Value
	case *schema_pb.Value_Int64Value:
		return v.Int64Value
	case *schema_pb.Value_FloatValue:
		return v.FloatValue
	case *schema_pb.Value_DoubleValue:
		return v.DoubleValue
	case *schema_pb.Value_BytesValue:
		return string(v.BytesValue) // Convert to string for comparison
	case *schema_pb.Value_StringValue:
		return v.StringValue
	}
	return nil
}

// compareRawValues compares two raw values
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
	// Handle nil cases
	if v1 == nil && v2 == nil {
		return 0
	}
	if v1 == nil {
		return -1
	}
	if v2 == nil {
		return 1
	}

	// Compare based on type
	switch val1 := v1.(type) {
	case bool:
		if val2, ok := v2.(bool); ok {
			if val1 == val2 {
				return 0
			}
			if val1 {
				return 1
			}
			return -1
		}
	case int32:
		if val2, ok := v2.(int32); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case int64:
		if val2, ok := v2.(int64); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case float32:
		if val2, ok := v2.(float32); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case float64:
		if val2, ok := v2.(float64); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case string:
		if val2, ok := v2.(string); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	}

	// Default: fall back to string comparison
	str1 := fmt.Sprintf("%v", v1)
	str2 := fmt.Sprintf("%v", v2)
	if str1 < str2 {
		return -1
	} else if str1 > str2 {
		return 1
	}
	return 0
}
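
// Caveat worth noting (hypothetical values): when the two sides have different
// Go types, compareRawValues falls back to string comparison, so numeric order
// is not preserved across types:
//
//	h.compareRawValues(int64(10), int64(9)) // 1, numeric comparison
//	h.compareRawValues(int64(10), "9")      // -1, because "10" < "9" lexically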

// streamingMerge merges multiple sorted data sources using a heap-based approach.
// This provides memory-efficient merging without loading all data into memory.
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
	if len(dataSources) == 0 {
		return nil, nil
	}

	var results []HybridScanResult
	mergeHeap := &StreamingMergeHeap{}
	heap.Init(mergeHeap)

	// Initialize the heap with the first item from each data source
	for i, source := range dataSources {
		if source.HasMore() {
			result, err := source.Next()
			if err != nil {
				// Close all sources and return the error
				for _, s := range dataSources {
					s.Close()
				}
				return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
			}
			if result != nil {
				heap.Push(mergeHeap, &StreamingMergeItem{
					Result:     result,
					SourceID:   i,
					DataSource: source,
				})
			}
		}
	}

	// Process results in chronological order
	for mergeHeap.Len() > 0 {
		// Get the next chronologically ordered result
		item := heap.Pop(mergeHeap).(*StreamingMergeItem)
		results = append(results, *item.Result)

		// Check the limit
		if limit > 0 && len(results) >= limit {
			break
		}

		// Try to get the next item from the same data source
		if item.DataSource.HasMore() {
			nextResult, err := item.DataSource.Next()
			if err != nil {
				// Log the error but continue with other sources
				fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
			} else if nextResult != nil {
				heap.Push(mergeHeap, &StreamingMergeItem{
					Result:     nextResult,
					SourceID:   item.SourceID,
					DataSource: item.DataSource,
				})
			}
		}
	}

	// Close all data sources
	for _, source := range dataSources {
		source.Close()
	}

	return results, nil
}

// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource.
// This is used for unflushed data that is already loaded into memory.
type SliceDataSource struct {
	results []HybridScanResult
	index   int
}

func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
	return &SliceDataSource{
		results: results,
		index:   0,
	}
}

func (s *SliceDataSource) Next() (*HybridScanResult, error) {
	if s.index >= len(s.results) {
		return nil, nil
	}
	result := &s.results[s.index]
	s.index++
	return result, nil
}

func (s *SliceDataSource) HasMore() bool {
	return s.index < len(s.results)
}

func (s *SliceDataSource) Close() error {
	return nil // Nothing to clean up for a slice-based source
}

// StreamingFlushedDataSource provides streaming access to flushed data
type StreamingFlushedDataSource struct {
	hms          *HybridMessageScanner
	partition    topic.Partition
	options      HybridScanOptions
	mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
	resultChan   chan *HybridScanResult
	errorChan    chan error
	doneChan     chan struct{}
	started      bool
	finished     bool
	closed       int32 // atomic flag to prevent double close
	mu           sync.RWMutex
}

func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
	mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)

	return &StreamingFlushedDataSource{
		hms:          hms,
		partition:    partition,
		options:      options,
		mergedReadFn: mergedReadFn,
		resultChan:   make(chan *HybridScanResult, 100), // Buffered for better throughput
		errorChan:    make(chan error, 1),
		doneChan:     make(chan struct{}),
		started:      false,
		finished:     false,
	}
}
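
// Illustrative consumption sketch (hypothetical caller): drain the source via
// Next until it returns (nil, nil); Close is idempotent and safe to defer.
//
//	src := NewStreamingFlushedDataSource(hms, partition, options)
//	defer src.Close()
//	for src.HasMore() {
//		r, err := src.Next()
//		if err != nil || r == nil {
//			break
//		}
//		_ = r // process the result
//	}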

func (s *StreamingFlushedDataSource) startStreaming() {
	if s.started {
		return
	}
	s.started = true

	go func() {
		defer func() {
			// Use the atomic flag to ensure the channels are only closed once
			if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
				close(s.resultChan)
				close(s.errorChan)
				close(s.doneChan)
			}
		}()

		// Set up the time range for scanning
		startTime := time.Unix(0, s.options.StartTimeNs)
		if s.options.StartTimeNs == 0 {
			startTime = time.Unix(0, 0)
		}

		stopTsNs := s.options.StopTimeNs
		// For SQL queries, stopTsNs = 0 means "no stop time restriction".
		// This is different from message queue consumers, which want to stop at "now".
		// We detect SQL context by checking whether we have a predicate function.
		if stopTsNs == 0 && s.options.Predicate == nil {
			// Only set to the current time for non-SQL queries (message queue consumers)
			stopTsNs = time.Now().UnixNano()
		}
		// If stopTsNs is still 0, this is a SQL query that wants unrestricted scanning.

		// Message processing function
		eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
			// Skip control entries without actual data
			if s.hms.isControlEntry(logEntry) {
				return false, nil // Skip this entry
			}

			// Convert the log entry to schema_pb.RecordValue for consistent processing
			recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
			if convertErr != nil {
				return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
			}

			// Apply predicate filtering (WHERE clause)
			if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
				return false, nil // Skip this message
			}

			// Extract system columns
			timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

			// Apply column projection
			values := make(map[string]*schema_pb.Value)
			if len(s.options.Columns) == 0 {
				// Select all columns (excluding system columns from the user view)
				for name, value := range recordValue.Fields {
					if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
						values[name] = value
					}
				}
			} else {
				// Select the specified columns only
				for _, columnName := range s.options.Columns {
					if value, exists := recordValue.Fields[columnName]; exists {
						values[columnName] = value
					}
				}
			}

			result := &HybridScanResult{
				Values:    values,
				Timestamp: timestamp,
				Key:       key,
				Source:    source,
			}

			// Check whether we are already closed before trying to send
			if atomic.LoadInt32(&s.closed) != 0 {
				return true, nil // Stop processing if closed
			}

			// Send the result to the channel, handling closed channels carefully
			select {
			case s.resultChan <- result:
				return false, nil
			case <-s.doneChan:
				return true, nil // Stop processing if closed
			default:
				// Check again if closed (in case it was closed between the atomic check and the select)
				if atomic.LoadInt32(&s.closed) != 0 {
					return true, nil
				}
				// If not closed, try sending again with a blocking select
				select {
				case s.resultChan <- result:
					return false, nil
				case <-s.doneChan:
					return true, nil
				}
			}
		}

		// Start scanning from the specified position
		startPosition := log_buffer.MessagePosition{Time: startTime}
		_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
		if err != nil {
			// Only try to send the error if not already closed
			if atomic.LoadInt32(&s.closed) == 0 {
				select {
				case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
				case <-s.doneChan:
				default:
					// The channel might be full or closed; ignore
				}
			}
		}

		s.finished = true
	}()
}

func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
	if !s.started {
		s.startStreaming()
	}

	select {
	case result, ok := <-s.resultChan:
		if !ok {
			return nil, nil // No more results
		}
		return result, nil
	case err := <-s.errorChan:
		return nil, err
	case <-s.doneChan:
		return nil, nil
	}
}

func (s *StreamingFlushedDataSource) HasMore() bool {
	if !s.started {
		return true // Hasn't started yet, so it potentially has data
	}
	return !s.finished || len(s.resultChan) > 0
}

func (s *StreamingFlushedDataSource) Close() error {
	// Use the atomic flag to ensure the channels are only closed once
	if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
		close(s.doneChan)
		close(s.resultChan)
		close(s.errorChan)
	}
	return nil
}

// mergeSort sorts a HybridScanResult slice by timestamp using the merge sort algorithm
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
	if left < right {
		mid := left + (right-left)/2

		// Recursively sort both halves
		hms.mergeSort(results, left, mid)
		hms.mergeSort(results, mid+1, right)

		// Merge the sorted halves
		hms.merge(results, left, mid, right)
	}
}
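
// Worked example (hypothetical timestamps): mergeSort on [9, 3, 7] splits into
// [9, 3] and [7], sorts the left half to [3, 9], then merge produces
// [3, 7, 9]. The "<=" comparison in merge below keeps the sort stable, so
// results with equal timestamps retain their original relative order.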

// merge combines two sorted subarrays into a single sorted array
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
	// Create temporary slices for the two subarrays
	leftArray := make([]HybridScanResult, mid-left+1)
	rightArray := make([]HybridScanResult, right-mid)

	// Copy data to the temporary slices
	copy(leftArray, results[left:mid+1])
	copy(rightArray, results[mid+1:right+1])

	// Merge the temporary slices back into results[left..right]
	i, j, k := 0, 0, left
	for i < len(leftArray) && j < len(rightArray) {
		if leftArray[i].Timestamp <= rightArray[j].Timestamp {
			results[k] = leftArray[i]
			i++
		} else {
			results[k] = rightArray[j]
			j++
		}
		k++
	}

	// Copy any remaining elements of leftArray
	for i < len(leftArray) {
		results[k] = leftArray[i]
		i++
		k++
	}

	// Copy any remaining elements of rightArray
	for j < len(rightArray) {
		results[k] = rightArray[j]
		j++
		k++
	}
}