
explain

pull/7185/head · chrislu · 1 month ago · commit 93a09f5da4

Changed files:
  1. weed/command/sql.go (+6)
  2. weed/query/engine/engine.go (+483)

weed/command/sql.go (+6)

@@ -539,6 +539,11 @@ ADVANCED QUERYING:
SELECT * FROM table WHERE id <= 100; - Range filtering
SELECT * FROM table WHERE name LIKE 'admin%'; - Pattern matching
SELECT * FROM table WHERE status IN ('active', 'pending'); - Multi-value
SELECT COUNT(*), MAX(id), MIN(id) FROM ...; - Aggregation functions
QUERY ANALYSIS:
EXPLAIN SELECT ...; - Show hierarchical execution plan
(data sources, optimizations, timing)
DDL OPERATIONS:
CREATE TABLE topic (field1 INT, field2 STRING); - Create topic
@@ -561,6 +566,7 @@ EXAMPLES:
SELECT * FROM user_events WHERE user_id >= 10 AND status != 'deleted';
SELECT username FROM users WHERE email LIKE '%@company.com';
SELECT * FROM logs WHERE level IN ('error', 'warning') AND timestamp >= '2023-01-01';
EXPLAIN SELECT MAX(id) FROM events; -- View execution plan
Current Status: Full WHERE clause support + Real MQ integration`)
}
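
The QUERY ANALYSIS entries above are the user-facing side of the plan tree built by buildHierarchicalPlan in engine.go below. For illustration, a hypothetical session (topic name and every count invented, not captured output):

    > EXPLAIN SELECT COUNT(*) FROM user_events;
    SELECT Query (hybrid_fast_path)
    ├── Aggregations
    │   └── COUNT(*)
    ├── Data Sources
    │   ├── Parquet Statistics (fast path)
    │   └── Live Log Files
    ├── Optimizations
    │   ├── Parquet Statistics Usage
    │   ├── Live Log Row Counting
    │   └── Duplicate Data Avoidance
    ├── Statistics
    │   ├── Partitions Scanned: 4
    │   ├── Rows Scanned: 120000
    │   └── Results Returned: 1
    └── Performance
        └── Execution Time: 1.234ms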

weed/query/engine/engine.go (+483)

@@ -31,11 +31,28 @@ type SQLEngine struct {
    catalog *SchemaCatalog
}

// QueryExecutionPlan contains information about how a query was executed
type QueryExecutionPlan struct {
    QueryType           string                 `json:"query_type"`         // SELECT, INSERT, etc.
    ExecutionStrategy   string                 `json:"execution_strategy"` // fast_path, full_scan, hybrid
    DataSources         []string               `json:"data_sources"`       // parquet_files, live_logs
    PartitionsScanned   int                    `json:"partitions_scanned"`
    ParquetFilesScanned int                    `json:"parquet_files_scanned"`
    LiveLogFilesScanned int                    `json:"live_log_files_scanned"`
    TotalRowsProcessed  int64                  `json:"total_rows_processed"`
    OptimizationsUsed   []string               `json:"optimizations_used"` // parquet_stats, predicate_pushdown, etc.
    TimeRangeFilters    map[string]interface{} `json:"time_range_filters,omitempty"`
    Aggregations        []string               `json:"aggregations,omitempty"`
    ExecutionTimeMs     float64                `json:"execution_time_ms"`
    Details             map[string]interface{} `json:"details,omitempty"`
}

// QueryResult represents the result of a SQL query execution
type QueryResult struct {
    Columns       []string            `json:"columns"`
    Rows          [][]sqltypes.Value  `json:"rows"`
    Error         error               `json:"error,omitempty"`
    ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"`
}
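
// Illustration (derived from the struct tags above; all values invented): a
// populated plan for a fast-path aggregation would serialize to JSON roughly as
//
//    {
//      "query_type": "SELECT",
//      "execution_strategy": "hybrid_fast_path",
//      "data_sources": ["parquet_stats", "live_logs"],
//      "partitions_scanned": 4,
//      "parquet_files_scanned": 0,
//      "live_log_files_scanned": 0,
//      "total_rows_processed": 120000,
//      "optimizations_used": ["parquet_statistics", "live_log_counting", "deduplication"],
//      "aggregations": ["COUNT(*)"],
//      "execution_time_ms": 1.234,
//      "details": {"results_returned": 1}
//    }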
// NewSQLEngine creates a new SQL execution engine
@@ -64,8 +81,18 @@ func (e *SQLEngine) GetCatalog() *SchemaCatalog {
// 3. DML operations (SELECT) query Parquet files directly
// 4. Error handling follows MySQL conventions
func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
    startTime := time.Now()

    // Handle EXPLAIN as a special case
    sqlTrimmed := strings.TrimSpace(sql)
    sqlUpper := strings.ToUpper(sqlTrimmed)
    if strings.HasPrefix(sqlUpper, "EXPLAIN") {
        // Extract the actual query after the EXPLAIN keyword
        actualSQL := strings.TrimSpace(sqlTrimmed[7:]) // len("EXPLAIN") == 7
        return e.executeExplain(ctx, actualSQL, startTime)
    }

    // Handle DESCRIBE/DESC as a special case since it's not parsed as a standard
    // statement (sqlUpper was already computed above)
    if strings.HasPrefix(sqlUpper, "DESCRIBE") || strings.HasPrefix(sqlUpper, "DESC") {
        return e.handleDescribeCommand(ctx, sql)
    }
@@ -92,6 +119,271 @@ func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
    }
}
// executeExplain handles EXPLAIN statements by executing the query with plan tracking
func (e *SQLEngine) executeExplain(ctx context.Context, actualSQL string, startTime time.Time) (*QueryResult, error) {
    // Parse the actual SQL statement
    stmt, err := sqlparser.Parse(actualSQL)
    if err != nil {
        return &QueryResult{
            Error: fmt.Errorf("SQL parse error in EXPLAIN query: %v", err),
        }, err
    }

    // Create the execution plan
    plan := &QueryExecutionPlan{
        QueryType:         strings.ToUpper(strings.Fields(actualSQL)[0]),
        DataSources:       []string{},
        OptimizationsUsed: []string{},
        Details:           make(map[string]interface{}),
    }

    var result *QueryResult

    // Route to the appropriate handler based on statement type (with plan tracking)
    switch stmt := stmt.(type) {
    case *sqlparser.Select:
        result, err = e.executeSelectStatementWithPlan(ctx, stmt, plan)
        if err != nil {
            plan.Details["error"] = err.Error()
        }
    case *sqlparser.Show:
        plan.QueryType = "SHOW"
        plan.ExecutionStrategy = "metadata_only"
        result, err = e.executeShowStatementWithDescribe(ctx, stmt)
    default:
        err := fmt.Errorf("EXPLAIN not supported for statement type: %T", stmt)
        return &QueryResult{Error: err}, err
    }

    // Calculate execution time
    plan.ExecutionTimeMs = float64(time.Since(startTime).Nanoseconds()) / 1e6

    // Format the execution plan as the query result
    return e.formatExecutionPlan(plan, result, err)
}
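
// Illustrative usage sketch (not from this commit; assumes package-internal
// access, and that sqltypes.Value exposes the vitess-style ToString method):
//
//    func printPlan(ctx context.Context, engine *SQLEngine) error {
//        result, err := engine.ExecuteSQL(ctx, "EXPLAIN SELECT COUNT(*) FROM user_events")
//        if err != nil {
//            return err
//        }
//        // Rendered tree: one VarChar cell per row under "Query Execution Plan"
//        for _, row := range result.Rows {
//            fmt.Println(row[0].ToString())
//        }
//        // Structured form: the same information as typed fields
//        if p := result.ExecutionPlan; p != nil {
//            fmt.Printf("strategy=%s rows=%d time=%.3fms\n",
//                p.ExecutionStrategy, p.TotalRowsProcessed, p.ExecutionTimeMs)
//        }
//        return nil
//    }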
// formatExecutionPlan converts an execution plan to a hierarchical tree format for display
func (e *SQLEngine) formatExecutionPlan(plan *QueryExecutionPlan, originalResult *QueryResult, originalErr error) (*QueryResult, error) {
    columns := []string{"Query Execution Plan"}
    rows := [][]sqltypes.Value{}

    // Build the hierarchical plan display, one line per row
    planLines := e.buildHierarchicalPlan(plan, originalErr)
    for _, line := range planLines {
        rows = append(rows, []sqltypes.Value{
            sqltypes.NewVarChar(line),
        })
    }

    if originalErr != nil {
        return &QueryResult{
            Columns:       columns,
            Rows:          rows,
            ExecutionPlan: plan,
            Error:         originalErr,
        }, originalErr
    }

    return &QueryResult{
        Columns:       columns,
        Rows:          rows,
        ExecutionPlan: plan,
    }, nil
}
// buildHierarchicalPlan creates a tree-like structure for the execution plan
func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) []string {
    var lines []string

    // Root node - query type and strategy
    lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy))

    // Aggregations section (if present)
    if len(plan.Aggregations) > 0 {
        lines = append(lines, "├── Aggregations")
        for i, agg := range plan.Aggregations {
            if i == len(plan.Aggregations)-1 {
                lines = append(lines, fmt.Sprintf("│   └── %s", agg))
            } else {
                lines = append(lines, fmt.Sprintf("│   ├── %s", agg))
            }
        }
    }

    // Data Sources section
    if len(plan.DataSources) > 0 {
        hasMore := len(plan.OptimizationsUsed) > 0 || plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil
        if hasMore {
            lines = append(lines, "├── Data Sources")
        } else {
            lines = append(lines, "└── Data Sources")
        }
        // Children of a closed (└──) section are indented with spaces; an open
        // (├──) section keeps the │ rail so later sections stay connected.
        prefix := "    "
        if hasMore {
            prefix = "│   "
        }
        for i, source := range plan.DataSources {
            if i == len(plan.DataSources)-1 {
                lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatDataSource(source)))
            } else {
                lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatDataSource(source)))
            }
        }
    }

    // Optimizations section
    if len(plan.OptimizationsUsed) > 0 {
        hasMore := plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil
        if hasMore {
            lines = append(lines, "├── Optimizations")
        } else {
            lines = append(lines, "└── Optimizations")
        }
        prefix := "    "
        if hasMore {
            prefix = "│   "
        }
        for i, opt := range plan.OptimizationsUsed {
            if i == len(plan.OptimizationsUsed)-1 {
                lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatOptimization(opt)))
            } else {
                lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatOptimization(opt)))
            }
        }
    }

    // Statistics section
    statisticsPresent := plan.PartitionsScanned > 0 || plan.ParquetFilesScanned > 0 ||
        plan.LiveLogFilesScanned > 0 || plan.TotalRowsProcessed > 0
    if statisticsPresent {
        // The Performance section is always rendered after Statistics, so this
        // section is never the last one and always keeps the │ rail.
        lines = append(lines, "├── Statistics")

        stats := []string{}
        if plan.PartitionsScanned > 0 {
            stats = append(stats, fmt.Sprintf("Partitions Scanned: %d", plan.PartitionsScanned))
        }
        if plan.ParquetFilesScanned > 0 {
            stats = append(stats, fmt.Sprintf("Parquet Files: %d", plan.ParquetFilesScanned))
        }
        if plan.LiveLogFilesScanned > 0 {
            stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned))
        }
        if plan.TotalRowsProcessed > 0 {
            // For aggregations we show both the rows scanned and the rows returned
            if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults {
                stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed))
                stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned))
            } else {
                stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
            }
        }

        for i, stat := range stats {
            if i == len(stats)-1 {
                lines = append(lines, fmt.Sprintf("│   └── %s", stat))
            } else {
                lines = append(lines, fmt.Sprintf("│   ├── %s", stat))
            }
        }
    }

    // Details section, filtering out keys that are already shown elsewhere
    filteredDetails := make([]string, 0)
    for key, value := range plan.Details {
        if key != "results_returned" { // shown in the Statistics section
            filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value))
        }
    }
    if len(filteredDetails) > 0 {
        // Performance always follows, so Details is never the last section
        lines = append(lines, "├── Details")
        for i, detail := range filteredDetails {
            if i == len(filteredDetails)-1 {
                lines = append(lines, fmt.Sprintf("│   └── %s", detail))
            } else {
                lines = append(lines, fmt.Sprintf("│   ├── %s", detail))
            }
        }
    }

    // Performance section (always present)
    if err != nil {
        lines = append(lines, "├── Performance")
        lines = append(lines, fmt.Sprintf("│   └── Execution Time: %.3fms", plan.ExecutionTimeMs))
        lines = append(lines, "└── Error")
        lines = append(lines, fmt.Sprintf("    └── %s", err.Error()))
    } else {
        lines = append(lines, "└── Performance")
        lines = append(lines, fmt.Sprintf("    └── Execution Time: %.3fms", plan.ExecutionTimeMs))
    }

    return lines
}
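
// Illustration (hypothetical output): when the underlying query fails, the
// err != nil branch above appends a trailing Error section, so a failed
// EXPLAIN renders roughly as
//
//    SELECT Query ()
//    ├── Details
//    │   └── error: table not found
//    ├── Performance
//    │   └── Execution Time: 0.412ms
//    └── Error
//        └── table not found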
// formatDataSource provides user-friendly names for data sources
func (e *SQLEngine) formatDataSource(source string) string {
    switch source {
    case "parquet_stats":
        return "Parquet Statistics (fast path)"
    case "parquet_files":
        return "Parquet Files (full scan)"
    case "live_logs":
        return "Live Log Files"
    default:
        return source
    }
}

// formatOptimization provides user-friendly names for optimizations
func (e *SQLEngine) formatOptimization(opt string) string {
    switch opt {
    case "parquet_statistics":
        return "Parquet Statistics Usage"
    case "live_log_counting":
        return "Live Log Row Counting"
    case "deduplication":
        return "Duplicate Data Avoidance"
    case "predicate_pushdown":
        return "WHERE Clause Pushdown"
    case "column_projection":
        return "Column Selection"
    case "limit_pushdown":
        return "LIMIT Optimization"
    default:
        return opt
    }
}
// executeDDLStatement handles CREATE, ALTER, DROP operations
// Assumption: These operations modify the underlying MQ topic structure
func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) {
@@ -108,6 +400,132 @@ func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) {
    }
}
// executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sqlparser.Select, plan *QueryExecutionPlan) (*QueryResult, error) {
    // Parse aggregations to populate the plan
    var aggregations []AggregationSpec
    hasAggregations := false
    selectAll := false

    for _, selectExpr := range stmt.SelectExprs {
        switch expr := selectExpr.(type) {
        case *sqlparser.StarExpr:
            selectAll = true
        case *sqlparser.AliasedExpr:
            switch col := expr.Expr.(type) {
            case *sqlparser.FuncExpr:
                // This is an aggregation function
                aggSpec, _ := e.parseAggregationFunction(col, expr)
                if aggSpec != nil {
                    aggregations = append(aggregations, *aggSpec)
                    hasAggregations = true
                    plan.Aggregations = append(plan.Aggregations, aggSpec.Function+"("+aggSpec.Column+")")
                }
            }
        }
    }

    // Execute the query
    result, err := e.executeSelectStatement(ctx, stmt)

    if err == nil && result != nil {
        // Try to get topic information for partition count and row processing stats
        if len(stmt.From) == 1 {
            if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok {
                if tableExpr, ok := table.Expr.(sqlparser.TableName); ok {
                    tableName := tableExpr.Name.String()

                    // Try to discover partitions for statistics
                    if partitions, discoverErr := e.discoverTopicPartitions("test", tableName); discoverErr == nil {
                        plan.PartitionsScanned = len(partitions)
                    }

                    // For aggregations, report the row count actually scanned, not the result count
                    if hasAggregations {
                        if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil {
                            plan.TotalRowsProcessed = actualRowCount
                            plan.Details["results_returned"] = len(result.Rows)
                        } else {
                            // Fallback: use result rows but flag that it is not the scan count
                            plan.TotalRowsProcessed = int64(len(result.Rows))
                            plan.Details["note"] = "actual_scan_count_unavailable"
                        }
                    } else {
                        // For non-aggregations, the result count is meaningful
                        plan.TotalRowsProcessed = int64(len(result.Rows))
                    }
                }
            }
        }

        // Determine execution strategy based on query type
        if hasAggregations {
            // For aggregations, determine whether the fast-path conditions are met
            if stmt.Where == nil {
                // Check if every aggregation can be answered from parquet statistics
                canUseFastPath := true
                for _, spec := range aggregations {
                    if !e.canUseParquetStatsForAggregation(spec) {
                        canUseFastPath = false
                        break
                    }
                }
                if canUseFastPath {
                    plan.ExecutionStrategy = "hybrid_fast_path"
                    plan.OptimizationsUsed = append(plan.OptimizationsUsed, "parquet_statistics", "live_log_counting", "deduplication")
                    plan.DataSources = []string{"parquet_stats", "live_logs"}
                } else {
                    plan.ExecutionStrategy = "full_scan"
                    plan.DataSources = []string{"live_logs", "parquet_files"}
                }
            } else {
                plan.ExecutionStrategy = "full_scan"
                plan.DataSources = []string{"live_logs", "parquet_files"}
                plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
            }
        } else {
            // For regular SELECT queries
            if selectAll {
                plan.ExecutionStrategy = "hybrid_scan"
                plan.DataSources = []string{"live_logs", "parquet_files"}
            } else {
                plan.ExecutionStrategy = "column_projection"
                plan.DataSources = []string{"live_logs", "parquet_files"}
                plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_projection")
            }
        }

        // Record WHERE clause information
        if stmt.Where != nil {
            // Only add predicate_pushdown if it was not already added
            alreadyHasPredicate := false
            for _, opt := range plan.OptimizationsUsed {
                if opt == "predicate_pushdown" {
                    alreadyHasPredicate = true
                    break
                }
            }
            if !alreadyHasPredicate {
                plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
            }
            plan.Details["where_clause"] = "present"
        }

        // Record LIMIT information
        if stmt.Limit != nil {
            plan.OptimizationsUsed = append(plan.OptimizationsUsed, "limit_pushdown")
            if stmt.Limit.Rowcount != nil {
                if limitExpr, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok && limitExpr.Type == sqlparser.IntVal {
                    plan.Details["limit"] = string(limitExpr.Val)
                }
            }
        }
    }

    return result, err
}
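
// Illustration (derived from the branches above; not exhaustive): how query
// shape maps to the reported strategy, assuming no execution error:
//
//    SELECT COUNT(*) FROM t            -> hybrid_fast_path (parquet stats + live logs)
//    SELECT COUNT(*) FROM t WHERE ...  -> full_scan + predicate_pushdown
//    SELECT * FROM t                   -> hybrid_scan
//    SELECT a, b FROM t                -> column_projection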
// executeSelectStatement handles SELECT queries
// Assumptions:
// 1. Queries run against Parquet files in MQ topics
@@ -2233,6 +2651,63 @@ func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
    return partitions, err
}
// getTopicTotalRowCount returns the total number of rows in a topic (combining parquet files and live logs)
func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, error) {
    // Create a hybrid scanner to access parquet statistics
    var filerClient filer_pb.FilerClient
    if e.catalog.brokerClient != nil {
        var filerClientErr error
        filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
        if filerClientErr != nil {
            return 0, filerClientErr
        }
    }

    hybridScanner, err := NewHybridMessageScanner(filerClient, namespace, topicName)
    if err != nil {
        return 0, err
    }

    // Get all partitions for this topic
    relativePartitions, err := e.discoverTopicPartitions(namespace, topicName)
    if err != nil {
        return 0, err
    }

    // Convert relative partition paths to full paths
    topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
    partitions := make([]string, len(relativePartitions))
    for i, relPartition := range relativePartitions {
        partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
    }

    totalRowCount := int64(0)

    // For each partition, count both parquet and live log rows
    for _, partition := range partitions {
        // Count parquet rows
        parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition)
        if parquetErr == nil {
            for _, stats := range parquetStats {
                totalRowCount += stats.RowCount
            }
        }

        // Count live log rows, excluding files already compacted into parquet (deduplication)
        parquetSourceFiles := make(map[string]bool)
        if parquetErr == nil {
            parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
        }
        liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles)
        if liveLogErr == nil {
            totalRowCount += liveLogCount
        }
    }

    return totalRowCount, nil
}
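
// Worked example of the deduplication above (hypothetical numbers): a partition
// holds two parquet files with 1,000 and 500 rows whose metadata names live log
// files A and B as their sources, plus live log files A, B, and C where C has
// 200 rows. The parquet pass contributes 1,500 rows; the live log pass skips A
// and B (already represented in parquet) and counts only C, so the partition
// total is 1,500 + 200 = 1,700 rather than double-counting A and B.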
func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value {
    switch spec.Function {
    case "COUNT":
