package engine

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// AggregationSpec defines an aggregation function to be computed
type AggregationSpec struct {
	Function string // COUNT, SUM, AVG, MIN, MAX
	Column   string // Column name, or "*" for COUNT(*)
	Alias    string // Optional alias for the result column
	Distinct bool   // Support for DISTINCT keyword
}

// AggregationResult holds the computed result of an aggregation
type AggregationResult struct {
	Count int64
	Sum   float64
	Min   interface{}
	Max   interface{}
}

// AggregationStrategy represents the strategy for executing aggregations
type AggregationStrategy struct {
	CanUseFastPath   bool
	Reason           string
	UnsupportedSpecs []AggregationSpec
}

// TopicDataSources represents the data sources available for a topic
type TopicDataSources struct {
	ParquetFiles      map[string][]*ParquetFileStats // partitionPath -> parquet file stats
	ParquetRowCount   int64
	LiveLogRowCount   int64
	LiveLogFilesCount int // Total count of live log files across all partitions
	PartitionsCount   int
}

// FastPathOptimizer handles fast path aggregation optimization decisions
type FastPathOptimizer struct {
	engine *SQLEngine
}

// NewFastPathOptimizer creates a new fast path optimizer
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
	return &FastPathOptimizer{engine: engine}
}

// DetermineStrategy analyzes aggregations and determines if fast path can be used
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
	strategy := AggregationStrategy{
		CanUseFastPath:   true,
		Reason:           "all_aggregations_supported",
		UnsupportedSpecs: []AggregationSpec{},
	}

	for _, spec := range aggregations {
		if !opt.engine.canUseParquetStatsForAggregation(spec) {
			strategy.CanUseFastPath = false
			strategy.Reason = "unsupported_aggregation_functions"
			strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
		}
	}

	return strategy
}
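// Illustrative sketch (not produced by this file; the real specs come from the
// SQL parser): a query such as
//
//	SELECT COUNT(*), MAX(user_id) FROM my_topic
//
// would typically arrive here as two specs, assuming user_id is a regular column
// of the topic (the alias strings shown are assumptions):
//
//	[]AggregationSpec{
//		{Function: "COUNT", Column: "*", Alias: "COUNT(*)"},
//		{Function: "MAX", Column: "user_id", Alias: "MAX(user_id)"},
//	}
//
// DetermineStrategy reports CanUseFastPath=true for that list, whereas adding
// AVG(user_id) flips it to false with reason "unsupported_aggregation_functions"
// (see canUseParquetStatsForAggregation below).
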
// CollectDataSources gathers information about available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
	dataSources := &TopicDataSources{
		ParquetFiles:      make(map[string][]*ParquetFileStats),
		ParquetRowCount:   0,
		LiveLogRowCount:   0,
		LiveLogFilesCount: 0,
		PartitionsCount:   0,
	}

	if isDebugMode(ctx) {
		fmt.Printf("Collecting data sources for: %s/%s\n", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	}

	// Discover partitions for the topic
	partitionPaths, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		if isDebugMode(ctx) {
			fmt.Printf("ERROR: Partition discovery failed: %v\n", err)
		}
		return dataSources, DataSourceError{
			Source: "partition_discovery",
			Cause:  err,
		}
	}

	// DEBUG: Log discovered partitions
	if isDebugMode(ctx) {
		fmt.Printf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths)
	}

	// Collect stats from each partition
	// Note: discoverTopicPartitions always returns absolute paths starting with "/topics/"
	for _, partitionPath := range partitionPaths {
		if isDebugMode(ctx) {
			fmt.Printf("\nProcessing partition: %s\n", partitionPath)
		}

		// Read parquet file statistics
		parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
		if err != nil {
			if isDebugMode(ctx) {
				fmt.Printf(" ERROR: Failed to read parquet statistics: %v\n", err)
			}
		} else if len(parquetStats) == 0 {
			if isDebugMode(ctx) {
				fmt.Printf(" No parquet files found in partition\n")
			}
		} else {
			dataSources.ParquetFiles[partitionPath] = parquetStats
			partitionParquetRows := int64(0)
			for _, stat := range parquetStats {
				partitionParquetRows += stat.RowCount
				dataSources.ParquetRowCount += stat.RowCount
			}
			if isDebugMode(ctx) {
				fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
			}
		}

		// Count live log rows (excluding rows already converted to parquet)
		parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
		liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
		if liveLogErr != nil {
			if isDebugMode(ctx) {
				fmt.Printf(" ERROR: Failed to count live log rows: %v\n", liveLogErr)
			}
		} else {
			dataSources.LiveLogRowCount += liveLogCount
			if isDebugMode(ctx) {
				fmt.Printf(" Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources))
			}
		}

		// Count live log files for the partition with proper range values
		// Extract partition name from absolute path (e.g., "0000-2520" from "/topics/.../v2025.../0000-2520")
		partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:]
		partitionParts := strings.Split(partitionName, "-")
		if len(partitionParts) == 2 {
			rangeStart, err1 := strconv.Atoi(partitionParts[0])
			rangeStop, err2 := strconv.Atoi(partitionParts[1])
			if err1 == nil && err2 == nil {
				partition := topic.Partition{
					RangeStart: int32(rangeStart),
					RangeStop:  int32(rangeStop),
				}
				liveLogFileCount, err := hybridScanner.countLiveLogFiles(partition)
				if err == nil {
					dataSources.LiveLogFilesCount += liveLogFileCount
				}
			}
		}
	}

	dataSources.PartitionsCount = len(partitionPaths)

	if isDebugMode(ctx) {
		fmt.Printf("Data sources collected: %d partitions, %d parquet rows, %d live log rows\n",
			dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount)
	}

	return dataSources, nil
}

// AggregationComputer handles the computation of aggregations using the fast path
type AggregationComputer struct {
	engine *SQLEngine
}

// NewAggregationComputer creates a new aggregation computer
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
	return &AggregationComputer{engine: engine}
}
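// Illustrative sketch of what CollectDataSources produces for a topic with two
// partitions, one of which has been partially compacted to parquet (the path and
// all numbers below are made up for illustration):
//
//	&TopicDataSources{
//		ParquetFiles:      map[string][]*ParquetFileStats{"/topics/ns/t/v2025/0000-2520": {...}},
//		ParquetRowCount:   1500, // rows covered by parquet statistics
//		LiveLogRowCount:   303,  // live log rows NOT already represented in parquet
//		LiveLogFilesCount: 2,
//		PartitionsCount:   2,
//	}
//
// Keeping parquet-backed rows and deduplicated live log rows separate is what
// lets COUNT(*) be answered as ParquetRowCount + LiveLogRowCount without a scan.
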
// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
func (comp *AggregationComputer) ComputeFastPathAggregations(
	ctx context.Context,
	aggregations []AggregationSpec,
	dataSources *TopicDataSources,
	partitions []string,
) ([]AggregationResult, error) {
	aggResults := make([]AggregationResult, len(aggregations))

	for i, spec := range aggregations {
		switch spec.Function {
		case FuncCOUNT:
			if spec.Column == "*" {
				aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
			} else {
				// For specific columns, we might need to account for NULLs in the future
				aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
			}

		case FuncMIN:
			globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			aggResults[i].Min = globalMin

		case FuncMAX:
			globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			aggResults[i].Max = globalMax

		default:
			return nil, OptimizationError{
				Strategy: "fast_path_aggregation",
				Reason:   fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
			}
		}
	}

	return aggResults, nil
}
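// Worked example (values are illustrative): with ParquetRowCount=1500 and
// LiveLogRowCount=303 from CollectDataSources, a COUNT(*) spec is answered as
//
//	aggResults[i].Count = 1500 + 303 // = 1803, no row scan required
//
// MIN/MAX instead merge parquet column statistics with a per-partition scan of
// only the live log files that were never converted to parquet; see
// computeGlobalMin / computeGlobalMax below.
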
// computeGlobalMin computes the global minimum value across all data sources
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMin interface{}
	var globalMinValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get minimum from parquet statistics
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Try case-insensitive column lookup
			var colStats *ParquetColumnStats
			var found bool

			// First try exact match
			if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
				colStats = stats
				found = true
			} else {
				// Try case-insensitive lookup
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						found = true
						break
					}
				}
			}

			if found && colStats != nil && colStats.MinValue != nil {
				if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
					globalMinValue = colStats.MinValue
					extractedValue := comp.engine.extractRawValue(colStats.MinValue)
					if extractedValue != nil {
						globalMin = extractedValue
						hasParquetStats = true
					}
				}
			}
		}
	}

	// Step 2: Get minimum from live log data (only if live log rows exist)
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}

			liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}

			if liveLogMin != nil {
				if globalMin == nil {
					globalMin = liveLogMin
					// Track the schema value as well so later comparisons have a baseline
					globalMinValue = comp.engine.convertRawValueToSchemaValue(liveLogMin)
				} else {
					liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
					if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
						globalMin = liveLogMin
						globalMinValue = liveLogSchemaValue
					}
				}
			}
		}
	}

	// Step 3: Handle system columns if no regular data found
	if globalMin == nil && !hasParquetStats {
		globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
	}

	return globalMin, nil
}

// computeGlobalMax computes the global maximum value across all data sources
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMax interface{}
	var globalMaxValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get maximum from parquet statistics
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Try case-insensitive column lookup
			var colStats *ParquetColumnStats
			var found bool

			// First try exact match
			if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
				colStats = stats
				found = true
			} else {
				// Try case-insensitive lookup
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						found = true
						break
					}
				}
			}

			if found && colStats != nil && colStats.MaxValue != nil {
				if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
					globalMaxValue = colStats.MaxValue
					extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
					if extractedValue != nil {
						globalMax = extractedValue
						hasParquetStats = true
					}
				}
			}
		}
	}

	// Step 2: Get maximum from live log data (only if live log rows exist)
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}

			_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}

			if liveLogMax != nil {
				if globalMax == nil {
					globalMax = liveLogMax
					// Track the schema value as well so later comparisons have a baseline
					globalMaxValue = comp.engine.convertRawValueToSchemaValue(liveLogMax)
				} else {
					liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
					if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
						globalMax = liveLogMax
						globalMaxValue = liveLogSchemaValue
					}
				}
			}
		}
	}

	// Step 3: Handle system columns if no regular data found
	if globalMax == nil && !hasParquetStats {
		globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
	}

	return globalMax, nil
}
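// Illustrative sketch of the merge order used by computeGlobalMin/computeGlobalMax
// (the column name and values are made up):
//
//	parquet file A stats:               min(user_id) = 3
//	parquet file B stats:               min(user_id) = 7
//	live log rows not yet in parquet:   min(user_id) = 1
//
//	=> global MIN(user_id) = 1
//
// If the column has no parquet statistics and no live log values (e.g. a system
// column), the getSystemColumnGlobalMin/Max fallback supplies the result.
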
// executeAggregationQuery handles SELECT queries with aggregation functions
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) {
	return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil)
}

// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates the execution plan
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// Parse LIMIT and OFFSET for aggregation results (do this first)
	// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
	limit := -1
	offset := 0
	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
		if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
			if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil {
				if limit64 > int64(math.MaxInt) || limit64 < 0 {
					return nil, fmt.Errorf("LIMIT value %d is out of range", limit64)
				}
				// Safe conversion after bounds check
				limit = int(limit64)
			}
		}
	}
	if stmt.Limit != nil && stmt.Limit.Offset != nil {
		if offsetExpr, ok := stmt.Limit.Offset.(*SQLVal); ok && offsetExpr.Type == IntVal {
			if offset64, err := strconv.ParseInt(string(offsetExpr.Val), 10, 64); err == nil {
				if offset64 > int64(math.MaxInt) || offset64 < 0 {
					return nil, fmt.Errorf("OFFSET value %d is out of range", offset64)
				}
				// Safe conversion after bounds check
				offset = int(offset64)
			}
		}
	}

	// Parse WHERE clause for filtering
	var predicate func(*schema_pb.RecordValue) bool
	var err error
	if stmt.Where != nil {
		predicate, err = e.buildPredicate(stmt.Where.Expr)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Extract time filters for optimization
	startTimeNs, stopTimeNs := int64(0), int64(0)
	if stmt.Where != nil {
		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
	}

	// FAST PATH: only applicable when there is no WHERE clause.
	// The debug logging below helps diagnose cases where the fast path count
	// disagrees with the slow path (full scan) count.
	if stmt.Where == nil {
		if isDebugMode(ctx) {
			fmt.Printf("\nFast path optimization attempt...\n")
		}
		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
		if canOptimize {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path optimization succeeded!\n")
			}
			return fastResult, nil
		} else {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path optimization failed, falling back to slow path\n")
			}
		}
	} else {
		if isDebugMode(ctx) {
			fmt.Printf("Fast path not applicable due to WHERE clause\n")
		}
	}

	// SLOW PATH: Fall back to full table scan
	if isDebugMode(ctx) {
		fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
	}

	// Extract columns needed for aggregations
	columnsNeeded := make(map[string]bool)
	for _, spec := range aggregations {
		if spec.Column != "*" {
			columnsNeeded[spec.Column] = true
		}
	}

	// Convert to slice
	var scanColumns []string
	if len(columnsNeeded) > 0 {
		scanColumns = make([]string, 0, len(columnsNeeded))
		for col := range columnsNeeded {
			scanColumns = append(scanColumns, col)
		}
	}
	// If no specific columns needed (COUNT(*) only), don't specify columns (scan all)

	// Build scan options for full table scan (aggregations need all data during scanning)
	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs,
		StopTimeNs:  stopTimeNs,
		Limit:       -1, // Use -1 to mean "no limit" - need all data for aggregation
		Offset:      0,  // No offset during scanning - OFFSET applies to final results
		Predicate:   predicate,
		Columns:     scanColumns, // Include columns needed for aggregation functions
	}

	// DEBUG: Log scan options for aggregation
	debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION")

	// Execute the hybrid scan to get all matching records
	var results []HybridScanResult
	if plan != nil {
		// EXPLAIN mode - capture broker buffer stats
		var stats *HybridScanStats
		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}

		// Populate plan with broker buffer information
		if stats != nil {
			plan.BrokerBufferQueried = stats.BrokerBufferQueried
			plan.BrokerBufferMessages = stats.BrokerBufferMessages
			plan.BufferStartIndex = stats.BufferStartIndex

			// Add broker_buffer to data sources if buffer was queried
			if stats.BrokerBufferQueried {
				// Check if broker_buffer is already in data sources
				hasBrokerBuffer := false
				for _, source := range plan.DataSources {
					if source == "broker_buffer" {
						hasBrokerBuffer = true
						break
					}
				}
				if !hasBrokerBuffer {
					plan.DataSources = append(plan.DataSources, "broker_buffer")
				}
			}
		}
	} else {
		// Normal mode - just get results
		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// DEBUG: Log scan results
	if isDebugMode(ctx) {
		fmt.Printf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results))
	}

	// Compute aggregations
	aggResults := e.computeAggregations(results, aggregations)

	// Build result set
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))

	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	// Apply OFFSET and LIMIT to aggregation results
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
	rows := [][]sqltypes.Value{row}
	if offset > 0 || limit >= 0 {
		// Handle LIMIT 0 first
		if limit == 0 {
			rows = [][]sqltypes.Value{}
		} else {
			// Apply OFFSET first
			if offset > 0 {
				if offset >= len(rows) {
					rows = [][]sqltypes.Value{}
				} else {
					rows = rows[offset:]
				}
			}
			// Apply LIMIT after OFFSET (only if limit > 0)
			if limit > 0 && len(rows) > limit {
				rows = rows[:limit]
			}
		}
	}

	return &QueryResult{
		Columns: columns,
		Rows:    rows,
	}, nil
}
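// Worked example of the LIMIT/OFFSET handling above: an aggregation query always
// produces exactly one result row, so
//
//	LIMIT 0            -> rows = [][]sqltypes.Value{}     (empty result)
//	OFFSET 1 (or more) -> rows = [][]sqltypes.Value{}     (offset past the only row)
//	LIMIT 1, OFFSET 0  -> rows = [][]sqltypes.Value{row}  (unchanged)
//
// limit == -1 means no LIMIT clause was given and the row is returned as is.
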
// tryFastParquetAggregation attempts to compute aggregations using a hybrid approach:
// - Use parquet metadata for parquet files
// - Count deduplicated live log rows for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
}

// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates the execution plan if provided
func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
	// Use the new modular components
	optimizer := NewFastPathOptimizer(e)
	computer := NewAggregationComputer(e)

	// Step 1: Determine strategy
	strategy := optimizer.DetermineStrategy(aggregations)
	if !strategy.CanUseFastPath {
		return nil, false
	}

	// Step 2: Collect data sources
	dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
	if err != nil {
		return nil, false
	}

	// Build partition list for the aggregation computer
	// Note: discoverTopicPartitions always returns absolute paths
	partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		return nil, false
	}

	// Debug: Show the hybrid optimization results (only in explain mode)
	if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0) {
		partitionsWithLiveLogs := 0
		if dataSources.LiveLogRowCount > 0 {
			partitionsWithLiveLogs = 1 // Simplified for now
		}
		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n",
			dataSources.ParquetRowCount, dataSources.LiveLogRowCount, partitionsWithLiveLogs)
	}

	// Step 3: Compute aggregations using the fast path
	aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}

	// Step 3.5: Validate fast path results (safety check)
	// For simple COUNT(*) queries, ensure we got a reasonable result
	if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" {
		totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
		countResult := aggResults[0].Count

		if isDebugMode(ctx) {
			fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows)
		}

		if totalRows == 0 && countResult > 0 {
			// Fast path found data but data sources show 0 - this suggests a bug
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d but sources=0\n", countResult)
			}
			return nil, false
		}
		if totalRows > 0 && countResult == 0 {
			// Data sources show data but COUNT is 0 - this also suggests a bug
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: sources=%d but COUNT=0\n", totalRows)
			}
			return nil, false
		}
		if countResult != totalRows {
			// Counts don't match - this suggests inconsistent logic
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows)
			}
			return nil, false
		}
		if isDebugMode(ctx) {
			fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult)
		}
	}

	// Step 4: Populate execution plan if provided (for EXPLAIN queries)
	if plan != nil {
		strategy := optimizer.DetermineStrategy(aggregations)
		builder := &ExecutionPlanBuilder{}

		// Create a minimal SELECT statement for the plan builder (avoid nil pointer)
		stmt := &SelectStatement{}

		// Build aggregation plan with fast path strategy
		aggPlan := builder.BuildAggregationPlan(stmt, aggregations, strategy, dataSources)

		// Copy relevant fields to the main plan
		plan.ExecutionStrategy = aggPlan.ExecutionStrategy
		plan.DataSources = aggPlan.DataSources
		plan.OptimizationsUsed = aggPlan.OptimizationsUsed
		plan.PartitionsScanned = aggPlan.PartitionsScanned
		plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned
		plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned
		plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed
		plan.Aggregations = aggPlan.Aggregations

		// Merge details while preserving existing ones
		if plan.Details == nil {
			plan.Details = make(map[string]interface{})
		}
		for key, value := range aggPlan.Details {
			plan.Details[key] = value
		}

		// Add file path information from the data collection
		plan.Details["partition_paths"] = partitions

		// Collect actual file information for each partition
		var parquetFiles []string
		var liveLogFiles []string
		for _, partitionPath := range partitions {
			// Get parquet files for this partition
			if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
				for _, stats := range parquetStats {
					parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
				}
			}

			// Get live log files for this partition
			if liveFiles, err := collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
				for _, fileName := range liveFiles {
					liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
				}
			}
		}

		if len(parquetFiles) > 0 {
			plan.Details["parquet_files"] = parquetFiles
		}
		if len(liveLogFiles) > 0 {
			plan.Details["live_log_files"] = liveLogFiles
		}

		// Update dataSources.LiveLogFilesCount to match the actual files found
		dataSources.LiveLogFilesCount = len(liveLogFiles)

		// Also update the plan's LiveLogFilesScanned to match
		plan.LiveLogFilesScanned = len(liveLogFiles)

		// Ensure PartitionsScanned is set so the Statistics section appears
		if plan.PartitionsScanned == 0 && len(partitions) > 0 {
			plan.PartitionsScanned = len(partitions)
		}

		if isDebugMode(ctx) {
			fmt.Printf("Populated execution plan with fast path strategy\n")
		}
	}

	// Step 5: Build final query result
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))

	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    [][]sqltypes.Value{row},
	}

	return result, true
}
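// exampleFastPathFlow is an illustrative, unreferenced sketch of how the modular
// fast-path pieces above compose; it mirrors the first steps of
// tryFastParquetAggregationWithPlan. The engine, scanner, and partition paths are
// assumed to be supplied by the caller, and any error simply aborts the fast path.
func exampleFastPathFlow(ctx context.Context, e *SQLEngine, scanner *HybridMessageScanner, aggregations []AggregationSpec, partitions []string) ([]AggregationResult, bool) {
	optimizer := NewFastPathOptimizer(e)

	// Only aggregations answerable from metadata (COUNT/MIN/MAX) qualify.
	if strategy := optimizer.DetermineStrategy(aggregations); !strategy.CanUseFastPath {
		return nil, false
	}

	// Gather parquet statistics plus deduplicated live log counts per partition.
	dataSources, err := optimizer.CollectDataSources(ctx, scanner)
	if err != nil {
		return nil, false
	}

	// Answer the aggregations from the collected metadata instead of scanning rows.
	computer := NewAggregationComputer(e)
	results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}
	return results, true
}
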
// computeAggregations computes aggregation results from a full table scan
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
	aggResults := make([]AggregationResult, len(aggregations))

	for i, spec := range aggregations {
		switch spec.Function {
		case FuncCOUNT:
			if spec.Column == "*" {
				aggResults[i].Count = int64(len(results))
			} else {
				count := int64(0)
				for _, result := range results {
					if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) {
						count++
					}
				}
				aggResults[i].Count = count
			}

		case FuncSUM:
			sum := float64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
					}
				}
			}
			aggResults[i].Sum = sum

		case FuncAVG:
			sum := float64(0)
			count := int64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
						count++
					}
				}
			}
			if count > 0 {
				aggResults[i].Sum = sum / float64(count) // Store average in Sum field
				aggResults[i].Count = count
			}

		case FuncMIN:
			var min interface{}
			var minValue *schema_pb.Value
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if minValue == nil || e.compareValues(value, minValue) < 0 {
						minValue = value
						min = e.extractRawValue(value)
					}
				}
			}
			aggResults[i].Min = min

		case FuncMAX:
			var max interface{}
			var maxValue *schema_pb.Value
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if maxValue == nil || e.compareValues(value, maxValue) > 0 {
						maxValue = value
						max = e.extractRawValue(value)
					}
				}
			}
			aggResults[i].Max = max
		}
	}

	return aggResults
}

// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
	switch spec.Function {
	case FuncCOUNT:
		return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	case FuncMIN, FuncMAX:
		return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	case FuncSUM, FuncAVG:
		// These require scanning actual values, not just min/max
		return false
	default:
		return false
	}
}

// debugHybridScanOptions logs the exact scan options being used
func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) {
	if isDebugMode(ctx) {
		fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType)
		fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs)
		fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs)
		fmt.Printf("Limit: %d\n", options.Limit)
		fmt.Printf("Offset: %d\n", options.Offset)
		fmt.Printf("Predicate: %v\n", options.Predicate != nil)
		fmt.Printf("Columns: %v\n", options.Columns)
		fmt.Printf("==========================================\n")
	}
}

// collectLiveLogFileNames collects the names of live log files in a partition
func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
	var fileNames []string

	err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Skip directories and parquet files
		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
			return nil
		}

		// Only include files with actual content
		if len(entry.Chunks) > 0 {
			fileNames = append(fileNames, entry.Name)
		}

		return nil
	})

	return fileNames, err
}