From eaa7136c92fa0c994c9f78d03f440b4aeb99afe5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 17:19:36 -0700 Subject: [PATCH] explain the execution plan --- weed/query/engine/engine.go | 208 +++++++++++++++++++++++++++++------- 1 file changed, 168 insertions(+), 40 deletions(-) diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 47ad41868..be695d364 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -280,14 +280,17 @@ func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) [ if plan.LiveLogFilesScanned > 0 { stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned)) } - if plan.TotalRowsProcessed > 0 { - // Check if this is for aggregations (where we show both scanned and returned) - if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults { - stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed)) - stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned)) - } else { - stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed)) + // Always show row statistics for aggregations, even if 0 (to show fast path efficiency) + if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults { + stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed)) + stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned)) + + // Add fast path explanation when no rows were scanned + if plan.TotalRowsProcessed == 0 { + stats = append(stats, "Scan Method: Parquet Metadata Only") } + } else if plan.TotalRowsProcessed > 0 { + stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed)) } for i, stat := range stats { @@ -313,11 +316,11 @@ func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) [ // Filter out details that are shown elsewhere filteredDetails := make([]string, 0) for key, value := range plan.Details { - if key != "results_returned" { // Skip as it's shown in Statistics section + if key != "results_returned" { // Skip as it's shown in Statistics section filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value)) } } - + if len(filteredDetails) > 0 { // Performance is always present, so check if there are errors after Details hasMore := err != nil @@ -428,47 +431,117 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq // Execute the query result, err := e.executeSelectStatement(ctx, stmt) - if err == nil && result != nil { - // Try to get topic information for partition count and row processing stats + if err == nil && result != nil { + // Extract table name for use in execution strategy determination + var tableName string if len(stmt.From) == 1 { if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok { if tableExpr, ok := table.Expr.(sqlparser.TableName); ok { - tableName := tableExpr.Name.String() - // Try to discover partitions for statistics - if partitions, discoverErr := e.discoverTopicPartitions("test", tableName); discoverErr == nil { - plan.PartitionsScanned = len(partitions) + tableName = tableExpr.Name.String() + } + } + } + + // Try to get topic information for partition count and row processing stats + if tableName != "" { + // Try to discover partitions for statistics + if partitions, discoverErr := e.discoverTopicPartitions("test", tableName); discoverErr == nil { + plan.PartitionsScanned = len(partitions) + } + + // For aggregations, determine actual processing based on execution strategy + if hasAggregations { + plan.Details["results_returned"] = len(result.Rows) + + // Determine actual work done based on execution strategy + if stmt.Where == nil { + // Use the same logic as actual execution to determine if fast path was used + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + filerClient, _ = e.catalog.brokerClient.GetFilerClient() + } + + hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, "test", tableName) + var canUseFastPath bool + if scannerErr == nil { + // Test if fast path can be used (same as actual execution) + _, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations) + canUseFastPath = canOptimize + } else { + // Fallback to simple check + canUseFastPath = true + for _, spec := range aggregations { + if !e.canUseParquetStatsForAggregation(spec) { + canUseFastPath = false + break + } + } } - - // For aggregations, get actual row count scanned, not result count - if hasAggregations { - // Try to get the actual row count from the topic + + if canUseFastPath { + // Fast path: minimal scanning (only live logs that weren't converted) + if actualScanCount, countErr := e.getActualRowsScannedForFastPath("test", tableName); countErr == nil { + plan.TotalRowsProcessed = actualScanCount + } else { + plan.TotalRowsProcessed = 0 // Parquet stats only, no scanning + } + } else { + // Full scan: count all rows if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil { plan.TotalRowsProcessed = actualRowCount - plan.Details["results_returned"] = len(result.Rows) } else { - // Fallback: use result rows but indicate it's not the scan count plan.TotalRowsProcessed = int64(len(result.Rows)) - plan.Details["note"] = "actual_scan_count_unavailable" + plan.Details["note"] = "scan_count_unavailable" } + } + } else { + // With WHERE clause: full scan required + if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil { + plan.TotalRowsProcessed = actualRowCount } else { - // For non-aggregations, result count is meaningful plan.TotalRowsProcessed = int64(len(result.Rows)) + plan.Details["note"] = "scan_count_unavailable" } } + } else { + // For non-aggregations, result count is meaningful + plan.TotalRowsProcessed = int64(len(result.Rows)) } } - // Determine execution strategy based on query type + // Determine execution strategy based on query type (reuse fast path detection from above) if hasAggregations { // For aggregations, determine if fast path conditions are met if stmt.Where == nil { - // Check if aggregations can use fast path - canUseFastPath := true - for _, spec := range aggregations { - if !e.canUseParquetStatsForAggregation(spec) { - canUseFastPath = false - break + // Reuse the same logic used above for row counting + var canUseFastPath bool + if tableName != "" { + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + filerClient, _ = e.catalog.brokerClient.GetFilerClient() + } + + if filerClient != nil { + hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, "test", tableName) + if scannerErr == nil { + // Test if fast path can be used (same as actual execution) + _, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations) + canUseFastPath = canOptimize + } else { + canUseFastPath = false + } + } else { + // Fallback check + canUseFastPath = true + for _, spec := range aggregations { + if !e.canUseParquetStatsForAggregation(spec) { + canUseFastPath = false + break + } + } } + } else { + canUseFastPath = false } if canUseFastPath { @@ -1813,13 +1886,13 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner partitionsWithLiveLogs := 0 for _, partition := range partitions { - partitionPath := fmt.Sprintf("/topics/%s/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition) + // partition is already a full path like "/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630" + partitionPath := partition - // Get parquet file statistics (always try this) + // Get parquet file statistics (try this, but don't fail if missing) fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath) if err != nil { - // If we can't read stats from any partition, fall back to full scan - return nil, false + fileStats = []*ParquetFileStats{} // Empty stats, but continue } if len(fileStats) > 0 { @@ -1835,8 +1908,8 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner // Check if there are live log files and count their rows (excluding parquet-converted files) liveLogRowCount, err := e.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles) if err != nil { - // If we can't count live logs, fall back to full scan - return nil, false + // Set to 0 for this partition and continue (no live logs is acceptable) + liveLogRowCount = 0 } if liveLogRowCount > 0 { totalLiveLogRowCount += liveLogRowCount @@ -2674,7 +2747,7 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e return 0, err } - // Convert relative partition paths to full paths + // Convert relative partition paths to full paths topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName) partitions := make([]string, len(relativePartitions)) for i, relPartition := range relativePartitions { @@ -2682,7 +2755,7 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e } totalRowCount := int64(0) - + // For each partition, count both parquet and live log rows for _, partition := range partitions { // Count parquet rows @@ -2692,13 +2765,13 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e totalRowCount += stats.RowCount } } - + // Count live log rows (with deduplication) parquetSourceFiles := make(map[string]bool) if parquetErr == nil { parquetSourceFiles = e.extractParquetSourceFiles(parquetStats) } - + liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles) if liveLogErr == nil { totalRowCount += liveLogCount @@ -2708,6 +2781,61 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e return totalRowCount, nil } +// getActualRowsScannedForFastPath returns only the rows that need to be scanned for fast path aggregations +// (i.e., live log rows that haven't been converted to parquet - parquet uses metadata only) +func (e *SQLEngine) getActualRowsScannedForFastPath(namespace, topicName string) (int64, error) { + // Create a hybrid scanner to access parquet statistics + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + var filerClientErr error + filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() + if filerClientErr != nil { + return 0, filerClientErr + } + } + + hybridScanner, err := NewHybridMessageScanner(filerClient, namespace, topicName) + if err != nil { + return 0, err + } + + // Get all partitions for this topic + relativePartitions, err := e.discoverTopicPartitions(namespace, topicName) + if err != nil { + return 0, err + } + + // Convert relative partition paths to full paths + topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName) + partitions := make([]string, len(relativePartitions)) + for i, relPartition := range relativePartitions { + partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition) + } + + totalScannedRows := int64(0) + + // For each partition, count ONLY the live log rows that need scanning + // (parquet files use metadata/statistics, so they contribute 0 to scan count) + for _, partition := range partitions { + // Get parquet files to determine what was converted + parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition) + parquetSourceFiles := make(map[string]bool) + if parquetErr == nil { + parquetSourceFiles = e.extractParquetSourceFiles(parquetStats) + } + + // Count only live log rows that haven't been converted to parquet + liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles) + if liveLogErr == nil { + totalScannedRows += liveLogCount + } + + // Note: Parquet files contribute 0 to scan count since we use their metadata/statistics + } + + return totalScannedRows, nil +} + func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value { switch spec.Function { case "COUNT":