diff --git a/weed/command/sql.go b/weed/command/sql.go
index 743bb42ca..99722adf7 100644
--- a/weed/command/sql.go
+++ b/weed/command/sql.go
@@ -539,6 +539,11 @@ ADVANCED QUERYING:
   SELECT * FROM table WHERE id <= 100;            - Range filtering
   SELECT * FROM table WHERE name LIKE 'admin%';   - Pattern matching
   SELECT * FROM table WHERE status IN ('active', 'pending'); - Multi-value
+  SELECT COUNT(*), MAX(id), MIN(id) FROM ...;     - Aggregation functions
+
+QUERY ANALYSIS:
+  EXPLAIN SELECT ...;                             - Show hierarchical execution plan
+                                                    (data sources, optimizations, timing)
 
 DDL OPERATIONS:
   CREATE TABLE topic (field1 INT, field2 STRING); - Create topic
@@ -561,6 +566,7 @@ EXAMPLES:
   SELECT * FROM user_events WHERE user_id >= 10 AND status != 'deleted';
   SELECT username FROM users WHERE email LIKE '%@company.com';
   SELECT * FROM logs WHERE level IN ('error', 'warning') AND timestamp >= '2023-01-01';
+  EXPLAIN SELECT MAX(id) FROM events;             -- View execution plan
 
 Current Status: Full WHERE clause support + Real MQ integration`)
 }
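For orientation, the tree that buildHierarchicalPlan (added in engine.go below) renders for a fast-path aggregation looks roughly like the following; the topic, counts, and timing are illustrative values, not captured output:

    EXPLAIN SELECT MAX(id) FROM events;

    SELECT Query (hybrid_fast_path)
    ├── Aggregations
    │   └── MAX(id)
    ├── Data Sources
    │   ├── Parquet Statistics (fast path)
    │   └── Live Log Files
    ├── Optimizations
    │   ├── Parquet Statistics Usage
    │   ├── Live Log Row Counting
    │   └── Duplicate Data Avoidance
    ├── Statistics
    │   ├── Partitions Scanned: 4
    │   ├── Rows Scanned: 102400
    │   └── Results Returned: 1
    └── Performance
        └── Execution Time: 1.234ms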
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index e1bab2eb7..47ad41868 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -31,11 +31,28 @@ type SQLEngine struct {
 	catalog *SchemaCatalog
 }
 
+// QueryExecutionPlan contains information about how a query was executed
+type QueryExecutionPlan struct {
+	QueryType           string                 `json:"query_type"`         // SELECT, INSERT, etc.
+	ExecutionStrategy   string                 `json:"execution_strategy"` // fast_path, full_scan, hybrid
+	DataSources         []string               `json:"data_sources"`       // parquet_files, live_logs
+	PartitionsScanned   int                    `json:"partitions_scanned"`
+	ParquetFilesScanned int                    `json:"parquet_files_scanned"`
+	LiveLogFilesScanned int                    `json:"live_log_files_scanned"`
+	TotalRowsProcessed  int64                  `json:"total_rows_processed"`
+	OptimizationsUsed   []string               `json:"optimizations_used"` // parquet_stats, predicate_pushdown, etc.
+	TimeRangeFilters    map[string]interface{} `json:"time_range_filters,omitempty"`
+	Aggregations        []string               `json:"aggregations,omitempty"`
+	ExecutionTimeMs     float64                `json:"execution_time_ms"`
+	Details             map[string]interface{} `json:"details,omitempty"`
+}
+
 // QueryResult represents the result of a SQL query execution
 type QueryResult struct {
-	Columns []string           `json:"columns"`
-	Rows    [][]sqltypes.Value `json:"rows"`
-	Error   error              `json:"error,omitempty"`
+	Columns       []string            `json:"columns"`
+	Rows          [][]sqltypes.Value  `json:"rows"`
+	Error         error               `json:"error,omitempty"`
+	ExecutionPlan *QueryExecutionPlan `json:"execution_plan,omitempty"`
 }
 
 // NewSQLEngine creates a new SQL execution engine
@@ -64,8 +81,18 @@ func (e *SQLEngine) GetCatalog() *SchemaCatalog {
 // 3. DML operations (SELECT) query Parquet files directly
 // 4. Error handling follows MySQL conventions
 func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
+	startTime := time.Now()
+
+	// Handle EXPLAIN as a special case
+	sqlTrimmed := strings.TrimSpace(sql)
+	sqlUpper := strings.ToUpper(sqlTrimmed)
+	if strings.HasPrefix(sqlUpper, "EXPLAIN") {
+		// Extract the actual query after EXPLAIN
+		actualSQL := strings.TrimSpace(sqlTrimmed[7:]) // Remove "EXPLAIN"
+		return e.executeExplain(ctx, actualSQL, startTime)
+	}
+
 	// Handle DESCRIBE/DESC as a special case since it's not parsed as a standard statement
-	sqlUpper := strings.ToUpper(strings.TrimSpace(sql))
 	if strings.HasPrefix(sqlUpper, "DESCRIBE") || strings.HasPrefix(sqlUpper, "DESC") {
 		return e.handleDescribeCommand(ctx, sql)
 	}
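The EXPLAIN dispatch above relies purely on string inspection before any parsing happens. A standalone sketch (plain Go, mirroring but not reusing the code above) of how the prefix is detected and stripped:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // Mirrors the EXPLAIN detection in ExecuteSQL: the keyword match is
    // case-insensitive, leading whitespace is ignored, and everything after
    // the first 7 bytes is re-trimmed before being parsed as the real query.
    func main() {
    	sql := "  explain SELECT MAX(id) FROM events"
    	sqlTrimmed := strings.TrimSpace(sql)
    	sqlUpper := strings.ToUpper(sqlTrimmed)
    	if strings.HasPrefix(sqlUpper, "EXPLAIN") {
    		actualSQL := strings.TrimSpace(sqlTrimmed[7:]) // Remove "EXPLAIN"
    		fmt.Printf("dispatch to executeExplain with %q\n", actualSQL)
    		// Output: dispatch to executeExplain with "SELECT MAX(id) FROM events"
    	}
    }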
@@ -92,6 +119,271 @@ func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, e
 	}
 }
 
+// executeExplain handles EXPLAIN statements by executing the query with plan tracking
+func (e *SQLEngine) executeExplain(ctx context.Context, actualSQL string, startTime time.Time) (*QueryResult, error) {
+	// Parse the actual SQL statement
+	stmt, err := sqlparser.Parse(actualSQL)
+	if err != nil {
+		return &QueryResult{
+			Error: fmt.Errorf("SQL parse error in EXPLAIN query: %v", err),
+		}, err
+	}
+
+	// Create execution plan
+	plan := &QueryExecutionPlan{
+		QueryType:         strings.ToUpper(strings.Fields(actualSQL)[0]),
+		DataSources:       []string{},
+		OptimizationsUsed: []string{},
+		Details:           make(map[string]interface{}),
+	}
+
+	var result *QueryResult
+
+	// Route to appropriate handler based on statement type (with plan tracking)
+	switch stmt := stmt.(type) {
+	case *sqlparser.Select:
+		result, err = e.executeSelectStatementWithPlan(ctx, stmt, plan)
+		if err != nil {
+			plan.Details["error"] = err.Error()
+		}
+	case *sqlparser.Show:
+		plan.QueryType = "SHOW"
+		plan.ExecutionStrategy = "metadata_only"
+		result, err = e.executeShowStatementWithDescribe(ctx, stmt)
+	default:
+		err := fmt.Errorf("EXPLAIN not supported for statement type: %T", stmt)
+		return &QueryResult{Error: err}, err
+	}
+
+	// Calculate execution time
+	plan.ExecutionTimeMs = float64(time.Since(startTime).Nanoseconds()) / 1e6
+
+	// Format execution plan as result
+	return e.formatExecutionPlan(plan, result, err)
+}
+
+// formatExecutionPlan converts execution plan to a hierarchical tree format for display
+func (e *SQLEngine) formatExecutionPlan(plan *QueryExecutionPlan, originalResult *QueryResult, originalErr error) (*QueryResult, error) {
+	columns := []string{"Query Execution Plan"}
+	rows := [][]sqltypes.Value{}
+
+	// Build hierarchical plan display
+	planLines := e.buildHierarchicalPlan(plan, originalErr)
+
+	for _, line := range planLines {
+		rows = append(rows, []sqltypes.Value{
+			sqltypes.NewVarChar(line),
+		})
+	}
+
+	if originalErr != nil {
+		return &QueryResult{
+			Columns:       columns,
+			Rows:          rows,
+			ExecutionPlan: plan,
+			Error:         originalErr,
+		}, originalErr
+	}
+
+	return &QueryResult{
+		Columns:       columns,
+		Rows:          rows,
+		ExecutionPlan: plan,
+	}, nil
+}
+
+// buildHierarchicalPlan creates a tree-like structure for the execution plan.
+// The Performance section is always rendered last, so every section before it
+// uses the non-terminal "├──" connector and its children hang under "│".
+func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) []string {
+	var lines []string
+
+	// Root node - Query type and strategy
+	lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy))
+
+	// Aggregations section (if present)
+	if len(plan.Aggregations) > 0 {
+		lines = append(lines, "├── Aggregations")
+		for i, agg := range plan.Aggregations {
+			if i == len(plan.Aggregations)-1 {
+				lines = append(lines, fmt.Sprintf("│   └── %s", agg))
+			} else {
+				lines = append(lines, fmt.Sprintf("│   ├── %s", agg))
+			}
+		}
+	}
+
+	// Data Sources section
+	if len(plan.DataSources) > 0 {
+		lines = append(lines, "├── Data Sources")
+		for i, source := range plan.DataSources {
+			if i == len(plan.DataSources)-1 {
+				lines = append(lines, fmt.Sprintf("│   └── %s", e.formatDataSource(source)))
+			} else {
+				lines = append(lines, fmt.Sprintf("│   ├── %s", e.formatDataSource(source)))
+			}
+		}
+	}
+
+	// Optimizations section
+	if len(plan.OptimizationsUsed) > 0 {
+		lines = append(lines, "├── Optimizations")
+		for i, opt := range plan.OptimizationsUsed {
+			if i == len(plan.OptimizationsUsed)-1 {
+				lines = append(lines, fmt.Sprintf("│   └── %s", e.formatOptimization(opt)))
+			} else {
+				lines = append(lines, fmt.Sprintf("│   ├── %s", e.formatOptimization(opt)))
+			}
+		}
+	}
+
+	// Statistics section
+	statisticsPresent := plan.PartitionsScanned > 0 || plan.ParquetFilesScanned > 0 ||
+		plan.LiveLogFilesScanned > 0 || plan.TotalRowsProcessed > 0
+
+	if statisticsPresent {
+		lines = append(lines, "├── Statistics")
+
+		stats := []string{}
+		if plan.PartitionsScanned > 0 {
+			stats = append(stats, fmt.Sprintf("Partitions Scanned: %d", plan.PartitionsScanned))
+		}
+		if plan.ParquetFilesScanned > 0 {
+			stats = append(stats, fmt.Sprintf("Parquet Files: %d", plan.ParquetFilesScanned))
+		}
+		if plan.LiveLogFilesScanned > 0 {
+			stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned))
+		}
+		if plan.TotalRowsProcessed > 0 {
+			// For aggregations we show both the rows scanned and the rows returned
+			if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults {
+				stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed))
+				stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned))
+			} else {
+				stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
+			}
+		}
+
+		for i, stat := range stats {
+			if i == len(stats)-1 {
+				lines = append(lines, fmt.Sprintf("│   └── %s", stat))
+			} else {
+				lines = append(lines, fmt.Sprintf("│   ├── %s", stat))
+			}
+		}
+	}
+
+	// Details section
+	// Filter out details that are shown elsewhere
+	filteredDetails := make([]string, 0)
+	for key, value := range plan.Details {
+		if key != "results_returned" { // Skip as it's shown in Statistics section
+			filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value))
+		}
+	}
+
+	if len(filteredDetails) > 0 {
+		lines = append(lines, "├── Details")
+		for i, detail := range filteredDetails {
+			if i == len(filteredDetails)-1 {
+				lines = append(lines, fmt.Sprintf("│   └── %s", detail))
+			} else {
+				lines = append(lines, fmt.Sprintf("│   ├── %s", detail))
+			}
+		}
+	}
+
+	// Performance section (always present)
+	if err != nil {
+		lines = append(lines, "├── Performance")
+		lines = append(lines, fmt.Sprintf("│   └── Execution Time: %.3fms", plan.ExecutionTimeMs))
+		lines = append(lines, "└── Error")
+		lines = append(lines, fmt.Sprintf("    └── %s", err.Error()))
+	} else {
+		lines = append(lines, "└── Performance")
+		lines = append(lines, fmt.Sprintf("    └── Execution Time: %.3fms", plan.ExecutionTimeMs))
+	}
+
+	return lines
+}
+
+// formatDataSource provides user-friendly names for data sources
+func (e *SQLEngine) formatDataSource(source string) string {
+	switch source {
+	case "parquet_stats":
+		return "Parquet Statistics (fast path)"
+	case "parquet_files":
+		return "Parquet Files (full scan)"
+	case "live_logs":
+		return "Live Log Files"
+	default:
+		return source
+	}
+}
+
+// formatOptimization provides user-friendly names for optimizations
+func (e *SQLEngine) formatOptimization(opt string) string {
+	switch opt {
+	case "parquet_statistics":
+		return "Parquet Statistics Usage"
+	case "live_log_counting":
+		return "Live Log Row Counting"
+	case "deduplication":
+		return "Duplicate Data Avoidance"
+	case "predicate_pushdown":
+		return "WHERE Clause Pushdown"
+	case "column_projection":
+		return "Column Selection"
+	case "limit_pushdown":
+		return "LIMIT Optimization"
+	default:
+		return opt
+	}
+}
+
 // executeDDLStatement handles CREATE, ALTER, DROP operations
 // Assumption: These operations modify the underlying MQ topic structure
 func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *sqlparser.DDL) (*QueryResult, error) {
@@ -108,6 +400,132 @@
 	}
 }
 
+// executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
+func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sqlparser.Select, plan *QueryExecutionPlan) (*QueryResult, error) {
+	// Parse aggregations to populate plan
+	var aggregations []AggregationSpec
+	hasAggregations := false
+	selectAll := false
+
+	for _, selectExpr := range stmt.SelectExprs {
+		switch expr := selectExpr.(type) {
+		case *sqlparser.StarExpr:
+			selectAll = true
+		case *sqlparser.AliasedExpr:
+			switch col := expr.Expr.(type) {
+			case *sqlparser.FuncExpr:
+				// May be an aggregation function; parseAggregationFunction
+				// returns nil for anything it does not recognize
+				aggSpec, _ := e.parseAggregationFunction(col, expr)
+				if aggSpec != nil {
+					aggregations = append(aggregations, *aggSpec)
+					hasAggregations = true
+					plan.Aggregations = append(plan.Aggregations, aggSpec.Function+"("+aggSpec.Column+")")
+				}
+			}
+		}
+	}
+
+	// Execute the query
+	result, err := e.executeSelectStatement(ctx, stmt)
+
+	if err == nil && result != nil {
+		// Try to get topic information for partition count and row processing stats
+		if len(stmt.From) == 1 {
+			if table, ok := stmt.From[0].(*sqlparser.AliasedTableExpr); ok {
+				if tableExpr, ok := table.Expr.(sqlparser.TableName); ok {
+					tableName := tableExpr.Name.String()
+					// Try to discover partitions for statistics
+					// (NOTE: the namespace is currently hardcoded to "test" in this path)
+					if partitions, discoverErr := e.discoverTopicPartitions("test", tableName); discoverErr == nil {
+						plan.PartitionsScanned = len(partitions)
+					}
+
+					// For aggregations, get the actual row count scanned, not the result count
+					if hasAggregations {
+						// Try to get the actual row count from the topic
+						if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil {
+							plan.TotalRowsProcessed = actualRowCount
+							plan.Details["results_returned"] = len(result.Rows)
+						} else {
+							// Fallback: use result rows but indicate it's not the scan count
+							plan.TotalRowsProcessed = int64(len(result.Rows))
+							plan.Details["note"] = "actual_scan_count_unavailable"
+						}
+					} else {
+						// For non-aggregations, the result count is meaningful
+						plan.TotalRowsProcessed = int64(len(result.Rows))
+					}
+				}
+			}
+		}
+
+		// Determine execution strategy based on query type
+		if hasAggregations {
+			// For aggregations, determine if fast path conditions are met
+			if stmt.Where == nil {
+				// Check if all aggregations can use the fast path
+				canUseFastPath := true
+				for _, spec := range aggregations {
+					if !e.canUseParquetStatsForAggregation(spec) {
+						canUseFastPath = false
+						break
+					}
+				}
+
+				if canUseFastPath {
+					plan.ExecutionStrategy = "hybrid_fast_path"
+					plan.OptimizationsUsed = append(plan.OptimizationsUsed, "parquet_statistics", "live_log_counting", "deduplication")
+					plan.DataSources = []string{"parquet_stats", "live_logs"}
+				} else {
+					plan.ExecutionStrategy = "full_scan"
+					plan.DataSources = []string{"live_logs", "parquet_files"}
+				}
+			} else {
+				plan.ExecutionStrategy = "full_scan"
+				plan.DataSources = []string{"live_logs", "parquet_files"}
+				plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
+			}
+		} else {
+			// For regular SELECT queries
+			if selectAll {
+				plan.ExecutionStrategy = "hybrid_scan"
+				plan.DataSources = []string{"live_logs", "parquet_files"}
+			} else {
+				plan.ExecutionStrategy = "column_projection"
+				plan.DataSources = []string{"live_logs", "parquet_files"}
+				plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_projection")
+			}
+		}
+
+		// Add WHERE clause information
+		if stmt.Where != nil {
+			// Only add predicate_pushdown if not already added
+			alreadyHasPredicate := false
+			for _, opt := range plan.OptimizationsUsed {
+				if opt == "predicate_pushdown" {
+					alreadyHasPredicate = true
+					break
+				}
+			}
+			if !alreadyHasPredicate {
+				plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
+			}
+			plan.Details["where_clause"] = "present"
+		}
+
+		// Add LIMIT information
+		if stmt.Limit != nil {
+			plan.OptimizationsUsed = append(plan.OptimizationsUsed, "limit_pushdown")
+			if stmt.Limit.Rowcount != nil {
+				if limitExpr, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok && limitExpr.Type == sqlparser.IntVal {
+					plan.Details["limit"] = string(limitExpr.Val)
+				}
+			}
+		}
+	}
+
+	return result, err
+}
+
 // executeSelectStatement handles SELECT queries
 // Assumptions:
 // 1. Queries run against Parquet files in MQ topics
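From the caller's side, the plan surfaces both as printable rows and as the structured ExecutionPlan field. The helper below is a hypothetical sketch, not part of this change: it assumes a *SQLEngine obtained elsewhere, and assumes sqltypes.Value exposes ToString() as the vitess-derived package does.

    package engine

    import (
    	"context"
    	"fmt"
    )

    // ExplainQuery is a hypothetical helper showing how a caller consumes
    // both the rendered tree and the machine-readable plan.
    func ExplainQuery(ctx context.Context, e *SQLEngine, query string) error {
    	// ExecuteSQL intercepts the EXPLAIN prefix before normal statement routing.
    	result, err := e.ExecuteSQL(ctx, "EXPLAIN "+query)
    	if err != nil {
    		return err
    	}

    	// Each row carries one line of the tree in a single VarChar column.
    	for _, row := range result.Rows {
    		fmt.Println(row[0].ToString()) // ToString() assumed from the vitess-style sqltypes package
    	}

    	// The structured plan travels alongside the rows.
    	if plan := result.ExecutionPlan; plan != nil {
    		fmt.Printf("strategy=%s partitions=%d rows=%d time=%.3fms\n",
    			plan.ExecutionStrategy, plan.PartitionsScanned,
    			plan.TotalRowsProcessed, plan.ExecutionTimeMs)
    	}
    	return nil
    }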
@@ -2233,6 +2651,63 @@ func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]stri
 	return partitions, err
 }
 
+// getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs)
+func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, error) {
+	// Create a hybrid scanner to access parquet statistics
+	var filerClient filer_pb.FilerClient
+	if e.catalog.brokerClient != nil {
+		var filerClientErr error
+		filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
+		if filerClientErr != nil {
+			return 0, filerClientErr
+		}
+	}
+
+	hybridScanner, err := NewHybridMessageScanner(filerClient, namespace, topicName)
+	if err != nil {
+		return 0, err
+	}
+
+	// Get all partitions for this topic
+	relativePartitions, err := e.discoverTopicPartitions(namespace, topicName)
+	if err != nil {
+		return 0, err
+	}
+
+	// Convert relative partition paths to full paths
+	topicBasePath := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
+	partitions := make([]string, len(relativePartitions))
+	for i, relPartition := range relativePartitions {
+		partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
+	}
+
+	totalRowCount := int64(0)
+
+	// For each partition, count both parquet and live log rows
+	for _, partition := range partitions {
+		// Count parquet rows
+		parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition)
+		if parquetErr == nil {
+			for _, stats := range parquetStats {
+				totalRowCount += stats.RowCount
+			}
+		}
+
+		// Count live log rows (with deduplication against parquet source files)
+		parquetSourceFiles := make(map[string]bool)
+		if parquetErr == nil {
+			parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
+		}
+
+		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles)
+		if liveLogErr == nil {
+			totalRowCount += liveLogCount
+		}
+	}
+
+	return totalRowCount, nil
+}
+
 func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value {
 	switch spec.Function {
 	case "COUNT":