Browse Source

explain the execution plan

pull/7185/head
chrislu 1 month ago
parent
commit
eaa7136c92
  1. 208
      weed/query/engine/engine.go

208
weed/query/engine/engine.go

@ -280,14 +280,17 @@ func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) [
if plan.LiveLogFilesScanned > 0 { if plan.LiveLogFilesScanned > 0 {
stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned)) stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned))
} }
if plan.TotalRowsProcessed > 0 {
// Check if this is for aggregations (where we show both scanned and returned)
if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults {
stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed))
stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned))
} else {
stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
// Always show row statistics for aggregations, even if 0 (to show fast path efficiency)
if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults {
stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed))
stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned))
// Add fast path explanation when no rows were scanned
if plan.TotalRowsProcessed == 0 {
stats = append(stats, "Scan Method: Parquet Metadata Only")
} }
} else if plan.TotalRowsProcessed > 0 {
stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
} }
for i, stat := range stats { for i, stat := range stats {
@ -313,11 +316,11 @@ func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) [
// Filter out details that are shown elsewhere // Filter out details that are shown elsewhere
filteredDetails := make([]string, 0) filteredDetails := make([]string, 0)
for key, value := range plan.Details { for key, value := range plan.Details {
if key != "results_returned" { // Skip as it's shown in Statistics section
if key != "results_returned" { // Skip as it's shown in Statistics section
filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value)) filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value))
} }
} }
if len(filteredDetails) > 0 { if len(filteredDetails) > 0 {
// Performance is always present, so check if there are errors after Details // Performance is always present, so check if there are errors after Details
hasMore := err != nil hasMore := err != nil
@ -428,47 +431,117 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq
// Execute the query // Execute the query
result, err := e.executeSelectStatement(ctx, stmt) result, err := e.executeSelectStatement(ctx, stmt)
if err == nil && result != nil {
// Try to get topic information for partition count and row processing stats
if err == nil && result != nil {
// Extract table name for use in execution strategy determination
var tableName string
if len(stmt.From) == 1 { if len(stmt.From) == 1 {
if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok { if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok {
if tableExpr, ok := table.Expr.(sqlparser.TableName); ok { if tableExpr, ok := table.Expr.(sqlparser.TableName); ok {
tableName := tableExpr.Name.String()
// Try to discover partitions for statistics
if partitions, discoverErr := e.discoverTopicPartitions("test", tableName); discoverErr == nil {
plan.PartitionsScanned = len(partitions)
tableName = tableExpr.Name.String()
}
}
}
// Try to get topic information for partition count and row processing stats
if tableName != "" {
// Try to discover partitions for statistics
if partitions, discoverErr := e.discoverTopicPartitions("test", tableName); discoverErr == nil {
plan.PartitionsScanned = len(partitions)
}
// For aggregations, determine actual processing based on execution strategy
if hasAggregations {
plan.Details["results_returned"] = len(result.Rows)
// Determine actual work done based on execution strategy
if stmt.Where == nil {
// Use the same logic as actual execution to determine if fast path was used
var filerClient filer_pb.FilerClient
if e.catalog.brokerClient != nil {
filerClient, _ = e.catalog.brokerClient.GetFilerClient()
}
hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, "test", tableName)
var canUseFastPath bool
if scannerErr == nil {
// Test if fast path can be used (same as actual execution)
_, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
canUseFastPath = canOptimize
} else {
// Fallback to simple check
canUseFastPath = true
for _, spec := range aggregations {
if !e.canUseParquetStatsForAggregation(spec) {
canUseFastPath = false
break
}
}
} }
// For aggregations, get actual row count scanned, not result count
if hasAggregations {
// Try to get the actual row count from the topic
if canUseFastPath {
// Fast path: minimal scanning (only live logs that weren't converted)
if actualScanCount, countErr := e.getActualRowsScannedForFastPath("test", tableName); countErr == nil {
plan.TotalRowsProcessed = actualScanCount
} else {
plan.TotalRowsProcessed = 0 // Parquet stats only, no scanning
}
} else {
// Full scan: count all rows
if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil { if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil {
plan.TotalRowsProcessed = actualRowCount plan.TotalRowsProcessed = actualRowCount
plan.Details["results_returned"] = len(result.Rows)
} else { } else {
// Fallback: use result rows but indicate it's not the scan count
plan.TotalRowsProcessed = int64(len(result.Rows)) plan.TotalRowsProcessed = int64(len(result.Rows))
plan.Details["note"] = "actual_scan_count_unavailable"
plan.Details["note"] = "scan_count_unavailable"
} }
}
} else {
// With WHERE clause: full scan required
if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil {
plan.TotalRowsProcessed = actualRowCount
} else { } else {
// For non-aggregations, result count is meaningful
plan.TotalRowsProcessed = int64(len(result.Rows)) plan.TotalRowsProcessed = int64(len(result.Rows))
plan.Details["note"] = "scan_count_unavailable"
} }
} }
} else {
// For non-aggregations, result count is meaningful
plan.TotalRowsProcessed = int64(len(result.Rows))
} }
} }
// Determine execution strategy based on query type
// Determine execution strategy based on query type (reuse fast path detection from above)
if hasAggregations { if hasAggregations {
// For aggregations, determine if fast path conditions are met // For aggregations, determine if fast path conditions are met
if stmt.Where == nil { if stmt.Where == nil {
// Check if aggregations can use fast path
canUseFastPath := true
for _, spec := range aggregations {
if !e.canUseParquetStatsForAggregation(spec) {
canUseFastPath = false
break
// Reuse the same logic used above for row counting
var canUseFastPath bool
if tableName != "" {
var filerClient filer_pb.FilerClient
if e.catalog.brokerClient != nil {
filerClient, _ = e.catalog.brokerClient.GetFilerClient()
}
if filerClient != nil {
hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, "test", tableName)
if scannerErr == nil {
// Test if fast path can be used (same as actual execution)
_, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
canUseFastPath = canOptimize
} else {
canUseFastPath = false
}
} else {
// Fallback check
canUseFastPath = true
for _, spec := range aggregations {
if !e.canUseParquetStatsForAggregation(spec) {
canUseFastPath = false
break
}
}
} }
} else {
canUseFastPath = false
} }
if canUseFastPath { if canUseFastPath {
@ -1813,13 +1886,13 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
partitionsWithLiveLogs := 0 partitionsWithLiveLogs := 0
for _, partition := range partitions { for _, partition := range partitions {
partitionPath := fmt.Sprintf("/topics/%s/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition)
// partition is already a full path like "/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630"
partitionPath := partition
// Get parquet file statistics (always try this)
// Get parquet file statistics (try this, but don't fail if missing)
fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath) fileStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
if err != nil { if err != nil {
// If we can't read stats from any partition, fall back to full scan
return nil, false
fileStats = []*ParquetFileStats{} // Empty stats, but continue
} }
if len(fileStats) > 0 { if len(fileStats) > 0 {
@ -1835,8 +1908,8 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
// Check if there are live log files and count their rows (excluding parquet-converted files) // Check if there are live log files and count their rows (excluding parquet-converted files)
liveLogRowCount, err := e.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles) liveLogRowCount, err := e.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSourceFiles)
if err != nil { if err != nil {
// If we can't count live logs, fall back to full scan
return nil, false
// Set to 0 for this partition and continue (no live logs is acceptable)
liveLogRowCount = 0
} }
if liveLogRowCount > 0 { if liveLogRowCount > 0 {
totalLiveLogRowCount += liveLogRowCount totalLiveLogRowCount += liveLogRowCount
@ -2674,7 +2747,7 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e
return 0, err return 0, err
} }
// Convert relative partition paths to full paths
// Convert relative partition paths to full paths
topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName) topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
partitions := make([]string, len(relativePartitions)) partitions := make([]string, len(relativePartitions))
for i, relPartition := range relativePartitions { for i, relPartition := range relativePartitions {
@ -2682,7 +2755,7 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e
} }
totalRowCount := int64(0) totalRowCount := int64(0)
// For each partition, count both parquet and live log rows // For each partition, count both parquet and live log rows
for _, partition := range partitions { for _, partition := range partitions {
// Count parquet rows // Count parquet rows
@ -2692,13 +2765,13 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e
totalRowCount += stats.RowCount totalRowCount += stats.RowCount
} }
} }
// Count live log rows (with deduplication) // Count live log rows (with deduplication)
parquetSourceFiles := make(map[string]bool) parquetSourceFiles := make(map[string]bool)
if parquetErr == nil { if parquetErr == nil {
parquetSourceFiles = e.extractParquetSourceFiles(parquetStats) parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
} }
liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles) liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles)
if liveLogErr == nil { if liveLogErr == nil {
totalRowCount += liveLogCount totalRowCount += liveLogCount
@ -2708,6 +2781,61 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e
return totalRowCount, nil return totalRowCount, nil
} }
// getActualRowsScannedForFastPath returns only the rows that need to be scanned for fast path aggregations
// (i.e., live log rows that haven't been converted to parquet - parquet uses metadata only)
func (e *SQLEngine) getActualRowsScannedForFastPath(namespace, topicName string) (int64, error) {
	// A hybrid scanner is needed to read per-partition parquet statistics.
	var filerClient filer_pb.FilerClient
	if e.catalog.brokerClient != nil {
		fc, fcErr := e.catalog.brokerClient.GetFilerClient()
		if fcErr != nil {
			return 0, fcErr
		}
		filerClient = fc
	}

	hybridScanner, err := NewHybridMessageScanner(filerClient, namespace, topicName)
	if err != nil {
		return 0, err
	}

	// Discover every partition of the topic; paths come back relative to the topic root.
	relativePartitions, err := e.discoverTopicPartitions(namespace, topicName)
	if err != nil {
		return 0, err
	}

	topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName)

	var totalScannedRows int64
	// Only live log rows contribute to the scan count: parquet files are answered
	// from their metadata/statistics, so they add zero scanned rows.
	for _, relPartition := range relativePartitions {
		partition := fmt.Sprintf("%s/%s", topicBasePath, relPartition)

		// Work out which live log files were already converted into parquet,
		// so they are excluded from the live-log count below.
		parquetSourceFiles := make(map[string]bool)
		if parquetStats, statsErr := hybridScanner.ReadParquetStatistics(partition); statsErr == nil {
			parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
		}

		// Best effort: a partition whose live logs cannot be counted contributes nothing.
		if liveLogCount, countErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles); countErr == nil {
			totalScannedRows += liveLogCount
		}
	}

	return totalScannedRows, nil
}
func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value { func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value {
switch spec.Function { switch spec.Function {
case "COUNT": case "COUNT":

Loading…
Cancel
Save