From e385f0ce7d73a8e3f65a4d0a65091188251e62a2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 20:19:59 -0700 Subject: [PATCH] refactor --- weed/query/engine/aggregations.go | 551 +++++++++++++++++++++++++++ weed/query/engine/data_conversion.go | 217 +++++++++++ weed/query/engine/engine.go | 170 --------- weed/query/engine/errors.go | 36 ++ weed/query/engine/system_columns.go | 170 +++++++++ weed/query/engine/types.go | 62 +++ 6 files changed, 1036 insertions(+), 170 deletions(-) create mode 100644 weed/query/engine/aggregations.go create mode 100644 weed/query/engine/data_conversion.go create mode 100644 weed/query/engine/errors.go create mode 100644 weed/query/engine/system_columns.go create mode 100644 weed/query/engine/types.go diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go new file mode 100644 index 000000000..a4a9a70f4 --- /dev/null +++ b/weed/query/engine/aggregations.go @@ -0,0 +1,551 @@ +package engine + +import ( + "context" + "fmt" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" + "github.com/xwb1989/sqlparser" +) + +// AggregationSpec defines an aggregation function to be computed +type AggregationSpec struct { + Function string // COUNT, SUM, AVG, MIN, MAX + Column string // Column name, or "*" for COUNT(*) + Alias string // Optional alias for the result column + Distinct bool // Support for DISTINCT keyword +} + +// AggregationResult holds the computed result of an aggregation +type AggregationResult struct { + Count int64 + Sum float64 + Min interface{} + Max interface{} +} + +// AggregationStrategy represents the strategy for executing aggregations +type AggregationStrategy struct { + CanUseFastPath bool + Reason string + UnsupportedSpecs []AggregationSpec +} + +// TopicDataSources represents the data sources available for a topic +type TopicDataSources struct { + ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats + ParquetRowCount int64 + LiveLogRowCount int64 + PartitionsCount int +} + +// FastPathOptimizer handles fast path aggregation optimization decisions +type FastPathOptimizer struct { + engine *SQLEngine +} + +// NewFastPathOptimizer creates a new fast path optimizer +func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer { + return &FastPathOptimizer{engine: engine} +} + +// DetermineStrategy analyzes aggregations and determines if fast path can be used +func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy { + strategy := AggregationStrategy{ + CanUseFastPath: true, + Reason: "all_aggregations_supported", + UnsupportedSpecs: []AggregationSpec{}, + } + + for _, spec := range aggregations { + if !opt.engine.canUseParquetStatsForAggregation(spec) { + strategy.CanUseFastPath = false + strategy.Reason = "unsupported_aggregation_functions" + strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec) + } + } + + return strategy +} + +// CollectDataSources gathers information about available data sources for a topic +func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) { + dataSources := &TopicDataSources{ + ParquetFiles: make(map[string][]*ParquetFileStats), + ParquetRowCount: 0, + LiveLogRowCount: 0, + PartitionsCount: 0, + } + + // Discover partitions for the topic + relativePartitions, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, 
hybridScanner.topic.Name) + if err != nil { + return dataSources, DataSourceError{ + Source: "partition_discovery", + Cause: err, + } + } + + topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name) + + // Collect stats from each partition + for _, relPartition := range relativePartitions { + partitionPath := fmt.Sprintf("%s/%s", topicBasePath, relPartition) + + // Read parquet file statistics + parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath) + if err == nil && len(parquetStats) > 0 { + dataSources.ParquetFiles[partitionPath] = parquetStats + for _, stat := range parquetStats { + dataSources.ParquetRowCount += stat.RowCount + } + } + + // Count live log files + liveLogCount, _ := opt.engine.countLiveLogFiles(partitionPath, dataSources.ParquetFiles[partitionPath]) + dataSources.LiveLogRowCount += liveLogCount + } + + dataSources.PartitionsCount = len(relativePartitions) + + return dataSources, nil +} + +// AggregationComputer handles the computation of aggregations using fast path +type AggregationComputer struct { + engine *SQLEngine +} + +// NewAggregationComputer creates a new aggregation computer +func NewAggregationComputer(engine *SQLEngine) *AggregationComputer { + return &AggregationComputer{engine: engine} +} + +// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data +func (comp *AggregationComputer) ComputeFastPathAggregations( + ctx context.Context, + aggregations []AggregationSpec, + dataSources *TopicDataSources, + partitions []string, +) ([]AggregationResult, error) { + + aggResults := make([]AggregationResult, len(aggregations)) + + for i, spec := range aggregations { + switch spec.Function { + case "COUNT": + if spec.Column == "*" { + aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + } else { + // For specific columns, we might need to account for NULLs in the future + aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + } + + case "MIN": + globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions) + if err != nil { + return nil, AggregationError{ + Operation: spec.Function, + Column: spec.Column, + Cause: err, + } + } + aggResults[i].Min = globalMin + + case "MAX": + globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions) + if err != nil { + return nil, AggregationError{ + Operation: spec.Function, + Column: spec.Column, + Cause: err, + } + } + aggResults[i].Max = globalMax + + default: + return nil, OptimizationError{ + Strategy: "fast_path_aggregation", + Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function), + } + } + } + + return aggResults, nil +} + +// computeGlobalMin computes the global minimum value across all data sources +func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) { + var globalMin interface{} + var globalMinValue *schema_pb.Value + hasParquetStats := false + + // Step 1: Get minimum from parquet statistics + for _, fileStats := range dataSources.ParquetFiles { + for _, fileStat := range fileStats { + // Try case-insensitive column lookup + var colStats *ParquetColumnStats + var found bool + + // First try exact match + if stats, exists := fileStat.ColumnStats[spec.Column]; exists { + colStats = stats + found = true + } else { + // Try case-insensitive lookup + for colName, stats := range fileStat.ColumnStats { + if strings.EqualFold(colName, 
spec.Column) { + colStats = stats + found = true + break + } + } + } + + if found && colStats != nil && colStats.MinValue != nil { + if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 { + globalMinValue = colStats.MinValue + extractedValue := comp.engine.extractRawValue(colStats.MinValue) + if extractedValue != nil { + globalMin = extractedValue + hasParquetStats = true + } + } + } + } + } + + // Step 2: Get minimum from live log data (only if no live logs or if we need to compare) + if dataSources.LiveLogRowCount > 0 { + for _, partition := range partitions { + partitionParquetSources := make(map[string]bool) + if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { + partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) + } + + liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) + if err != nil { + continue // Skip partitions with errors + } + + if liveLogMin != nil { + if globalMin == nil { + globalMin = liveLogMin + } else { + liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin) + if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 { + globalMin = liveLogMin + globalMinValue = liveLogSchemaValue + } + } + } + } + } + + // Step 3: Handle system columns if no regular data found + if globalMin == nil && !hasParquetStats { + globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles) + } + + return globalMin, nil +} + +// computeGlobalMax computes the global maximum value across all data sources +func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) { + var globalMax interface{} + var globalMaxValue *schema_pb.Value + hasParquetStats := false + + // Step 1: Get maximum from parquet statistics + for _, fileStats := range dataSources.ParquetFiles { + for _, fileStat := range fileStats { + // Try case-insensitive column lookup + var colStats *ParquetColumnStats + var found bool + + // First try exact match + if stats, exists := fileStat.ColumnStats[spec.Column]; exists { + colStats = stats + found = true + } else { + // Try case-insensitive lookup + for colName, stats := range fileStat.ColumnStats { + if strings.EqualFold(colName, spec.Column) { + colStats = stats + found = true + break + } + } + } + + if found && colStats != nil && colStats.MaxValue != nil { + if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 { + globalMaxValue = colStats.MaxValue + extractedValue := comp.engine.extractRawValue(colStats.MaxValue) + if extractedValue != nil { + globalMax = extractedValue + hasParquetStats = true + } + } + } + } + } + + // Step 2: Get maximum from live log data (only if live logs exist) + if dataSources.LiveLogRowCount > 0 { + for _, partition := range partitions { + partitionParquetSources := make(map[string]bool) + if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { + partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) + } + + _, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) + if err != nil { + continue // Skip partitions with errors + } + + if liveLogMax != nil { + if globalMax == nil { + globalMax = liveLogMax + } else { + liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax) + if 
liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 { + globalMax = liveLogMax + globalMaxValue = liveLogSchemaValue + } + } + } + } + } + + // Step 3: Handle system columns if no regular data found + if globalMax == nil && !hasParquetStats { + globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles) + } + + return globalMax, nil +} + +// executeAggregationQuery handles SELECT queries with aggregation functions +func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select) (*QueryResult, error) { + // Parse WHERE clause for filtering + var predicate func(*schema_pb.RecordValue) bool + var err error + if stmt.Where != nil { + predicate, err = e.buildPredicate(stmt.Where.Expr) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Extract time filters for optimization + startTimeNs, stopTimeNs := int64(0), int64(0) + if stmt.Where != nil { + startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) + } + + // FAST PATH: Try to use parquet statistics for optimization + // This can be ~130x faster than scanning all data + if stmt.Where == nil { // Only optimize when no complex WHERE clause + fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations) + if canOptimize { + fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n") + return fastResult, nil + } + } + + // SLOW PATH: Fall back to full table scan + fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n") + + // Build scan options for full table scan (aggregations need all data) + hybridScanOptions := HybridScanOptions{ + StartTimeNs: startTimeNs, + StopTimeNs: stopTimeNs, + Limit: 0, // No limit for aggregations - need all data + Predicate: predicate, + } + + // Execute the hybrid scan to get all matching records + results, err := hybridScanner.Scan(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + + // Compute aggregations + aggResults := e.computeAggregations(results, aggregations) + + // Build result set + columns := make([]string, len(aggregations)) + row := make([]sqltypes.Value, len(aggregations)) + + for i, spec := range aggregations { + columns[i] = spec.Alias + row[i] = e.formatAggregationResult(spec, aggResults[i]) + } + + return &QueryResult{ + Columns: columns, + Rows: [][]sqltypes.Value{row}, + }, nil +} + +// tryFastParquetAggregation attempts to compute aggregations using hybrid approach: +// - Use parquet metadata for parquet files +// - Count live log files for live data +// - Combine both for accurate results per partition +// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used +func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) { + // Use the new modular components + optimizer := NewFastPathOptimizer(e) + computer := NewAggregationComputer(e) + + // Step 1: Determine strategy + strategy := optimizer.DetermineStrategy(aggregations) + if !strategy.CanUseFastPath { + return nil, false + } + + // Step 2: Collect data sources + dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner) + if err != nil { + return nil, false + } + + // Build partition list for aggregation computer + relativePartitions, err := 
e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name) + if err != nil { + return nil, false + } + + topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name) + partitions := make([]string, len(relativePartitions)) + for i, relPartition := range relativePartitions { + partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition) + } + + // Debug: Show the hybrid optimization results + if dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 { + partitionsWithLiveLogs := 0 + if dataSources.LiveLogRowCount > 0 { + partitionsWithLiveLogs = 1 // Simplified for now + } + fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n", + dataSources.ParquetRowCount, dataSources.LiveLogRowCount, partitionsWithLiveLogs) + } + + // Step 3: Compute aggregations using fast path + aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions) + if err != nil { + return nil, false + } + + // Step 4: Build final query result + columns := make([]string, len(aggregations)) + row := make([]sqltypes.Value, len(aggregations)) + + for i, spec := range aggregations { + columns[i] = spec.Alias + row[i] = e.formatAggregationResult(spec, aggResults[i]) + } + + result := &QueryResult{ + Columns: columns, + Rows: [][]sqltypes.Value{row}, + } + + return result, true +} + +// computeAggregations computes aggregation results from a full table scan +func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult { + aggResults := make([]AggregationResult, len(aggregations)) + + for i, spec := range aggregations { + switch spec.Function { + case "COUNT": + if spec.Column == "*" { + aggResults[i].Count = int64(len(results)) + } else { + count := int64(0) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) { + count++ + } + } + aggResults[i].Count = count + } + + case "SUM": + sum := float64(0) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if numValue := e.convertToNumber(value); numValue != nil { + sum += *numValue + } + } + } + aggResults[i].Sum = sum + + case "AVG": + sum := float64(0) + count := int64(0) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if numValue := e.convertToNumber(value); numValue != nil { + sum += *numValue + count++ + } + } + } + if count > 0 { + aggResults[i].Sum = sum / float64(count) // Store average in Sum field + aggResults[i].Count = count + } + + case "MIN": + var min interface{} + var minValue *schema_pb.Value + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if minValue == nil || e.compareValues(value, minValue) < 0 { + minValue = value + min = e.extractRawValue(value) + } + } + } + aggResults[i].Min = min + + case "MAX": + var max interface{} + var maxValue *schema_pb.Value + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if maxValue == nil || e.compareValues(value, maxValue) > 0 { + maxValue = value + max = e.extractRawValue(value) + } + } + } + aggResults[i].Max = max + } + } + + return aggResults +} + +// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats +func (e *SQLEngine) 
canUseParquetStatsForAggregation(spec AggregationSpec) bool { + switch spec.Function { + case "COUNT": + return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column) + case "MIN", "MAX": + return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column) + case "SUM", "AVG": + // These require scanning actual values, not just min/max + return false + default: + return false + } +} diff --git a/weed/query/engine/data_conversion.go b/weed/query/engine/data_conversion.go new file mode 100644 index 000000000..f626d8f2e --- /dev/null +++ b/weed/query/engine/data_conversion.go @@ -0,0 +1,217 @@ +package engine + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" +) + +// formatAggregationResult formats an aggregation result into a SQL value +func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value { + switch spec.Function { + case "COUNT": + return sqltypes.NewInt64(result.Count) + case "SUM": + return sqltypes.NewFloat64(result.Sum) + case "AVG": + return sqltypes.NewFloat64(result.Sum) // Sum contains the average for AVG + case "MIN": + if result.Min != nil { + return e.convertRawValueToSQL(result.Min) + } + return sqltypes.NULL + case "MAX": + if result.Max != nil { + return e.convertRawValueToSQL(result.Max) + } + return sqltypes.NULL + } + return sqltypes.NULL +} + +// convertRawValueToSQL converts a raw Go value to a SQL value +func (e *SQLEngine) convertRawValueToSQL(value interface{}) sqltypes.Value { + switch v := value.(type) { + case int32: + return sqltypes.NewInt32(v) + case int64: + return sqltypes.NewInt64(v) + case float32: + return sqltypes.NewFloat32(v) + case float64: + return sqltypes.NewFloat64(v) + case string: + return sqltypes.NewVarChar(v) + case bool: + if v { + return sqltypes.NewVarChar("1") + } + return sqltypes.NewVarChar("0") + } + return sqltypes.NULL +} + +// extractRawValue extracts the raw Go value from a schema_pb.Value +func (e *SQLEngine) extractRawValue(value *schema_pb.Value) interface{} { + switch v := value.Kind.(type) { + case *schema_pb.Value_Int32Value: + return v.Int32Value + case *schema_pb.Value_Int64Value: + return v.Int64Value + case *schema_pb.Value_FloatValue: + return v.FloatValue + case *schema_pb.Value_DoubleValue: + return v.DoubleValue + case *schema_pb.Value_StringValue: + return v.StringValue + case *schema_pb.Value_BoolValue: + return v.BoolValue + case *schema_pb.Value_BytesValue: + return string(v.BytesValue) // Convert bytes to string for comparison + } + return nil +} + +// compareValues compares two schema_pb.Value objects +func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Value) int { + if value2 == nil { + return 1 // value1 > nil + } + raw1 := e.extractRawValue(value1) + raw2 := e.extractRawValue(value2) + if raw1 == nil { + return -1 + } + if raw2 == nil { + return 1 + } + + // Simple comparison - in a full implementation this would handle type coercion + switch v1 := raw1.(type) { + case int32: + if v2, ok := raw2.(int32); ok { + if v1 < v2 { + return -1 + } else if v1 > v2 { + return 1 + } + return 0 + } + case int64: + if v2, ok := raw2.(int64); ok { + if v1 < v2 { + return -1 + } else if v1 > v2 { + return 1 + } + return 0 + } + case float32: + if v2, ok := raw2.(float32); ok { + if v1 < v2 { + return -1 + } else if v1 > v2 { + return 1 + } + return 0 + } + case float64: + if v2, ok := raw2.(float64); ok { + if v1 < v2 { + return -1 + } 
else if v1 > v2 { + return 1 + } + return 0 + } + case string: + if v2, ok := raw2.(string); ok { + if v1 < v2 { + return -1 + } else if v1 > v2 { + return 1 + } + return 0 + } + case bool: + if v2, ok := raw2.(bool); ok { + if v1 == v2 { + return 0 + } else if v1 && !v2 { + return 1 + } + return -1 + } + } + return 0 +} + +// convertRawValueToSchemaValue converts raw Go values back to schema_pb.Value for comparison +func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_pb.Value { + switch v := rawValue.(type) { + case int32: + return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}} + case int64: + return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}} + case float32: + return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}} + case float64: + return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}} + case string: + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}} + case bool: + return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}} + case []byte: + return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}} + default: + // Convert other types to string as fallback + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}} + } +} + +// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value +func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value { + switch v := jsonValue.(type) { + case string: + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}} + case float64: + // JSON numbers are always float64, try to detect if it's actually an integer + if v == float64(int64(v)) { + return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}} + } + return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}} + case bool: + return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}} + case nil: + return nil + default: + // Convert other types to string + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}} + } +} + +// Helper functions for aggregation processing + +// isNullValue checks if a schema_pb.Value is null or empty +func (e *SQLEngine) isNullValue(value *schema_pb.Value) bool { + return value == nil || value.Kind == nil +} + +// convertToNumber converts a schema_pb.Value to a float64 for numeric operations +func (e *SQLEngine) convertToNumber(value *schema_pb.Value) *float64 { + switch v := value.Kind.(type) { + case *schema_pb.Value_Int32Value: + result := float64(v.Int32Value) + return &result + case *schema_pb.Value_Int64Value: + result := float64(v.Int64Value) + return &result + case *schema_pb.Value_FloatValue: + result := float64(v.FloatValue) + return &result + case *schema_pb.Value_DoubleValue: + return &v.DoubleValue + } + return nil +} diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 9c7079f2b..f0a8e8f2a 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -31,30 +31,6 @@ type SQLEngine struct { catalog *SchemaCatalog } -// QueryExecutionPlan contains information about how a query was executed -type QueryExecutionPlan struct { - QueryType string `json:"query_type"` // SELECT, INSERT, etc. 
- ExecutionStrategy string `json:"execution_strategy"` // fast_path, full_scan, hybrid - DataSources []string `json:"data_sources"` // parquet_files, live_logs - PartitionsScanned int `json:"partitions_scanned"` - ParquetFilesScanned int `json:"parquet_files_scanned"` - LiveLogFilesScanned int `json:"live_log_files_scanned"` - TotalRowsProcessed int64 `json:"total_rows_processed"` - OptimizationsUsed []string `json:"optimizations_used"` // parquet_stats, predicate_pushdown, etc. - TimeRangeFilters map[string]interface{} `json:"time_range_filters,omitempty"` - Aggregations []string `json:"aggregations,omitempty"` - ExecutionTimeMs float64 `json:"execution_time_ms"` - Details map[string]interface{} `json:"details,omitempty"` -} - -// QueryResult represents the result of a SQL query execution -type QueryResult struct { - Columns []string `json:"columns"` - Rows [][]sqltypes.Value `json:"rows"` - Error error `json:"error,omitempty"` - ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"` -} - // NewSQLEngine creates a new SQL execution engine // Uses master address for service discovery and initialization func NewSQLEngine(masterAddress string) *SQLEngine { @@ -1515,152 +1491,6 @@ func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryR return result, nil } -// AggregationSpec defines an aggregation function to be computed -type AggregationSpec struct { - Function string // COUNT, SUM, AVG, MIN, MAX - Column string // Column name, or "*" for COUNT(*) - Alias string // Optional alias for the result column - Distinct bool // Support for DISTINCT keyword -} - -// AggregationResult holds the computed result of an aggregation -type AggregationResult struct { - Count int64 - Sum float64 - Min interface{} - Max interface{} -} - -// AggregationStrategy represents the strategy for executing aggregations -type AggregationStrategy struct { - CanUseFastPath bool - Reason string - UnsupportedSpecs []AggregationSpec -} - -// TopicDataSources represents the data sources available for a topic -type TopicDataSources struct { - ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats - ParquetRowCount int64 - LiveLogRowCount int64 - PartitionsCount int -} - -// FastPathOptimizer handles fast path aggregation optimization decisions -type FastPathOptimizer struct { - engine *SQLEngine -} - -// Error types for better error handling and testing -type AggregationError struct { - Operation string - Column string - Cause error -} - -func (e AggregationError) Error() string { - return fmt.Sprintf("aggregation error in %s(%s): %v", e.Operation, e.Column, e.Cause) -} - -type DataSourceError struct { - Source string - Cause error -} - -func (e DataSourceError) Error() string { - return fmt.Sprintf("data source error in %s: %v", e.Source, e.Cause) -} - -type OptimizationError struct { - Strategy string - Reason string -} - -func (e OptimizationError) Error() string { - return fmt.Sprintf("optimization failed for %s: %s", e.Strategy, e.Reason) -} - -// NewFastPathOptimizer creates a new fast path optimizer -func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer { - return &FastPathOptimizer{engine: engine} -} - -// DetermineStrategy analyzes aggregations and determines if fast path can be used -func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy { - strategy := AggregationStrategy{ - CanUseFastPath: true, - Reason: "all_aggregations_supported", - UnsupportedSpecs: []AggregationSpec{}, - } - - for _, spec 
:= range aggregations { - if !opt.engine.canUseParquetStatsForAggregation(spec) { - strategy.CanUseFastPath = false - strategy.Reason = "unsupported_aggregation_functions" - strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec) - } - } - - return strategy -} - -// CollectDataSources gathers information about available data sources for a topic -func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) { - // Get all partitions for this topic - relativePartitions, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name) - if err != nil { - return nil, DataSourceError{ - Source: fmt.Sprintf("partition_discovery:%s.%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name), - Cause: err, - } - } - - // Convert relative partition paths to full paths - topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name) - partitions := make([]string, len(relativePartitions)) - for i, relPartition := range relativePartitions { - partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition) - } - - // Collect statistics from all partitions - dataSources := &TopicDataSources{ - ParquetFiles: make(map[string][]*ParquetFileStats), - ParquetRowCount: 0, - LiveLogRowCount: 0, - PartitionsCount: len(partitions), - } - - for _, partition := range partitions { - partitionPath := partition - - // Get parquet file statistics - fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath) - if err != nil { - fileStats = []*ParquetFileStats{} // Empty stats, but continue - } - - if len(fileStats) > 0 { - dataSources.ParquetFiles[partitionPath] = fileStats - for _, fileStat := range fileStats { - dataSources.ParquetRowCount += fileStat.RowCount - } - } - - // Get parquet source files for deduplication - parquetSourceFiles := opt.engine.extractParquetSourceFiles(fileStats) - - // Count live log rows (excluding parquet-converted files) - liveLogRowCount, err := opt.engine.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles) - if err != nil { - liveLogRowCount = 0 // No live logs is acceptable - } - - dataSources.LiveLogRowCount += liveLogRowCount - } - - return dataSources, nil -} - // AggregationComputer handles the computation of aggregations using fast path type AggregationComputer struct { engine *SQLEngine diff --git a/weed/query/engine/errors.go b/weed/query/engine/errors.go new file mode 100644 index 000000000..6bc3e9c21 --- /dev/null +++ b/weed/query/engine/errors.go @@ -0,0 +1,36 @@ +package engine + +import "fmt" + +// Error types for better error handling and testing + +// AggregationError represents errors that occur during aggregation computation +type AggregationError struct { + Operation string + Column string + Cause error +} + +func (e AggregationError) Error() string { + return fmt.Sprintf("aggregation error in %s(%s): %v", e.Operation, e.Column, e.Cause) +} + +// DataSourceError represents errors that occur when accessing data sources +type DataSourceError struct { + Source string + Cause error +} + +func (e DataSourceError) Error() string { + return fmt.Sprintf("data source error in %s: %v", e.Source, e.Cause) +} + +// OptimizationError represents errors that occur during query optimization +type OptimizationError struct { + Strategy string + Reason string +} + +func (e OptimizationError) Error() string { + return fmt.Sprintf("optimization failed for %s: %s", e.Strategy, e.Reason) +} diff --git 
a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go new file mode 100644 index 000000000..c675e73c0 --- /dev/null +++ b/weed/query/engine/system_columns.go @@ -0,0 +1,170 @@ +package engine + +import ( + "regexp" + "strconv" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source) +func (e *SQLEngine) isSystemColumn(columnName string) bool { + lowerName := strings.ToLower(columnName) + return lowerName == "_timestamp_ns" || lowerName == "timestamp_ns" || + lowerName == "_key" || lowerName == "key" || + lowerName == "_source" || lowerName == "source" +} + +// isRegularColumn checks if a column might be a regular data column (placeholder) +func (e *SQLEngine) isRegularColumn(columnName string) bool { + // For now, assume any non-system column is a regular column + return !e.isSystemColumn(columnName) +} + +// getSystemColumnGlobalMin computes global min for system columns using file metadata +func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} { + lowerName := strings.ToLower(columnName) + + switch lowerName { + case "_timestamp_ns", "timestamp_ns": + // For timestamps, find the earliest timestamp across all files + // This should match what's in the Extended["min"] metadata + var minTimestamp *int64 + for _, fileStats := range allFileStats { + for _, fileStat := range fileStats { + // Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet) + timestamp := e.extractTimestampFromFilename(fileStat.FileName) + if timestamp != 0 { + if minTimestamp == nil || timestamp < *minTimestamp { + minTimestamp = ×tamp + } + } + } + } + if minTimestamp != nil { + return *minTimestamp + } + + case "_key", "key": + // For keys, we'd need to read the actual parquet column stats + // Fall back to scanning if not available in our current stats + return nil + + case "_source", "source": + // Source is always "parquet_archive" for parquet files + return "parquet_archive" + } + + return nil +} + +// getSystemColumnGlobalMax computes global max for system columns using file metadata +func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} { + lowerName := strings.ToLower(columnName) + + switch lowerName { + case "_timestamp_ns", "timestamp_ns": + // For timestamps, find the latest timestamp across all files + // This should match what's in the Extended["max"] metadata + var maxTimestamp *int64 + for _, fileStats := range allFileStats { + for _, fileStat := range fileStats { + // Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet) + timestamp := e.extractTimestampFromFilename(fileStat.FileName) + if timestamp != 0 { + if maxTimestamp == nil || timestamp > *maxTimestamp { + maxTimestamp = ×tamp + } + } + } + } + if maxTimestamp != nil { + return *maxTimestamp + } + + case "_key", "key": + // For keys, we'd need to read the actual parquet column stats + // Fall back to scanning if not available in our current stats + return nil + + case "_source", "source": + // Source is always "parquet_archive" for parquet files + return "parquet_archive" + } + + return nil +} + +// extractTimestampFromFilename extracts timestamp from parquet filename +func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 { + // Expected format: YYYY-MM-DD-HH-MM-SS.parquet or similar + // Try to parse timestamp from filename + re := 
regexp.MustCompile(`(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})`) + matches := re.FindStringSubmatch(filename) + if len(matches) > 1 { + timestampStr := matches[1] + // Convert to time and then to nanoseconds + t, err := time.Parse("2006-01-02-15-04-05", timestampStr) + if err == nil { + return t.UnixNano() + } + } + + // Fallback: try to parse as unix timestamp if filename is numeric + if timestampStr := strings.TrimSuffix(filename, ".parquet"); timestampStr != filename { + if timestamp, err := strconv.ParseInt(timestampStr, 10, 64); err == nil { + // Assume it's already in nanoseconds + return timestamp + } + } + + return 0 +} + +// findColumnValue performs case-insensitive lookup of column values +// Now includes support for system columns stored in HybridScanResult +func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value { + lowerName := strings.ToLower(columnName) + + // Check system columns first + switch lowerName { + case "_timestamp_ns", "timestamp_ns": + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}, + } + case "_key", "key": + return &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}, + } + case "_source", "source": + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: result.Source}, + } + } + + // Check regular columns in the record data + if result.RecordValue != nil { + recordValue, ok := result.RecordValue.Kind.(*schema_pb.Value_RecordValue) + if !ok { + return nil + } + + if recordValue.RecordValue.Fields != nil { + // Try exact match first + if value, exists := recordValue.RecordValue.Fields[columnName]; exists { + return value + } + + // Try case-insensitive match + for fieldName, value := range recordValue.RecordValue.Fields { + if strings.EqualFold(fieldName, columnName) { + return value + } + } + } + } + + return nil +} diff --git a/weed/query/engine/types.go b/weed/query/engine/types.go new file mode 100644 index 000000000..f0be49fb9 --- /dev/null +++ b/weed/query/engine/types.go @@ -0,0 +1,62 @@ +package engine + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" +) + +// QueryExecutionPlan contains information about how a query was executed +type QueryExecutionPlan struct { + QueryType string + ExecutionStrategy string `json:"execution_strategy"` // fast_path, full_scan, hybrid + DataSources []string `json:"data_sources"` // parquet_files, live_logs + PartitionsScanned int `json:"partitions_scanned"` + ParquetFilesScanned int `json:"parquet_files_scanned"` + LiveLogFilesScanned int `json:"live_log_files_scanned"` + TotalRowsProcessed int64 `json:"total_rows_processed"` + OptimizationsUsed []string `json:"optimizations_used"` // parquet_stats, predicate_pushdown, etc. 
+ TimeRangeFilters map[string]interface{} `json:"time_range_filters,omitempty"` + Aggregations []string `json:"aggregations,omitempty"` + ExecutionTimeMs float64 `json:"execution_time_ms"` + Details map[string]interface{} `json:"details,omitempty"` +} + +// QueryResult represents the result of a SQL query execution +type QueryResult struct { + Columns []string `json:"columns"` + Rows [][]sqltypes.Value `json:"rows"` + Error error `json:"error,omitempty"` + ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"` +} + +// ParquetColumnStats holds statistics for a single column in a Parquet file +type ParquetColumnStats struct { + ColumnName string + MinValue *schema_pb.Value + MaxValue *schema_pb.Value + NullCount int64 + RowCount int64 +} + +// ParquetFileStats holds statistics for a single Parquet file +type ParquetFileStats struct { + FileName string + RowCount int64 + ColumnStats map[string]*ParquetColumnStats +} + +// HybridScanResult represents a single record from hybrid scanning +type HybridScanResult struct { + RecordValue *schema_pb.Value + Source string // "live_log", "parquet_archive" + Timestamp int64 + Key []byte +} + +// HybridScanOptions configures how the hybrid scanner operates +type HybridScanOptions struct { + StartTimeNs int64 + StopTimeNs int64 + Limit int + Predicate func(*schema_pb.RecordValue) bool +}
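
For reference, below is a minimal sketch (not part of the patch itself) of how the extracted fast-path pieces compose, mirroring tryFastParquetAggregation in the added aggregations.go. The function name fastPathAggregationSketch is hypothetical; it assumes package engine with context, fmt, and sqltypes imported, and relies on the engine helpers (discoverTopicPartitions, formatAggregationResult) exactly as they appear in the code added above.

// fastPathAggregationSketch (hypothetical name) mirrors tryFastParquetAggregation:
// decide strategy, collect data sources, compute aggregations, format one result row.
func fastPathAggregationSketch(ctx context.Context, e *SQLEngine, scanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
	optimizer := NewFastPathOptimizer(e)
	computer := NewAggregationComputer(e)

	// 1. Fast path only if every aggregation is answerable from parquet stats / row counts.
	if strategy := optimizer.DetermineStrategy(aggregations); !strategy.CanUseFastPath {
		return nil, false
	}

	// 2. Gather parquet file statistics and deduplicated live-log row counts per partition.
	dataSources, err := optimizer.CollectDataSources(ctx, scanner)
	if err != nil {
		return nil, false
	}

	// 3. Build full partition paths; the aggregation computer expects absolute paths.
	relPartitions, err := e.discoverTopicPartitions(scanner.topic.Namespace, scanner.topic.Name)
	if err != nil {
		return nil, false
	}
	base := fmt.Sprintf("/topics/%s/%s", scanner.topic.Namespace, scanner.topic.Name)
	partitions := make([]string, len(relPartitions))
	for i, p := range relPartitions {
		partitions[i] = fmt.Sprintf("%s/%s", base, p)
	}

	// 4. Compute COUNT/MIN/MAX from metadata, then format one row with one column per aggregation.
	aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))
	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}
	return &QueryResult{Columns: columns, Rows: [][]sqltypes.Value{row}}, true
}

On failure at any step the sketch returns (nil, false), matching the patch's convention that the caller then falls back to the full hybrid table scan in executeAggregationQuery.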