diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
index 623e489dd..6b58517e1 100644
--- a/weed/query/engine/aggregations.go
+++ b/weed/query/engine/aggregations.go
@@ -8,10 +8,8 @@ import (
 	"strings"

 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )

 // AggregationSpec defines an aggregation function to be computed
@@ -78,6 +76,12 @@ func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec)

 // CollectDataSources gathers information about available data sources for a topic
 func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
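+	// Passing a zero start and stop time disables time-range pruning (see pruneParquetFilesByTime).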
+	return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
+}
+
+// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic
+// with optional time filtering to skip irrelevant parquet files
+func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
 	dataSources := &TopicDataSources{
 		ParquetFiles:    make(map[string][]*ParquetFileStats),
 		ParquetRowCount: 0,
@@ -125,14 +129,16 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan
 				fmt.Printf("    No parquet files found in partition\n")
 			}
 		} else {
-			dataSources.ParquetFiles[partitionPath] = parquetStats
+			// Prune by time range using parquet column statistics
+			filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+			dataSources.ParquetFiles[partitionPath] = filtered
 			partitionParquetRows := int64(0)
-			for _, stat := range parquetStats {
+			for _, stat := range filtered {
 				partitionParquetRows += stat.RowCount
 				dataSources.ParquetRowCount += stat.RowCount
 			}
 			if isDebugMode(ctx) {
-				fmt.Printf("    Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
+				fmt.Printf("    Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
 			}
 		}

@@ -452,20 +458,27 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 		}
 	}

-	// Extract time filters for optimization
+	// Extract time filters and validate that the WHERE clause contains only time-based predicates
 	startTimeNs, stopTimeNs := int64(0), int64(0)
+	onlyTimePredicates := true
 	if stmt.Where != nil {
-		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+		startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
 	}

-	// FAST PATH RE-ENABLED WITH DEBUG LOGGING:
-	// Added comprehensive debug logging to identify data counting issues
-	// This will help us understand why fast path was returning 0 when slow path returns 1803
-	if stmt.Where == nil {
+	// FAST PATH WITH TIME-BASED OPTIMIZATION:
+	// Allow the fast path only for queries without a WHERE clause or with a time-only WHERE clause.
+	// This prevents incorrect results when non-time predicates are present.
+	canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
+
+	if canAttemptFastPath {
 		if isDebugMode(ctx) {
-			fmt.Printf("\nFast path optimization attempt...\n")
+			if stmt.Where == nil {
+				fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
+			} else {
+				fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
+			}
 		}
-		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
+		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
 		if canOptimize {
 			if isDebugMode(ctx) {
 				fmt.Printf("Fast path optimization succeeded!\n")
 			}
@@ -478,7 +491,7 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 		}
 	} else {
 		if isDebugMode(ctx) {
-			fmt.Printf("Fast path not applicable due to WHERE clause\n")
+			fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
 		}
 	}

@@ -605,23 +618,66 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 	// Build execution tree for aggregation queries if plan is provided
 	if plan != nil {
+		// Populate detailed plan information for full scan (similar to fast path)
+		e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
 		plan.RootNode = e.buildExecutionTree(plan, stmt)
 	}

 	return result, nil
 }

+// populateFullScanPlanDetails populates detailed plan information for full scan queries
+// This provides consistency with fast path execution plan details
+func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
+	// plan.Details is initialized at the start of the SELECT execution
+
+	// Extract table information
+	var database, tableName string
+	if len(stmt.From) == 1 {
+		if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+			if tableExpr, ok := table.Expr.(TableName); ok {
+				tableName = tableExpr.Name.String()
+				if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+					database = tableExpr.Qualifier.String()
+				}
+			}
+		}
+	}
+
+	// Use the current database if none was specified
+	if database == "" {
+		database = e.catalog.currentDatabase
+		if database == "" {
+			database = "default"
+		}
+	}
+
+	// Discover partitions and populate file details
+	if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
+		// Add partition paths to execution plan details
+		plan.Details["partition_paths"] = partitions
+
+		// Populate detailed file information using the shared helper
+		e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
+	} else {
+		// Record the discovery error in the plan for better diagnostics
+		plan.Details["error_partition_discovery"] = discoverErr.Error()
+	}
+}
+
 // tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
 //   - Use parquet metadata for parquet files
 //   - Count live log files for live data
 //   - Combine both for accurate results per partition
 // Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
 func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
-	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
+	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
 }

 // tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
-func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
+// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering)
+// stmt: SELECT statement for column statistics pruning optimization (can be nil)
+func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
 	// Use the new modular components
 	optimizer := NewFastPathOptimizer(e)
 	computer := NewAggregationComputer(e)
@@ -632,8 +688,8 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 		return nil, false
 	}

-	// Step 2: Collect data sources
-	dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
+	// Step 2: Collect data sources with time filtering for parquet file optimization
+	dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
 	if err != nil {
 		return nil, false
 	}

@@ -725,9 +781,6 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 	}

 	// Merge details while preserving existing ones
-	if plan.Details == nil {
-		plan.Details = make(map[string]interface{})
-	}
 	for key, value := range aggPlan.Details {
 		plan.Details[key] = value
 	}
@@ -735,51 +788,17 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 	// Add file path information from the data collection
 	plan.Details["partition_paths"] = partitions

-	// Collect actual file information for each partition
-	var parquetFiles []string
-	var liveLogFiles []string
-	parquetSources := make(map[string]bool)
-
-	for _, partitionPath := range partitions {
-		// Get parquet files for this partition
-		if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
-			for _, stats := range parquetStats {
-				parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
-			}
-		}
-
-		// Merge accurate parquet sources from metadata (preferred over filename fallback)
-		if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
-			for src := range sources {
-				parquetSources[src] = true
-			}
-		}
+	// Populate detailed file information using shared helper, including time filters for pruning
+	plan.Details[PlanDetailStartTimeNs] = startTimeNs
+	plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+	e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)

-		// Get live log files for this partition
-		if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
-			for _, fileName := range liveFiles {
-				// Exclude live log files that have been converted to parquet (deduplicated)
-				if parquetSources[fileName] {
-					continue
-				}
-				liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
-			}
-		}
-	}
-
-	if len(parquetFiles) > 0 {
-		plan.Details["parquet_files"] = parquetFiles
-	}
-	if len(liveLogFiles) > 0 {
-		plan.Details["live_log_files"] = liveLogFiles
+	// Update counts to match discovered live log files
+	if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
+		dataSources.LiveLogFilesCount = len(liveLogFiles)
+		plan.LiveLogFilesScanned = len(liveLogFiles)
 	}

-	// Update the dataSources.LiveLogFilesCount to match the actual files found
-	dataSources.LiveLogFilesCount = len(liveLogFiles)
-
-	// Also update the plan's LiveLogFilesScanned to match
-	plan.LiveLogFilesScanned = len(liveLogFiles)
-
 	// Ensure PartitionsScanned is set so Statistics section appears
 	if plan.PartitionsScanned == 0 && len(partitions) > 0 {
 		plan.PartitionsScanned = len(partitions)
@@ -912,24 +931,3 @@ func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, quer
 		fmt.Printf("==========================================\n")
 	}
 }
-
-// collectLiveLogFileNames collects the names of live log files in a partition
-func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
-	var fileNames []string
-
-	err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
-		// Skip directories and parquet files
-		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
-			return nil
-		}
-
-		// Only include files with actual content
-		if len(entry.Chunks) > 0 {
-			fileNames = append(fileNames, entry.Name)
-		}
-
-		return nil
-	})
-
-	return fileNames, err
-}
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 84c238583..ffed03f35 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -495,69 +495,6 @@ func ParseSQL(sql string) (Statement, error) {
 	}
 }

-// extractFunctionArguments extracts the arguments from a function call expression using CockroachDB parser
-func extractFunctionArguments(expr string) ([]SelectExpr, error) {
-	// Find the parentheses
-	startParen := strings.Index(expr, "(")
-	endParen := strings.LastIndex(expr, ")")
-
-	if startParen == -1 || endParen == -1 || endParen <= startParen {
-		return nil, fmt.Errorf("invalid function syntax")
-	}
-
-	// Extract arguments string
-	argsStr := strings.TrimSpace(expr[startParen+1 : endParen])
-
-	// Handle empty arguments
-	if argsStr == "" {
-		return []SelectExpr{}, nil
-	}
-
-	// Handle single * argument (for COUNT(*))
-	if argsStr == "*" {
-		return []SelectExpr{&StarExpr{}}, nil
-	}
-
-	// Parse multiple arguments separated by commas
-	args := []SelectExpr{}
-	argParts := strings.Split(argsStr, ",")
-
-	// Use CockroachDB parser to parse each argument as a SELECT expression
-	cockroachParser := NewCockroachSQLParser()
-
-	for _, argPart := range argParts {
-		argPart = strings.TrimSpace(argPart)
-		if argPart == "*" {
-			args = append(args, &StarExpr{})
-		} else {
-			// Create a dummy SELECT statement to parse the argument expression
-			dummySelect := fmt.Sprintf("SELECT %s", argPart)
-
-			// Parse using CockroachDB parser
-			stmt, err := cockroachParser.ParseSQL(dummySelect)
-			if err != nil {
-				// If CockroachDB parser fails, fall back to simple column name
-				args = append(args, &AliasedExpr{
-					Expr: &ColName{Name: stringValue(argPart)},
-				})
-				continue
-			}
-
-			// Extract the expression from the parsed SELECT statement
-			if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 {
-				args = append(args, selectStmt.SelectExprs[0])
-			} else {
-				// Fallback to column name if parsing fails
-				args = append(args, &AliasedExpr{
-					Expr: &ColName{Name: stringValue(argPart)},
-				})
-			}
-		}
-	}
-
-	return args, nil
-}
-
 // debugModeKey is used to store debug mode flag in context
 type debugModeKey struct{}

@@ -1221,8 +1158,8 @@ func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectSta
 		}
 	}

-	// Create broker buffer node if queried
-	if plan.BrokerBufferQueried {
+	// Create broker buffer node only if queried AND it has unflushed messages
+	if plan.BrokerBufferQueried && plan.BrokerBufferMessages > 0 {
 		brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{
 			FilePath:   "broker_memory_buffer",
 			SourceType: "broker_buffer",
@@ -1489,6 +1426,8 @@ func (e *SQLEngine) formatOptimization(opt string) string {
 		return "Duplicate Data Avoidance"
 	case "predicate_pushdown":
 		return "WHERE Clause Pushdown"
+	case "column_statistics_pruning":
+		return "Column Statistics File Pruning"
 	case "column_projection":
 		return "Column Selection"
 	case "limit_pushdown":
@@ -1540,6 +1479,10 @@ func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement)

 // executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
 func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
+	// Initialize plan details once
+	if plan != nil && plan.Details == nil {
+		plan.Details = make(map[string]interface{})
+	}
 	// Parse aggregations to populate plan
 	var aggregations []AggregationSpec
 	hasAggregations := false
@@ -1577,7 +1520,7 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se
 		if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
 			if tableExpr, ok := table.Expr.(TableName); ok {
 				tableName = tableExpr.Name.String()
-				if tableExpr.Qualifier.String() != "" {
+				if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
 					database = tableExpr.Qualifier.String()
 				}
 			}
@@ -2290,18 +2233,51 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 		if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
 			// Add partition paths to execution plan details
 			plan.Details["partition_paths"] = partitions
+			// Persist time filter details for downstream pruning/diagnostics
+			plan.Details[PlanDetailStartTimeNs] = startTimeNs
+			plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+
+			if isDebugMode(ctx) {
+				fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
+			}

 			// Collect actual file information for each partition
 			var parquetFiles []string
 			var liveLogFiles []string
 			parquetSources := make(map[string]bool)
+			var parquetReadErrors []string
+			var liveLogListErrors []string

 			for _, partitionPath := range partitions {
 				// Get parquet files for this partition
 				if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
-					for _, stats := range parquetStats {
+					// Prune files by time range with debug logging
+					filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+
+					// Further prune by column statistics from WHERE clause
+					if stmt.Where != nil {
+						beforeColumnPrune := len(filteredStats)
+						filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
+						columnPrunedCount := beforeColumnPrune - len(filteredStats)
+
+						if columnPrunedCount > 0 {
+							if isDebugMode(ctx) {
+								fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
+							}
+							// Track column statistics optimization
+							if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
+								plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
+							}
+						}
+					}
+					for _, stats := range filteredStats {
 						parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
 					}
+				} else {
+					parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+					if isDebugMode(ctx) {
+						fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
+					}
 				}

 				// Merge accurate parquet sources from metadata
@@ -2320,6 +2296,11 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 					}
 					liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
 				}
+			} else {
+				liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+				if isDebugMode(ctx) {
+					fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
+				}
 			}
 		}

@@ -2329,11 +2310,20 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 			if len(liveLogFiles) > 0 {
 				plan.Details["live_log_files"] = liveLogFiles
 			}
+			if len(parquetReadErrors) > 0 {
+				plan.Details["error_parquet_statistics"] = parquetReadErrors
+			}
+			if len(liveLogListErrors) > 0 {
+				plan.Details["error_live_log_listing"] = liveLogListErrors
+			}

 			// Update scan statistics for execution plan display
 			plan.PartitionsScanned = len(partitions)
 			plan.ParquetFilesScanned = len(parquetFiles)
 			plan.LiveLogFilesScanned = len(liveLogFiles)
+		} else {
+			// Handle partition discovery error
+			plan.Details["error_partition_discovery"] = discoverErr.Error()
 		}
 	} else {
 		// Normal mode - just get results
@@ -2377,6 +2367,23 @@ func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) {
 	return startTimeNs, stopTimeNs
 }

+// extractTimeFiltersWithValidation extracts time filters and validates that the WHERE clause contains only time-based predicates
+// Returns (startTimeNs, stopTimeNs, onlyTimePredicates) where onlyTimePredicates indicates whether the fast path is safe
+func (e *SQLEngine) extractTimeFiltersWithValidation(expr ExprNode) (int64, int64, bool) {
+	startTimeNs, stopTimeNs := int64(0), int64(0)
+	onlyTimePredicates := true
+
+	// Recursively extract time filters and validate predicates
+	e.extractTimeFiltersWithValidationRecursive(expr, &startTimeNs, &stopTimeNs, &onlyTimePredicates)
+
+	// Special case: if startTimeNs == stopTimeNs, treat it like an equality query
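+	// Clearing stopTimeNs leaves only the start bound; a zero stop is treated downstream as an open upper bound.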
+	if startTimeNs != 0 && startTimeNs == stopTimeNs {
+		stopTimeNs = 0
+	}
+
+	return startTimeNs, stopTimeNs, onlyTimePredicates
+}
+
 // extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons
 func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) {
 	switch exprType := expr.(type) {
@@ -2396,6 +2403,39 @@ func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stop
 	}
 }

+// extractTimeFiltersWithValidationRecursive recursively processes WHERE expressions to find time comparisons and validate predicates
+func (e *SQLEngine) extractTimeFiltersWithValidationRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64, onlyTimePredicates *bool) {
+	switch exprType := expr.(type) {
+	case *ComparisonExpr:
+		// Check if this is a time-based comparison
+		leftCol := e.getColumnName(exprType.Left)
+		rightCol := e.getColumnName(exprType.Right)
+
+		isTimeComparison := e.isTimestampColumn(leftCol) || e.isTimestampColumn(rightCol)
+		if isTimeComparison {
+			// Extract time filter from this comparison
+			e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs)
+		} else {
+			// Non-time predicate found - fast path is not safe
+			*onlyTimePredicates = false
+		}
+	case *AndExpr:
+		// For AND expressions, both sides must be time-only for fast path to be safe
+		e.extractTimeFiltersWithValidationRecursive(exprType.Left, startTimeNs, stopTimeNs, onlyTimePredicates)
+		e.extractTimeFiltersWithValidationRecursive(exprType.Right, startTimeNs, stopTimeNs, onlyTimePredicates)
+	case *OrExpr:
+		// OR expressions are complex and not supported in fast path
+		*onlyTimePredicates = false
+		return
+	case *ParenExpr:
+		// Unwrap parentheses and continue
+		e.extractTimeFiltersWithValidationRecursive(exprType.Expr, startTimeNs, stopTimeNs, onlyTimePredicates)
+	default:
+		// Unknown expression type - not safe for fast path
+		*onlyTimePredicates = false
+	}
+}
+
 // extractTimeFromComparison extracts time bounds from comparison expressions
 // Handles comparisons against timestamp columns (system columns and schema-defined timestamp types)
 func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) {
@@ -2465,7 +2505,7 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool {
 	}

 	// System timestamp columns are always time columns
-	if columnName == SW_COLUMN_NAME_TIMESTAMP {
+	if columnName == SW_COLUMN_NAME_TIMESTAMP || columnName == SW_DISPLAY_NAME_TIMESTAMP {
 		return true
 	}

@@ -2495,6 +2535,280 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool {
 	return false
 }

+// getTimeFiltersFromPlan extracts time filter values from execution plan details
+func getTimeFiltersFromPlan(plan *QueryExecutionPlan) (startTimeNs, stopTimeNs int64) {
+	if plan == nil || plan.Details == nil {
+		return 0, 0
+	}
+	if startNsVal, ok := plan.Details[PlanDetailStartTimeNs]; ok {
+		if startNs, ok2 := startNsVal.(int64); ok2 {
+			startTimeNs = startNs
+		}
+	}
+	if stopNsVal, ok := plan.Details[PlanDetailStopTimeNs]; ok {
+		if stopNs, ok2 := stopNsVal.(int64); ok2 {
+			stopTimeNs = stopNs
+		}
+	}
+	return
+}
+
+// pruneParquetFilesByTime filters parquet files based on timestamp ranges, with optional debug logging
+func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileStats, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats {
+	if startTimeNs == 0 && stopTimeNs == 0 {
+		return parquetStats
+	}
+
+	debugEnabled := ctx != nil && isDebugMode(ctx)
+	qStart := startTimeNs
+	qStop := stopTimeNs
+	if qStop == 0 {
+		qStop = math.MaxInt64
+	}
+
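+	// Best-effort pruning: files without a usable timestamp range are always kept.
+	// Surviving files are compacted in place to the front of the slice.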
+	n := 0
+	for _, fs := range parquetStats {
+		if debugEnabled {
+			fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
+		}
+		if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
+			if debugEnabled {
+				fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
+			}
+			if qStop < minNs || (qStart != 0 && qStart > maxNs) {
+				if debugEnabled {
+					fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
+				}
+				continue
+			}
+		} else if debugEnabled {
+			fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
+		}
+		parquetStats[n] = fs
+		n++
+	}
+	return parquetStats[:n]
+}
+
+// pruneParquetFilesByColumnStats filters parquet files based on column statistics and WHERE predicates
+func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetStats []*ParquetFileStats, whereExpr ExprNode) []*ParquetFileStats {
+	if whereExpr == nil {
+		return parquetStats
+	}
+
+	debugEnabled := ctx != nil && isDebugMode(ctx)
+	n := 0
+	for _, fs := range parquetStats {
+		if e.canSkipParquetFile(ctx, fs, whereExpr) {
+			if debugEnabled {
+				fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
+			}
+			continue
+		}
+		parquetStats[n] = fs
+		n++
+	}
+	return parquetStats[:n]
+}
+
+// canSkipParquetFile determines if a parquet file can be skipped based on column statistics
+func (e *SQLEngine) canSkipParquetFile(ctx context.Context, fileStats *ParquetFileStats, whereExpr ExprNode) bool {
+	switch expr := whereExpr.(type) {
+	case *ComparisonExpr:
+		return e.canSkipFileByComparison(ctx, fileStats, expr)
+	case *AndExpr:
+		// For AND: skip if ANY condition allows skipping (more aggressive pruning)
+		return e.canSkipParquetFile(ctx, fileStats, expr.Left) || e.canSkipParquetFile(ctx, fileStats, expr.Right)
+	case *OrExpr:
+		// For OR: skip only if ALL conditions allow skipping (conservative)
+		return e.canSkipParquetFile(ctx, fileStats, expr.Left) && e.canSkipParquetFile(ctx, fileStats, expr.Right)
+	default:
+		// Unknown expression type - don't skip
+		return false
+	}
+}
+
+// canSkipFileByComparison checks if a file can be skipped based on a comparison predicate
+func (e *SQLEngine) canSkipFileByComparison(ctx context.Context, fileStats *ParquetFileStats, expr *ComparisonExpr) bool {
+	// Extract column name and comparison value
+	var columnName string
+	var compareSchemaValue *schema_pb.Value
+	operator := expr.Operator
+
+	// Determine which side is the column and which is the value
+	if colRef, ok := expr.Left.(*ColName); ok {
+		columnName = colRef.Name.String()
+		if sqlVal, ok := expr.Right.(*SQLVal); ok {
+			compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
+		} else {
+			return false // Can't optimize complex expressions
+		}
+	} else if colRef, ok := expr.Right.(*ColName); ok {
+		columnName = colRef.Name.String()
+		if sqlVal, ok := expr.Left.(*SQLVal); ok {
+			compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
+			// Flip operator for reversed comparison
+			operator = e.flipOperator(operator)
+		} else {
+			return false
+		}
+	} else {
+		return false // No column reference found
+	}
+
+	// Validate comparison value
+	if compareSchemaValue == nil {
+		return false
+	}
+
+	// Get column statistics
+	colStats, exists := fileStats.ColumnStats[columnName]
+	if !exists || colStats == nil {
+		// Try case-insensitive lookup
+		for colName, stats := range fileStats.ColumnStats {
+			if strings.EqualFold(colName, columnName) {
+				colStats = stats
+				exists = true
+				break
+			}
+		}
+	}
+
+	if !exists || colStats == nil || colStats.MinValue == nil || colStats.MaxValue == nil {
+		return false // No statistics available
+	}
+
+	// Apply pruning logic based on operator
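+	// Example: for "id > 1000", a file whose max(id) is at most 1000 can never match and is skipped.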
+	switch operator {
+	case ">":
+		// Skip if max(column) <= compareValue
+		return e.compareValues(colStats.MaxValue, compareSchemaValue) <= 0
+	case ">=":
+		// Skip if max(column) < compareValue
+		return e.compareValues(colStats.MaxValue, compareSchemaValue) < 0
+	case "<":
+		// Skip if min(column) >= compareValue
+		return e.compareValues(colStats.MinValue, compareSchemaValue) >= 0
+	case "<=":
+		// Skip if min(column) > compareValue
+		return e.compareValues(colStats.MinValue, compareSchemaValue) > 0
+	case "=":
+		// Skip if compareValue is outside [min, max] range
+		return e.compareValues(compareSchemaValue, colStats.MinValue) < 0 ||
+			e.compareValues(compareSchemaValue, colStats.MaxValue) > 0
+	case "!=", "<>":
+		// Skip if min == max == compareValue (all values are the same and equal to compareValue)
+		return e.compareValues(colStats.MinValue, colStats.MaxValue) == 0 &&
+			e.compareValues(colStats.MinValue, compareSchemaValue) == 0
+	default:
+		return false // Unknown operator
+	}
+}
+
+// flipOperator flips comparison operators when operands are swapped
+func (e *SQLEngine) flipOperator(op string) string {
+	switch op {
+	case ">":
+		return "<"
+	case ">=":
+		return "<="
+	case "<":
+		return ">"
+	case "<=":
+		return ">="
+	case "=", "!=", "<>":
+		return op // These are symmetric
+	default:
+		return op
+	}
+}
+
+// populatePlanFileDetails populates execution plan with detailed file information for partitions
+// Includes column statistics pruning optimization when WHERE clause is provided
+func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
+	debugEnabled := ctx != nil && isDebugMode(ctx)
+	// Collect actual file information for each partition
+	var parquetFiles []string
+	var liveLogFiles []string
+	parquetSources := make(map[string]bool)
+	var parquetReadErrors []string
+	var liveLogListErrors []string
+
+	// Extract time filters from plan details
+	startTimeNs, stopTimeNs := getTimeFiltersFromPlan(plan)
+
+	for _, partitionPath := range partitions {
+		// Get parquet files for this partition
+		if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
+			// Prune files by time range
+			filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+
+			// Further prune by column statistics from WHERE clause
+			if stmt != nil && stmt.Where != nil {
+				beforeColumnPrune := len(filteredStats)
+				filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
+				columnPrunedCount := beforeColumnPrune - len(filteredStats)
+
+				if columnPrunedCount > 0 {
+					if debugEnabled {
+						fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
+					}
+					// Track column statistics optimization
+					if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
+						plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
+					}
+				}
+			}
+
+			for _, stats := range filteredStats {
+				parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
+			}
+		} else {
+			parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+			if debugEnabled {
+				fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
+			}
+		}
+
+		// Merge accurate parquet sources from metadata
+		if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
+			for src := range sources {
+				parquetSources[src] = true
+			}
+		}
+
+		// Get live log files for this partition
+		if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
+			for _, fileName := range liveFiles {
+				// Exclude live log files that have been converted to parquet (deduplicated)
+				if parquetSources[fileName] {
+					continue
+				}
+				liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
+			}
+		} else {
+			liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+			if debugEnabled {
+				fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
+			}
+		}
+	}
+
+	// Add file lists to plan details
+	if len(parquetFiles) > 0 {
+		plan.Details["parquet_files"] = parquetFiles
+	}
+	if len(liveLogFiles) > 0 {
+		plan.Details["live_log_files"] = liveLogFiles
+	}
+	if len(parquetReadErrors) > 0 {
+		plan.Details["error_parquet_statistics"] = parquetReadErrors
+	}
+	if len(liveLogListErrors) > 0 {
+		plan.Details["error_live_log_listing"] = liveLogListErrors
+	}
+}
+
 // isSQLTypeTimestamp checks if a SQL type string represents a timestamp type
 func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool {
 	upperType := strings.ToUpper(strings.TrimSpace(sqlType))
@@ -2664,56 +2978,6 @@ func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []Selec
 	}
 }

-// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support
-func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
-	var columnName string
-	var compareValue interface{}
-	var operator string
-
-	// Extract the comparison details, resolving aliases if needed
-	leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
-	rightCol := e.getColumnNameWithAliases(expr.Right, aliases)
-	operator = e.normalizeOperator(expr.Operator)
-
-	if leftCol != "" && rightCol == "" {
-		// Left side is column, right side is value
-		columnName = e.getSystemColumnInternalName(leftCol)
-		val, err := e.extractValueFromExpr(expr.Right)
-		if err != nil {
-			return nil, err
-		}
-		compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right)
-	} else if rightCol != "" && leftCol == "" {
-		// Right side is column, left side is value
-		columnName = e.getSystemColumnInternalName(rightCol)
-		val, err := e.extractValueFromExpr(expr.Left)
-		if err != nil {
-			return nil, err
-		}
-		compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left)
-		// Reverse the operator when column is on the right
-		operator = e.reverseOperator(operator)
-	} else if leftCol != "" && rightCol != "" {
-		return nil, fmt.Errorf("column-to-column comparisons not yet supported")
-	} else {
-		return nil, fmt.Errorf("at least one side of comparison must be a column")
-	}
-
-	return func(record *schema_pb.RecordValue) bool {
-		fieldValue, exists := record.Fields[columnName]
-		if !exists {
-			return false
-		}
-		return e.evaluateComparison(fieldValue, operator, compareValue)
-	}, nil
-}
-
-// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.)
-// Handles column names on both left and right sides of the comparison
-func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) {
-	return e.buildComparisonPredicateWithContext(expr, nil)
-}
-
 // buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support
 func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
 	var columnName string
@@ -2836,54 +3100,6 @@ func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectEx
 	}, nil
 }

-// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support
-func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
-	var columnName string
-	var fromValue, toValue interface{}
-
-	// Extract column name from left side with alias resolution
-	leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
-	if leftCol == "" {
-		return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left)
-	}
-	columnName = e.getSystemColumnInternalName(leftCol)
-
-	// Extract FROM value
-	fromVal, err := e.extractValueFromExpr(expr.From)
-	if err != nil {
-		return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err)
-	}
-	fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From)
-
-	// Extract TO value
-	toVal, err := e.extractValueFromExpr(expr.To)
-	if err != nil {
-		return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err)
-	}
-	toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To)
-
-	// Return the predicate function
-	return func(record *schema_pb.RecordValue) bool {
-		fieldValue, exists := record.Fields[columnName]
-		if !exists {
-			return false
-		}
-
-		// Evaluate: fieldValue >= fromValue AND fieldValue <= toValue
-		greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue)
-		lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue)
-
-		result := greaterThanOrEqualFrom && lessThanOrEqualTo
-
-		// Handle NOT BETWEEN
-		if expr.Not {
-			result = !result
-		}
-
-		return result
-	}, nil
-}
-
 // buildIsNullPredicateWithContext creates a predicate for IS NULL operations
 func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
 	// Check if the expression is a column name
@@ -2936,50 +3152,6 @@ func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, sele
 	}
 }

-// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support
-func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
-	// Extract column name from expression with alias resolution
-	columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
-	if columnName == "" {
-		return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr)
-	}
-	columnName = e.getSystemColumnInternalName(columnName)
-
-	// Return the predicate function
-	return func(record *schema_pb.RecordValue) bool {
-		// Check if field exists and if it's null or missing
-		fieldValue, exists := record.Fields[columnName]
-		if !exists {
-			return true // Field doesn't exist = NULL
-		}
-
-		// Check if the field value itself is null/empty
-		return e.isValueNull(fieldValue)
-	}, nil
-}
-
-// buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support
-func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
-	// Extract column name from expression with alias resolution
-	columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
-	if columnName == "" {
-		return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr)
-	}
-	columnName = e.getSystemColumnInternalName(columnName)
-
-	// Return the predicate function
-	return func(record *schema_pb.RecordValue) bool {
-		// Check if field exists and if it's not null
-		fieldValue, exists := record.Fields[columnName]
-		if !exists {
-			return false // Field doesn't exist = NULL, so NOT NULL is false
-		}
-
-		// Check if the field value itself is not null/empty
-		return !e.isValueNull(fieldValue)
-	}, nil
-}
-
 // isValueNull checks if a schema_pb.Value is null or represents a null value
 func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
 	if value == nil {
@@ -3019,33 +3191,6 @@ func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
 	}
 }

-// getColumnNameWithAliases extracts column name from expression, resolving aliases if needed
-func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string {
-	switch exprType := expr.(type) {
-	case *ColName:
-		colName := exprType.Name.String()
-		// Check if this is an alias that should be resolved
-		if aliases != nil {
-			if actualExpr, exists := aliases[colName]; exists {
-				// Recursively resolve the aliased expression
-				return e.getColumnNameWithAliases(actualExpr, nil) // Don't recurse aliases
-			}
-		}
-		return colName
-	}
-	return ""
-}
-
-// extractValueFromExpr extracts a value from an expression node (for alias support)
-func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) {
-	return e.extractComparisonValue(expr)
-}
-
-// normalizeOperator normalizes comparison operators
-func (e *SQLEngine) normalizeOperator(op string) string {
-	return op // For now, just return as-is
-}
-
 // extractComparisonValue extracts the comparison value from a SQL expression
 func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) {
 	switch val := expr.(type) {
@@ -4178,31 +4323,6 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
 	return t.UnixNano()
 }

-// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition
-func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
-	filerClient, err := e.catalog.brokerClient.GetFilerClient()
-	if err != nil {
-		return 0, err
-	}
-
-	totalRows := int64(0)
-	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
-		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
-			return nil // Skip directories and parquet files
-		}
-
-		// Count rows in live log file
-		rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
-		if err != nil {
-			fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
-			return nil // Continue with other files
-		}
-		totalRows += rowCount
-		return nil
-	})
-	return totalRows, err
-}
-
 // extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication
 func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
 	sourceFiles := make(map[string]bool)
@@ -4226,6 +4346,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map

 // countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data
 func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
+	debugEnabled := ctx != nil && isDebugMode(ctx)
 	filerClient, err := e.catalog.brokerClient.GetFilerClient()
 	if err != nil {
 		return 0, err
@@ -4242,14 +4363,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
 	// Second, get duplicate files from log buffer metadata
 	logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath)
 	if err != nil {
-		if isDebugMode(ctx) {
+		if debugEnabled {
 			fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err)
 		}
 		logBufferDuplicates = make(map[string]bool)
 	}

 	// Debug: Show deduplication status (only in explain mode)
-	if isDebugMode(ctx) {
+	if debugEnabled {
 		if len(actualSourceFiles) > 0 {
 			fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
 		}
@@ -4266,7 +4387,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,

 		// Skip files that have been converted to parquet
 		if actualSourceFiles[entry.Name] {
-			if isDebugMode(ctx) {
+			if debugEnabled {
 				fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
 			}
 			return nil
@@ -4274,7 +4395,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,

 		// Skip files that are duplicated due to log buffer metadata
 		if logBufferDuplicates[entry.Name] {
-			if isDebugMode(ctx) {
+			if debugEnabled {
 				fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name)
 			}
 			return nil
@@ -4345,6 +4466,7 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer

 // buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient)
 func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
+	debugEnabled := ctx != nil && isDebugMode(ctx)
 	if e.catalog.brokerClient == nil {
 		return make(map[string]bool), nil
 	}
@@ -4390,7 +4512,7 @@ func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitio
 			if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
 				// Ranges overlap - this file contains duplicate buffer indexes
 				isDuplicate = true
-				if isDebugMode(ctx) {
+				if debugEnabled {
 					fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
 						entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
 				}
diff --git a/weed/query/engine/fast_path_predicate_validation_test.go b/weed/query/engine/fast_path_predicate_validation_test.go
new file mode 100644
index 000000000..3322ed51f
--- /dev/null
+++ b/weed/query/engine/fast_path_predicate_validation_test.go
@@ -0,0 +1,272 @@
+package engine
+
+import (
+	"testing"
+)
+
+// TestFastPathPredicateValidation tests the critical fix for fast-path aggregation
+// to ensure non-time predicates are properly detected and fast-path is blocked
+func TestFastPathPredicateValidation(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	testCases := []struct {
+		name                string
+		whereClause         string
+		expectedTimeOnly    bool
+		expectedStartTimeNs int64
+		expectedStopTimeNs  int64
+		description         string
+	}{
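+		// Zero expected bounds are not asserted below; an extracted zero means that side of the time range is open.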
+		{
+			name:             "No WHERE clause",
+			whereClause:      "",
+			expectedTimeOnly: true, // No WHERE means time-only is true
+			description:      "Queries without WHERE clause should allow fast path",
+		},
+		{
+			name:                "Time-only predicate (greater than)",
+			whereClause:         "_ts > 1640995200000000000",
+			expectedTimeOnly:    true,
+			expectedStartTimeNs: 1640995200000000000,
+			expectedStopTimeNs:  0,
+			description:         "Pure time predicates should allow fast path",
+		},
+		{
+			name:                "Time-only predicate (less than)",
+			whereClause:         "_ts < 1640995200000000000",
+			expectedTimeOnly:    true,
+			expectedStartTimeNs: 0,
+			expectedStopTimeNs:  1640995200000000000,
+			description:         "Pure time predicates should allow fast path",
+		},
+		{
+			name:                "Time-only predicate (range with AND)",
+			whereClause:         "_ts > 1640995200000000000 AND _ts < 1641081600000000000",
+			expectedTimeOnly:    true,
+			expectedStartTimeNs: 1640995200000000000,
+			expectedStopTimeNs:  1641081600000000000,
+			description:         "Time range predicates should allow fast path",
+		},
+		{
+			name:             "Mixed predicate (time + non-time)",
+			whereClause:      "_ts > 1640995200000000000 AND user_id = 'user123'",
+			expectedTimeOnly: false,
+			description:      "CRITICAL: Mixed predicates must block fast path to prevent incorrect results",
+		},
+		{
+			name:             "Non-time predicate only",
+			whereClause:      "user_id = 'user123'",
+			expectedTimeOnly: false,
+			description:      "Non-time predicates must block fast path",
+		},
+		{
+			name:             "Multiple non-time predicates",
+			whereClause:      "user_id = 'user123' AND status = 'active'",
+			expectedTimeOnly: false,
+			description:      "Multiple non-time predicates must block fast path",
+		},
+		{
+			name:             "OR with time predicate (unsafe)",
+			whereClause:      "_ts > 1640995200000000000 OR user_id = 'user123'",
+			expectedTimeOnly: false,
+			description:      "OR expressions are complex and must block fast path",
+		},
+		{
+			name:             "OR with only time predicates (still unsafe)",
+			whereClause:      "_ts > 1640995200000000000 OR _ts < 1640908800000000000",
+			expectedTimeOnly: false,
+			description:      "Even time-only OR expressions must block fast path due to complexity",
+		},
+		// Note: Parenthesized expressions are not supported by the current parser.
+		// Those test cases are omitted until parser support is added.
+		{
+			name:             "String column comparison",
+			whereClause:      "event_type = 'click'",
+			expectedTimeOnly: false,
+			description:      "String column comparisons must block fast path",
+		},
+		{
+			name:             "Numeric column comparison",
+			whereClause:      "id > 1000",
+			expectedTimeOnly: false,
+			description:      "Numeric column comparisons must block fast path",
+		},
+		{
+			name:                "Internal timestamp column",
+			whereClause:         "_timestamp_ns > 1640995200000000000",
+			expectedTimeOnly:    true,
+			expectedStartTimeNs: 1640995200000000000,
+			description:         "Internal timestamp column should allow fast path",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Parse the WHERE clause if present
+			var whereExpr ExprNode
+			if tc.whereClause != "" {
+				sql := "SELECT COUNT(*) FROM test WHERE " + tc.whereClause
+				stmt, err := ParseSQL(sql)
+				if err != nil {
+					t.Fatalf("Failed to parse SQL: %v", err)
+				}
+				selectStmt := stmt.(*SelectStatement)
+				whereExpr = selectStmt.Where.Expr
+			}
+
+			// Test the validation function
+			var startTimeNs, stopTimeNs int64
+			var onlyTimePredicates bool
+
+			if whereExpr == nil {
+				// No WHERE clause case
+				onlyTimePredicates = true
+			} else {
+				startTimeNs, stopTimeNs, onlyTimePredicates = engine.SQLEngine.extractTimeFiltersWithValidation(whereExpr)
+			}
+
+			// Verify the results
+			if onlyTimePredicates != tc.expectedTimeOnly {
+				t.Errorf("Expected onlyTimePredicates=%v, got %v. %s",
+					tc.expectedTimeOnly, onlyTimePredicates, tc.description)
+			}
+
+			// Check time filters if expected
+			if tc.expectedStartTimeNs != 0 && startTimeNs != tc.expectedStartTimeNs {
+				t.Errorf("Expected startTimeNs=%d, got %d", tc.expectedStartTimeNs, startTimeNs)
+			}
+			if tc.expectedStopTimeNs != 0 && stopTimeNs != tc.expectedStopTimeNs {
+				t.Errorf("Expected stopTimeNs=%d, got %d", tc.expectedStopTimeNs, stopTimeNs)
+			}
+
+			t.Logf("✅ %s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d",
+				tc.name, onlyTimePredicates, startTimeNs, stopTimeNs)
+		})
+	}
+}
+
+// TestFastPathAggregationSafety tests that fast-path aggregation is only attempted
+// when it's safe to do so (no non-time predicates)
+func TestFastPathAggregationSafety(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	testCases := []struct {
+		name              string
+		sql               string
+		shouldUseFastPath bool
+		description       string
+	}{
+		{
+			name:              "No WHERE - should use fast path",
+			sql:               "SELECT COUNT(*) FROM test",
+			shouldUseFastPath: true,
+			description:       "Queries without WHERE should use fast path",
+		},
+		{
+			name:              "Time-only WHERE - should use fast path",
+			sql:               "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000",
+			shouldUseFastPath: true,
+			description:       "Time-only predicates should use fast path",
+		},
+		{
+			name:              "Mixed WHERE - should NOT use fast path",
+			sql:               "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000 AND user_id = 'user123'",
+			shouldUseFastPath: false,
+			description:       "CRITICAL: Mixed predicates must NOT use fast path to prevent wrong results",
+		},
+		{
+			name:              "Non-time WHERE - should NOT use fast path",
+			sql:               "SELECT COUNT(*) FROM test WHERE user_id = 'user123'",
+			shouldUseFastPath: false,
+			description:       "Non-time predicates must NOT use fast path",
+		},
+		{
+			name:              "OR expression - should NOT use fast path",
+			sql:               "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000 OR user_id = 'user123'",
+			shouldUseFastPath: false,
+			description:       "OR expressions must NOT use fast path due to complexity",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Parse the SQL
+			stmt, err := ParseSQL(tc.sql)
+			if err != nil {
+				t.Fatalf("Failed to parse SQL: %v", err)
+			}
+			selectStmt := stmt.(*SelectStatement)
+
+			// Test the fast path decision logic
+			startTimeNs, stopTimeNs := int64(0), int64(0)
+			onlyTimePredicates := true
+			if selectStmt.Where != nil {
+				startTimeNs, stopTimeNs, onlyTimePredicates = engine.SQLEngine.extractTimeFiltersWithValidation(selectStmt.Where.Expr)
+			}
+
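+			// Mirror the fast-path gating used in executeAggregationQueryWithPlan.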
%s", + tc.shouldUseFastPath, canAttemptFastPath, tc.description) + } + + t.Logf("✅ %s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)", + tc.name, canAttemptFastPath, onlyTimePredicates, startTimeNs, stopTimeNs) + }) + } +} + +// TestTimestampColumnDetection tests that the engine correctly identifies timestamp columns +func TestTimestampColumnDetection(t *testing.T) { + engine := NewTestSQLEngine() + + testCases := []struct { + columnName string + isTimestamp bool + description string + }{ + { + columnName: "_ts", + isTimestamp: true, + description: "System timestamp display column should be detected", + }, + { + columnName: "_timestamp_ns", + isTimestamp: true, + description: "Internal timestamp column should be detected", + }, + { + columnName: "user_id", + isTimestamp: false, + description: "Non-timestamp column should not be detected as timestamp", + }, + { + columnName: "id", + isTimestamp: false, + description: "ID column should not be detected as timestamp", + }, + { + columnName: "status", + isTimestamp: false, + description: "Status column should not be detected as timestamp", + }, + { + columnName: "event_type", + isTimestamp: false, + description: "Event type column should not be detected as timestamp", + }, + } + + for _, tc := range testCases { + t.Run(tc.columnName, func(t *testing.T) { + isTimestamp := engine.SQLEngine.isTimestampColumn(tc.columnName) + if isTimestamp != tc.isTimestamp { + t.Errorf("Expected isTimestampColumn(%s)=%v, got %v. %s", + tc.columnName, tc.isTimestamp, isTimestamp, tc.description) + } + t.Logf("✅ Column '%s': isTimestamp=%v", tc.columnName, isTimestamp) + }) + } +} diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 2584b54a6..eee57bc23 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -3,6 +3,7 @@ package engine import ( "container/heap" "context" + "encoding/binary" "encoding/json" "fmt" "io" @@ -145,6 +146,46 @@ type ParquetFileStats struct { FileName string RowCount int64 ColumnStats map[string]*ParquetColumnStats + // Optional file-level timestamp range from filer extended attributes + MinTimestampNs int64 + MaxTimestampNs int64 +} + +// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns +func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) { + if fileStats == nil { + return 0, 0, false + } + // Prefer column stats for _ts_ns if present + if len(fileStats.ColumnStats) > 0 { + if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil { + if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin { + if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax { + return minNs, maxNs, true + } + } + } + } + // Fallback to file-level range if present in filer extended metadata + if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 { + return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true + } + return 0, 0, false +} + +// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns +func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) { + if v == nil { + return 0, false + } + switch k := v.Kind.(type) { + case *schema_pb.Value_Int64Value: + return k.Int64Value, true + case *schema_pb.Value_Int32Value: + return int64(k.Int32Value), true + default: + return 0, false + } } // 
+		if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+			fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
+		}
+		if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+			fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
+		}
+	}

 	// Get schema information
 	schema := fileView.Schema()
diff --git a/weed/query/engine/types.go b/weed/query/engine/types.go
index 08be17fc0..edcd5bd9a 100644
--- a/weed/query/engine/types.go
+++ b/weed/query/engine/types.go
@@ -87,6 +87,12 @@ type QueryExecutionPlan struct {
 	BufferStartIndex int64 `json:"buffer_start_index,omitempty"`
 }

+// Plan detail keys
+const (
+	PlanDetailStartTimeNs = "StartTimeNs"
+	PlanDetailStopTimeNs  = "StopTimeNs"
+)
+
 // QueryResult represents the result of a SQL query execution
 type QueryResult struct {
 	Columns []string `json:"columns"`