package engine

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"google.golang.org/protobuf/proto"
)

// HybridMessageScanner scans from ALL data sources:
// Architecture:
// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
// 4. Seamlessly merges data from all sources chronologically
// 5. Provides complete real-time view of all messages in a topic
type HybridMessageScanner struct {
	filerClient   filer_pb.FilerClient
	brokerClient  BrokerClientInterface // For querying unflushed data
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	parquetLevels *schema.ParquetLevels
}

// NewHybridMessageScanner creates a scanner that reads from all data sources.
// This provides complete real-time message coverage including unflushed data.
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string) (*HybridMessageScanner, error) {
	// Check if filerClient is available
	if filerClient == nil {
		return nil, fmt.Errorf("filerClient is required but not available")
	}

	// Create topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Read topic configuration to get schema
	var topicConf *mq_pb.ConfigureTopicResponse
	var err error
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		topicConf, err = t.ReadConfFile(client)
		return err
	}); err != nil {
		return nil, fmt.Errorf("failed to read topic config: %v", err)
	}

	// Build complete schema with system columns
	recordType := topicConf.GetRecordType()
	if recordType == nil {
		return nil, fmt.Errorf("topic %s.%s has no schema", namespace, topicName)
	}

	// Add system columns that MQ adds to all records
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	// Convert to Parquet levels for efficient reading
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
	}

	return &HybridMessageScanner{
		filerClient:   filerClient,
		brokerClient:  brokerClient,
		topic:         t,
		recordSchema:  recordType,
		parquetLevels: parquetLevels,
	}, nil
}

// HybridScanOptions configures how the scanner reads from unflushed, live, and archived data
type HybridScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit - 0 means no limit
	Limit int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}

// HybridScanResult represents a message from any of the data sources
type HybridScanResult struct {
	Values    map[string]*schema_pb.Value // Column name -> value
	Timestamp int64                       // Message timestamp (_ts_ns)
	Key       []byte                      // Message key (_key)
	Source    string                      // "in_memory_broker", "live_log", or "parquet_archive"
}

// ParquetColumnStats holds statistics for a single column from parquet metadata
type ParquetColumnStats struct {
	ColumnName string
	MinValue   *schema_pb.Value
	MaxValue   *schema_pb.Value
	NullCount  int64
	RowCount   int64
}

// ParquetFileStats holds aggregated statistics for a parquet file
type ParquetFileStats struct {
	FileName    string
	RowCount    int64
	ColumnStats map[string]*ParquetColumnStats
}

// Scan reads messages from unflushed broker memory, live logs, and archived Parquet files.
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration.
// Assumptions:
// 1. Chronologically merges live and archived data
// 2. Applies filtering at the lowest level for efficiency
// 3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
	var results []HybridScanResult

	// Get all partitions for this topic; discovery is filesystem-based (see discoverTopicPartitions)
	partitions, err := hms.discoverTopicPartitions(ctx)
	if err != nil {
		// Fallback to default partition if discovery fails
		partitions = []topic.Partition{{RangeStart: 0, RangeStop: 1000}}
	}

	for _, partition := range partitions {
		partitionResults, err := hms.scanPartitionHybrid(ctx, partition, options)
		if err != nil {
			return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}
		results = append(results, partitionResults...)
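
		// NOTE: results from different partitions are appended in discovery order and
		// only truncated to the limit below; Scan does not re-sort across partitions
		// (chronological ordering happens per partition inside scanPartitionHybrid).
		// A caller needing one global time order could sort the returned slice, e.g.
		// (illustrative sketch; assumes a "sort" import):
		//
		//	sort.Slice(results, func(i, j int) bool {
		//		return results[i].Timestamp < results[j].Timestamp
		//	})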
// Apply global limit across all partitions if options.Limit > 0 && len(results) >= options.Limit { results = results[:options.Limit] break } } return results, nil } // scanUnflushedData queries brokers for unflushed in-memory data func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { var results []HybridScanResult // Skip if no broker client available if hms.brokerClient == nil { return results, nil } // Get broker address for this partition // TODO: Implement proper broker discovery for partition // For now, assume broker client knows how to reach the right broker // Create a temporary slice to collect unflushed messages unflushedMessages := make([]*mq_pb.DataMessage, 0) // We need to call the broker to get unflushed data // For now, we'll implement this as a best-effort approach // In a full implementation, this would require a new gRPC method on the broker // TODO: Implement actual broker gRPC call to get unflushed data // Convert unflushed messages to HybridScanResult format for _, msg := range unflushedMessages { // Skip messages outside time range if options.StartTimeNs > 0 && msg.TsNs < options.StartTimeNs { continue } if options.StopTimeNs > 0 && msg.TsNs > options.StopTimeNs { continue } // Convert DataMessage to RecordValue format recordValue, _, err := hms.convertDataMessageToRecord(msg) if err != nil { continue // Skip malformed messages } // Apply predicate filter if provided if options.Predicate != nil && !options.Predicate(recordValue) { continue } // Convert to HybridScanResult result := HybridScanResult{ Values: recordValue.Fields, Timestamp: msg.TsNs, Key: msg.Key, Source: "in_memory_broker", } results = append(results, result) // Apply limit if options.Limit > 0 && len(results) >= options.Limit { break } } return results, nil } // convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) { // Parse the message data as RecordValue recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(msg.Value, recordValue); err != nil { return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err) } // Add system columns if recordValue.Fields == nil { recordValue.Fields = make(map[string]*schema_pb.Value) } // Add timestamp recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs}, } return recordValue, string(msg.Key), nil } // discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem // This finds real partition directories like v2025-09-01-07-16-34/0000-0630/ func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) { if hms.filerClient == nil { return nil, fmt.Errorf("filerClient not available for partition discovery") } var allPartitions []topic.Partition var err error // Scan the topic directory for actual partition versions (timestamped directories) // List all version directories in the topic directory err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error { if !versionEntry.IsDirectory { return nil // Skip non-directories } // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34") versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name) if parseErr != nil 
{ // Skip directories that don't match the version format return nil } // Scan partition directories within this version versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name) return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error { if !partitionEntry.IsDirectory { return nil // Skip non-directories } // Parse partition boundary from directory name (e.g., "0000-0630") rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name) if rangeStart == rangeStop { return nil // Skip invalid partition names } // Create partition object partition := topic.Partition{ RangeStart: rangeStart, RangeStop: rangeStop, RingSize: topic.PartitionCount, UnixTimeNs: versionTime.UnixNano(), } allPartitions = append(allPartitions, partition) return nil }) }) if err != nil { return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err) } // If no partitions found, use fallback if len(allPartitions) == 0 { fmt.Printf("No partitions found in filesystem for topic %s, using default partition\n", hms.topic.String()) return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil } fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String()) return allPartitions, nil } // scanPartitionHybrid scans a specific partition using the hybrid approach // This is where the magic happens - seamlessly reading ALL data sources: // 1. Unflushed in-memory data from brokers (REAL-TIME) // 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED) func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { var results []HybridScanResult // STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME) unflushedResults, err := hms.scanUnflushedData(ctx, partition, options) if err != nil { // Don't fail the query if broker scanning fails - just log and continue with disk data fmt.Printf("Warning: Failed to scan unflushed data from broker: %v\n", err) } else { results = append(results, unflushedResults...) 
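
		// NOTE: scanUnflushedData is currently a best-effort stub - the broker gRPC
		// call that would fetch unflushed messages is still a TODO - so
		// unflushedResults is expected to be empty until that RPC exists; the
		// flushed-data scan below is the effective data path today.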
} // STEP 2: Scan flushed data from disk (live logs + Parquet files) // Create the hybrid read function that combines live logs + Parquet files // This uses SeaweedFS MQ's own merged reading logic mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition) // Set up time range for scanning startTime := time.Unix(0, options.StartTimeNs) if options.StartTimeNs == 0 { startTime = time.Unix(0, 0) // Start from beginning if not specified } stopTsNs := options.StopTimeNs if stopTsNs == 0 { stopTsNs = time.Now().UnixNano() // Stop at current time if not specified } // Message processing function eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { // Convert log entry to schema_pb.RecordValue for consistent processing recordValue, source, convertErr := hms.convertLogEntryToRecordValue(logEntry) if convertErr != nil { return false, fmt.Errorf("failed to convert log entry: %v", convertErr) } // Apply predicate filtering (WHERE clause) if options.Predicate != nil && !options.Predicate(recordValue) { return false, nil // Skip this message } // Extract system columns timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value() key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() // Apply column projection values := make(map[string]*schema_pb.Value) if len(options.Columns) == 0 { // Select all columns (excluding system columns from user view) for name, value := range recordValue.Fields { if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY { values[name] = value } } } else { // Select specified columns only for _, columnName := range options.Columns { if value, exists := recordValue.Fields[columnName]; exists { values[columnName] = value } } } results = append(results, HybridScanResult{ Values: values, Timestamp: timestamp, Key: key, Source: source, }) // Apply row limit if options.Limit > 0 && len(results) >= options.Limit { return true, nil // Stop processing } return false, nil } // Only scan flushed data if we haven't reached the limit from unflushed data if options.Limit == 0 || len(results) < options.Limit { // Adjust limit for remaining capacity remainingLimit := options.Limit - len(results) if remainingLimit > 0 { // Create a copy of options with adjusted limit for flushed data flushedOptions := options flushedOptions.Limit = remainingLimit } // Start scanning from the specified position startPosition := log_buffer.MessagePosition{Time: startTime} _, _, err = mergedReadFn(startPosition, stopTsNs, eachLogEntryFn) if err != nil { return nil, fmt.Errorf("flushed data scan failed: %v", err) } } // STEP 3: Sort results chronologically (unflushed + flushed data) // This ensures proper time ordering across all data sources if len(results) > 1 { // Simple sort by timestamp - in a full implementation, consider more efficient merging for i := 0; i < len(results)-1; i++ { for j := i + 1; j < len(results); j++ { if results[i].Timestamp > results[j].Timestamp { results[i], results[j] = results[j], results[i] } } } } // Apply final limit after merging and sorting if options.Limit > 0 && len(results) > options.Limit { results = results[:options.Limit] } return results, nil } // convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue // This handles both: // 1. Live log entries (raw message format) // 2. 
Parquet entries (already in schema_pb.RecordValue format) func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { // Try to unmarshal as RecordValue first (Parquet format) recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil { // This is an archived message from Parquet files // FIX: Add system columns from LogEntry to RecordValue if recordValue.Fields == nil { recordValue.Fields = make(map[string]*schema_pb.Value) } // Add system columns from LogEntry recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, } recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, } return recordValue, "parquet_archive", nil } // If not a RecordValue, this is raw live message data // RESOLVED TODO: Implement proper schema-aware parsing based on topic schema return hms.parseRawMessageWithSchema(logEntry) } // parseRawMessageWithSchema parses raw live message data using the topic's schema // This provides proper type conversion and field mapping instead of treating everything as strings func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { recordValue := &schema_pb.RecordValue{ Fields: make(map[string]*schema_pb.Value), } // Add system columns (always present) recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, } recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, } // Parse message data based on schema if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 { // Fallback: No schema available, treat as single "data" field recordValue.Fields["data"] = &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)}, } return recordValue, "live_log", nil } // Attempt schema-aware parsing // Strategy 1: Try JSON parsing first (most common for live messages) if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil { // Successfully parsed as JSON, merge with system columns for fieldName, fieldValue := range parsedRecord.Fields { recordValue.Fields[fieldName] = fieldValue } return recordValue, "live_log", nil } // Strategy 2: Try protobuf parsing (binary messages) if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil { // Successfully parsed as protobuf, merge with system columns for fieldName, fieldValue := range parsedRecord.Fields { recordValue.Fields[fieldName] = fieldValue } return recordValue, "live_log", nil } // Strategy 3: Fallback to single field with raw data // If schema has a single field, map the raw data to it with type conversion if len(hms.recordSchema.Fields) == 1 { field := hms.recordSchema.Fields[0] convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type) if err == nil { recordValue.Fields[field.Name] = convertedValue return recordValue, "live_log", nil } } // Final fallback: treat as string data field recordValue.Fields["data"] = &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)}, } return recordValue, "live_log", nil } // parseJSONMessage attempts to parse raw data as JSON and map to schema fields func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) { // Try 
to parse as JSON var jsonData map[string]interface{} if err := json.Unmarshal(data, &jsonData); err != nil { return nil, fmt.Errorf("not valid JSON: %v", err) } recordValue := &schema_pb.RecordValue{ Fields: make(map[string]*schema_pb.Value), } // Map JSON fields to schema fields for _, schemaField := range hms.recordSchema.Fields { fieldName := schemaField.Name if jsonValue, exists := jsonData[fieldName]; exists { schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type) if err != nil { // Log conversion error but continue with other fields continue } recordValue.Fields[fieldName] = schemaValue } } return recordValue, nil } // parseProtobufMessage attempts to parse raw data as protobuf RecordValue func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) { // This might be a raw protobuf message that didn't parse correctly the first time // Try alternative protobuf unmarshaling approaches recordValue := &schema_pb.RecordValue{} // Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue) if err := proto.Unmarshal(data, recordValue); err == nil { return recordValue, nil } // Strategy 2: Check if it's a different protobuf message type // For now, return error as we need more specific knowledge of MQ message formats return nil, fmt.Errorf("could not parse as protobuf RecordValue") } // convertRawDataToSchemaValue converts raw bytes to a specific schema type func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) { dataStr := string(data) switch fieldType.Kind.(type) { case *schema_pb.Type_ScalarType: scalarType := fieldType.GetScalarType() switch scalarType { case schema_pb.ScalarType_STRING: return &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: dataStr}, }, nil case schema_pb.ScalarType_INT32: if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil { return &schema_pb.Value{ Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)}, }, nil } case schema_pb.ScalarType_INT64: if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil { return &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: val}, }, nil } case schema_pb.ScalarType_FLOAT: if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil { return &schema_pb.Value{ Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)}, }, nil } case schema_pb.ScalarType_DOUBLE: if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil { return &schema_pb.Value{ Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}, }, nil } case schema_pb.ScalarType_BOOL: lowerStr := strings.ToLower(strings.TrimSpace(dataStr)) if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" { return &schema_pb.Value{ Kind: &schema_pb.Value_BoolValue{BoolValue: true}, }, nil } else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" { return &schema_pb.Value{ Kind: &schema_pb.Value_BoolValue{BoolValue: false}, }, nil } case schema_pb.ScalarType_BYTES: return &schema_pb.Value{ Kind: &schema_pb.Value_BytesValue{BytesValue: data}, }, nil } } return nil, fmt.Errorf("unsupported type conversion for %v", fieldType) } // convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) { switch fieldType.Kind.(type) { case 
*schema_pb.Type_ScalarType: scalarType := fieldType.GetScalarType() switch scalarType { case schema_pb.ScalarType_STRING: if str, ok := jsonValue.(string); ok { return &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: str}, }, nil } // Convert other types to string return &schema_pb.Value{ Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)}, }, nil case schema_pb.ScalarType_INT32: if num, ok := jsonValue.(float64); ok { // JSON numbers are float64 return &schema_pb.Value{ Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)}, }, nil } case schema_pb.ScalarType_INT64: if num, ok := jsonValue.(float64); ok { return &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)}, }, nil } case schema_pb.ScalarType_FLOAT: if num, ok := jsonValue.(float64); ok { return &schema_pb.Value{ Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)}, }, nil } case schema_pb.ScalarType_DOUBLE: if num, ok := jsonValue.(float64); ok { return &schema_pb.Value{ Kind: &schema_pb.Value_DoubleValue{DoubleValue: num}, }, nil } case schema_pb.ScalarType_BOOL: if boolVal, ok := jsonValue.(bool); ok { return &schema_pb.Value{ Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal}, }, nil } case schema_pb.ScalarType_BYTES: if str, ok := jsonValue.(string); ok { return &schema_pb.Value{ Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)}, }, nil } } } return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType) } // ConvertToSQLResult converts HybridScanResults to SQL query results func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult { if len(results) == 0 { return &QueryResult{ Columns: columns, Rows: [][]sqltypes.Value{}, } } // Determine columns if not specified if len(columns) == 0 { columnSet := make(map[string]bool) for _, result := range results { for columnName := range result.Values { columnSet[columnName] = true } } columns = make([]string, 0, len(columnSet)) for columnName := range columnSet { columns = append(columns, columnName) } } // Convert to SQL rows rows := make([][]sqltypes.Value, len(results)) for i, result := range results { row := make([]sqltypes.Value, len(columns)) for j, columnName := range columns { switch columnName { case "_source": row[j] = sqltypes.NewVarChar(result.Source) case "_timestamp_ns": row[j] = sqltypes.NewInt64(result.Timestamp) case "_key": row[j] = sqltypes.NewVarBinary(string(result.Key)) default: if value, exists := result.Values[columnName]; exists { row[j] = convertSchemaValueToSQL(value) } else { row[j] = sqltypes.NULL } } } rows[i] = row } return &QueryResult{ Columns: columns, Rows: rows, } } // generateSampleHybridData creates sample data that simulates both live and archived messages func (hms *HybridMessageScanner) generateSampleHybridData(options HybridScanOptions) []HybridScanResult { now := time.Now().UnixNano() sampleData := []HybridScanResult{ // Simulated live log data (recent) { Values: map[string]*schema_pb.Value{ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}}, "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}}, }, Timestamp: now - 300000000000, // 5 minutes ago Key: []byte("live-user-1003"), Source: "live_log", }, { Values: map[string]*schema_pb.Value{ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}}, "event_type": {Kind: 
&schema_pb.Value_StringValue{StringValue: "live_action"}}, "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"action": "click", "live": true}`}}, }, Timestamp: now - 120000000000, // 2 minutes ago Key: []byte("live-user-1004"), Source: "live_log", }, // Simulated archived Parquet data (older) { Values: map[string]*schema_pb.Value{ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}}, "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}}, }, Timestamp: now - 3600000000000, // 1 hour ago Key: []byte("archived-user-1001"), Source: "parquet_archive", }, { Values: map[string]*schema_pb.Value{ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}}, "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}}, }, Timestamp: now - 1800000000000, // 30 minutes ago Key: []byte("archived-user-1002"), Source: "parquet_archive", }, } // Apply predicate filtering if specified if options.Predicate != nil { var filtered []HybridScanResult for _, result := range sampleData { // Convert to RecordValue for predicate testing recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)} for k, v := range result.Values { recordValue.Fields[k] = v } recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}} recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}} if options.Predicate(recordValue) { filtered = append(filtered, result) } } sampleData = filtered } // Apply limit if options.Limit > 0 && len(sampleData) > options.Limit { sampleData = sampleData[:options.Limit] } return sampleData } // ReadParquetStatistics efficiently reads column statistics from parquet files // without scanning the full file content - uses parquet's built-in metadata func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) { var fileStats []*ParquetFileStats // Use the same chunk cache as the logstore package chunkCache := chunk_cache.NewChunkCacheInMemory(256) lookupFileIdFn := filer.LookupFn(h.filerClient) err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { // Only process parquet files if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") { return nil } // Extract statistics from this parquet file stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache) if err != nil { // Log error but continue processing other files fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err) return nil } if stats != nil { fileStats = append(fileStats, stats) } return nil }) return fileStats, err } // extractParquetFileStats extracts column statistics from a single parquet file func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) { // Create reader for the parquet file fileSize := filer.FileSize(entry) visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) chunkViews := 
filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize)) // Create parquet reader - this only reads metadata, not data parquetReader := parquet.NewReader(readerAt) defer parquetReader.Close() fileView := parquetReader.File() fileStats := &ParquetFileStats{ FileName: entry.Name, RowCount: fileView.NumRows(), ColumnStats: make(map[string]*ParquetColumnStats), } // Get schema information schema := fileView.Schema() // Process each row group rowGroups := fileView.RowGroups() for _, rowGroup := range rowGroups { columnChunks := rowGroup.ColumnChunks() // Process each column chunk for i, chunk := range columnChunks { // Get column name from schema columnName := h.getColumnNameFromSchema(schema, i) if columnName == "" { continue } // Try to get column statistics columnIndex, err := chunk.ColumnIndex() if err != nil { // No column index available - skip this column continue } // Extract min/max values from the first page (for simplicity) // In a more sophisticated implementation, we could aggregate across all pages numPages := columnIndex.NumPages() if numPages == 0 { continue } minParquetValue := columnIndex.MinValue(0) maxParquetValue := columnIndex.MaxValue(numPages - 1) nullCount := int64(0) // Aggregate null counts across all pages for pageIdx := 0; pageIdx < numPages; pageIdx++ { nullCount += columnIndex.NullCount(pageIdx) } // Convert parquet values to schema_pb.Value minValue, err := h.convertParquetValueToSchemaValue(minParquetValue) if err != nil { continue } maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue) if err != nil { continue } // Store column statistics (aggregate across row groups if column already exists) if existingStats, exists := fileStats.ColumnStats[columnName]; exists { // Update existing statistics if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 { existingStats.MinValue = minValue } if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 { existingStats.MaxValue = maxValue } existingStats.NullCount += nullCount } else { // Create new column statistics fileStats.ColumnStats[columnName] = &ParquetColumnStats{ ColumnName: columnName, MinValue: minValue, MaxValue: maxValue, NullCount: nullCount, RowCount: rowGroup.NumRows(), } } } } return fileStats, nil } // getColumnNameFromSchema extracts column name from parquet schema by index func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string { // Get the leaf columns in order var columnNames []string h.collectColumnNames(schema.Fields(), &columnNames) if columnIndex >= 0 && columnIndex < len(columnNames) { return columnNames[columnIndex] } return "" } // collectColumnNames recursively collects leaf column names from schema func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) { for _, field := range fields { if len(field.Fields()) == 0 { // This is a leaf field (no sub-fields) *names = append(*names, field.Name()) } else { // This is a group - recurse h.collectColumnNames(field.Fields(), names) } } } // convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) { switch pv.Kind() { case parquet.Boolean: return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil case 
parquet.Int32: return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil case parquet.Int64: return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil case parquet.Float: return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil case parquet.Double: return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil case parquet.ByteArray: return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil default: return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind()) } } // compareSchemaValues compares two schema_pb.Value objects func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int { if v1 == nil && v2 == nil { return 0 } if v1 == nil { return -1 } if v2 == nil { return 1 } // Extract raw values and compare raw1 := h.extractRawValueFromSchema(v1) raw2 := h.extractRawValueFromSchema(v2) return h.compareRawValues(raw1, raw2) } // extractRawValueFromSchema extracts the raw value from schema_pb.Value func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} { switch v := value.Kind.(type) { case *schema_pb.Value_BoolValue: return v.BoolValue case *schema_pb.Value_Int32Value: return v.Int32Value case *schema_pb.Value_Int64Value: return v.Int64Value case *schema_pb.Value_FloatValue: return v.FloatValue case *schema_pb.Value_DoubleValue: return v.DoubleValue case *schema_pb.Value_BytesValue: return string(v.BytesValue) // Convert to string for comparison case *schema_pb.Value_StringValue: return v.StringValue } return nil } // compareRawValues compares two raw values func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int { // Handle nil cases if v1 == nil && v2 == nil { return 0 } if v1 == nil { return -1 } if v2 == nil { return 1 } // Compare based on type switch val1 := v1.(type) { case bool: if val2, ok := v2.(bool); ok { if val1 == val2 { return 0 } if val1 { return 1 } return -1 } case int32: if val2, ok := v2.(int32); ok { if val1 < val2 { return -1 } else if val1 > val2 { return 1 } return 0 } case int64: if val2, ok := v2.(int64); ok { if val1 < val2 { return -1 } else if val1 > val2 { return 1 } return 0 } case float32: if val2, ok := v2.(float32); ok { if val1 < val2 { return -1 } else if val1 > val2 { return 1 } return 0 } case float64: if val2, ok := v2.(float64); ok { if val1 < val2 { return -1 } else if val1 > val2 { return 1 } return 0 } case string: if val2, ok := v2.(string); ok { if val1 < val2 { return -1 } else if val1 > val2 { return 1 } return 0 } } // Default: try string comparison str1 := fmt.Sprintf("%v", v1) str2 := fmt.Sprintf("%v", v2) if str1 < str2 { return -1 } else if str1 > str2 { return 1 } return 0 }
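
// ---------------------------------------------------------------------------
// Illustrative usage sketches (editorial additions, not part of the original
// engine code). Field names such as "user_id"/"event_type" and the partition
// path are hypothetical placeholders.
// ---------------------------------------------------------------------------

// exampleHybridScan sketches how a caller might run a time-bounded, filtered,
// column-projected scan and inspect which source each row came from.
func exampleHybridScan(ctx context.Context, scanner *HybridMessageScanner) error {
	now := time.Now()
	opts := HybridScanOptions{
		StartTimeNs: now.Add(-1 * time.Hour).UnixNano(), // last hour only
		StopTimeNs:  now.UnixNano(),
		Columns:     []string{"user_id", "event_type"}, // projection; empty means all columns
		Limit:       100,
		Predicate: func(record *schema_pb.RecordValue) bool {
			// Roughly equivalent to: WHERE user_id > 1000 (assumes an int32 column)
			if v, ok := record.Fields["user_id"]; ok {
				return v.GetInt32Value() > 1000
			}
			return false
		},
	}

	results, err := scanner.Scan(ctx, opts)
	if err != nil {
		return err
	}
	for _, r := range results {
		// Source is "in_memory_broker", "live_log", or "parquet_archive"
		fmt.Printf("source=%s ts=%d key=%s values=%d\n", r.Source, r.Timestamp, r.Key, len(r.Values))
	}
	return nil
}

// exampleParquetStats sketches how the per-file column statistics might feed a
// simple predicate-pushdown decision (again for a hypothetical int32 "user_id"
// column and the filter user_id > 1000).
func exampleParquetStats(scanner *HybridMessageScanner, partitionPath string) error {
	fileStats, err := scanner.ReadParquetStatistics(partitionPath)
	if err != nil {
		return err
	}
	for _, fs := range fileStats {
		col, ok := fs.ColumnStats["user_id"]
		if ok && col.MaxValue != nil && col.MaxValue.GetInt32Value() <= 1000 {
			// Every user_id in this file is <= 1000, so the file cannot match and
			// could be skipped without reading its row groups.
			fmt.Printf("skip %s (%d rows)\n", fs.FileName, fs.RowCount)
			continue
		}
		fmt.Printf("scan %s (%d rows)\n", fs.FileName, fs.RowCount)
	}
	return nil
}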