package engine

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"math"
	"math/big"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"

	"google.golang.org/protobuf/proto"
)

// SQL Function Name Constants
const (
	// Aggregation Functions
	FuncCOUNT = "COUNT"
	FuncSUM   = "SUM"
	FuncAVG   = "AVG"
	FuncMIN   = "MIN"
	FuncMAX   = "MAX"

	// String Functions
	FuncUPPER     = "UPPER"
	FuncLOWER     = "LOWER"
	FuncLENGTH    = "LENGTH"
	FuncTRIM      = "TRIM"
	FuncBTRIM     = "BTRIM" // CockroachDB's internal name for TRIM
	FuncLTRIM     = "LTRIM"
	FuncRTRIM     = "RTRIM"
	FuncSUBSTRING = "SUBSTRING"
	FuncLEFT      = "LEFT"
	FuncRIGHT     = "RIGHT"
	FuncCONCAT    = "CONCAT"

	// DateTime Functions
	FuncCURRENT_DATE      = "CURRENT_DATE"
	FuncCURRENT_TIME      = "CURRENT_TIME"
	FuncCURRENT_TIMESTAMP = "CURRENT_TIMESTAMP"
	FuncNOW               = "NOW"
	FuncEXTRACT           = "EXTRACT"
	FuncDATE_TRUNC        = "DATE_TRUNC"
	// PostgreSQL uses EXTRACT(part FROM date) instead of convenience functions like YEAR(), MONTH(), etc.
)

// PostgreSQL-compatible SQL AST types

type Statement interface {
	isStatement()
}

type ShowStatement struct {
	Type    string  // "databases", "tables", "columns"
	Table   string  // for SHOW COLUMNS FROM table
	Schema  string  // for database context
	OnTable NameRef // for compatibility with existing code that checks OnTable
}

func (s *ShowStatement) isStatement() {}

type UseStatement struct {
	Database string // database name to switch to
}

func (u *UseStatement) isStatement() {}

type DDLStatement struct {
	Action    string // "create", "alter", "drop"
	NewName   NameRef
	TableSpec *TableSpec
}

type NameRef struct {
	Name      StringGetter
	Qualifier StringGetter
}

type StringGetter interface {
	String() string
}

type stringValue string

func (s stringValue) String() string { return string(s) }

type TableSpec struct {
	Columns []ColumnDef
}

type ColumnDef struct {
	Name StringGetter
	Type TypeRef
}

type TypeRef struct {
	Type string
}

func (d *DDLStatement) isStatement() {}

type SelectStatement struct {
	SelectExprs     []SelectExpr
	From            []TableExpr
	Where           *WhereClause
	Limit           *LimitClause
	WindowFunctions []*WindowFunction
}

type WhereClause struct {
	Expr ExprNode
}

type LimitClause struct {
	Rowcount ExprNode
	Offset   ExprNode
}

func (s *SelectStatement) isStatement() {}

// Window function types for time-series analytics
type WindowSpec struct {
	PartitionBy []ExprNode
	OrderBy     []*OrderByClause
}

type WindowFunction struct {
	Function string     // ROW_NUMBER, RANK, LAG, LEAD
	Args     []ExprNode // Function arguments
	Over     *WindowSpec
	Alias    string // Column alias for the result
}

type OrderByClause struct {
	Column string
	Order  string // ASC or DESC
}

type SelectExpr interface {
	isSelectExpr()
}

type StarExpr struct{}

func (s *StarExpr) isSelectExpr() {}

type AliasedExpr struct {
	Expr ExprNode
	As   AliasRef
}

type AliasRef interface {
	IsEmpty() bool
	String() string
}

type aliasValue string

func (a aliasValue) IsEmpty() bool  { return string(a) == "" }
func (a aliasValue) String() string { return string(a) }

func (a *AliasedExpr) isSelectExpr() {}

type TableExpr interface {
	isTableExpr()
}

type AliasedTableExpr struct {
	Expr interface{}
}

func (a *AliasedTableExpr) isTableExpr() {}

type TableName struct {
	Name      StringGetter
	Qualifier StringGetter
}
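// exampleSelectAST is an illustrative sketch (not used by the engine) of how
// a query such as "SELECT id FROM logs LIMIT 10" maps onto the AST types in
// this file. The query text and field values are hypothetical.
func exampleSelectAST() Statement {
	return &SelectStatement{
		SelectExprs: []SelectExpr{
			&AliasedExpr{Expr: &ColName{Name: stringValue("id")}},
		},
		From: []TableExpr{
			&AliasedTableExpr{Expr: TableName{Name: stringValue("logs")}},
		},
		Limit: &LimitClause{
			Rowcount: &SQLVal{Type: IntVal, Val: []byte("10")},
		},
	}
}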
type ExprNode interface {
	isExprNode()
}

type FuncExpr struct {
	Name  StringGetter
	Exprs []SelectExpr
}

func (f *FuncExpr) isExprNode() {}

type ColName struct {
	Name StringGetter
}

func (c *ColName) isExprNode() {}

// ArithmeticExpr represents arithmetic operations like id+user_id and string concatenation like name||suffix
type ArithmeticExpr struct {
	Left     ExprNode
	Right    ExprNode
	Operator string // +, -, *, /, %, ||
}

func (a *ArithmeticExpr) isExprNode() {}

type ComparisonExpr struct {
	Left     ExprNode
	Right    ExprNode
	Operator string
}

func (c *ComparisonExpr) isExprNode() {}

type AndExpr struct {
	Left  ExprNode
	Right ExprNode
}

func (a *AndExpr) isExprNode() {}

type OrExpr struct {
	Left  ExprNode
	Right ExprNode
}

func (o *OrExpr) isExprNode() {}

type ParenExpr struct {
	Expr ExprNode
}

func (p *ParenExpr) isExprNode() {}

type SQLVal struct {
	Type int
	Val  []byte
}

func (s *SQLVal) isExprNode() {}

type ValTuple []ExprNode

func (v ValTuple) isExprNode() {}

type IntervalExpr struct {
	Value string // The interval value (e.g., "1 hour", "30 minutes")
	Unit  string // The unit (parsed from value)
}

func (i *IntervalExpr) isExprNode() {}

type BetweenExpr struct {
	Left ExprNode // The expression to test
	From ExprNode // Lower bound (inclusive)
	To   ExprNode // Upper bound (inclusive)
	Not  bool     // true for NOT BETWEEN
}

func (b *BetweenExpr) isExprNode() {}

type IsNullExpr struct {
	Expr ExprNode // The expression to test for null
}

func (i *IsNullExpr) isExprNode() {}

type IsNotNullExpr struct {
	Expr ExprNode // The expression to test for not null
}

func (i *IsNotNullExpr) isExprNode() {}

// SQLVal types
const (
	IntVal = iota
	StrVal
	FloatVal
)

// Operator constants
const (
	CreateStr       = "create"
	AlterStr        = "alter"
	DropStr         = "drop"
	EqualStr        = "="
	LessThanStr     = "<"
	GreaterThanStr  = ">"
	LessEqualStr    = "<="
	GreaterEqualStr = ">="
	NotEqualStr     = "!="
)

// parseIdentifier properly parses a potentially quoted identifier (database/table name)
func parseIdentifier(identifier string) string {
	identifier = strings.TrimSpace(identifier)
	identifier = strings.TrimSuffix(identifier, ";") // Remove trailing semicolon

	// Handle double quotes (PostgreSQL standard)
	if len(identifier) >= 2 && identifier[0] == '"' && identifier[len(identifier)-1] == '"' {
		return identifier[1 : len(identifier)-1]
	}

	// Handle backticks (MySQL compatibility)
	if len(identifier) >= 2 && identifier[0] == '`' && identifier[len(identifier)-1] == '`' {
		return identifier[1 : len(identifier)-1]
	}

	return identifier
}
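// exampleParseIdentifier demonstrates (hypothetically) the quoting styles
// parseIdentifier accepts; all three calls below return "my_db".
func exampleParseIdentifier() []string {
	return []string{
		parseIdentifier(`"my_db"`), // PostgreSQL double quotes
		parseIdentifier("`my_db`"), // MySQL backticks
		parseIdentifier("my_db;"),  // trailing semicolon stripped
	}
}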
// ParseSQL parses PostgreSQL-compatible SQL statements using CockroachDB parser for SELECT queries
func ParseSQL(sql string) (Statement, error) {
	sql = strings.TrimSpace(sql)
	sqlUpper := strings.ToUpper(sql)

	// Handle USE statement
	if strings.HasPrefix(sqlUpper, "USE ") {
		parts := strings.Fields(sql)
		if len(parts) < 2 {
			return nil, fmt.Errorf("USE statement requires a database name")
		}
		// Parse the database name properly, handling quoted identifiers
		dbName := parseIdentifier(strings.Join(parts[1:], " "))
		return &UseStatement{Database: dbName}, nil
	}

	// Handle DESCRIBE/DESC statements as aliases for SHOW COLUMNS FROM
	if strings.HasPrefix(sqlUpper, "DESCRIBE ") || strings.HasPrefix(sqlUpper, "DESC ") {
		parts := strings.Fields(sql)
		if len(parts) < 2 {
			return nil, fmt.Errorf("DESCRIBE/DESC statement requires a table name")
		}

		var tableName string
		var database string

		// Get the raw table name (before parsing identifiers)
		var rawTableName string
		if len(parts) >= 3 && strings.ToUpper(parts[1]) == "TABLE" {
			rawTableName = parts[2]
		} else {
			rawTableName = parts[1]
		}

		// Parse database.table format first, then apply parseIdentifier to each part
		if strings.Contains(rawTableName, ".") {
			// Handle quoted database.table like "db"."table"
			if strings.HasPrefix(rawTableName, "\"") || strings.HasPrefix(rawTableName, "`") {
				// Find the closing quote and the dot
				var quoteChar byte = '"'
				if rawTableName[0] == '`' {
					quoteChar = '`'
				}

				// Find the matching closing quote
				closingIndex := -1
				for i := 1; i < len(rawTableName); i++ {
					if rawTableName[i] == quoteChar {
						closingIndex = i
						break
					}
				}

				if closingIndex != -1 && closingIndex+1 < len(rawTableName) && rawTableName[closingIndex+1] == '.' {
					// Valid quoted database name
					database = parseIdentifier(rawTableName[:closingIndex+1])
					tableName = parseIdentifier(rawTableName[closingIndex+2:])
				} else {
					// Fall back to simple split then parse
					dbTableParts := strings.SplitN(rawTableName, ".", 2)
					database = parseIdentifier(dbTableParts[0])
					tableName = parseIdentifier(dbTableParts[1])
				}
			} else {
				// Simple case: no quotes, just split then parse
				dbTableParts := strings.SplitN(rawTableName, ".", 2)
				database = parseIdentifier(dbTableParts[0])
				tableName = parseIdentifier(dbTableParts[1])
			}
		} else {
			// No database.table format, just parse the table name
			tableName = parseIdentifier(rawTableName)
		}

		stmt := &ShowStatement{Type: "columns"}
		stmt.OnTable.Name = stringValue(tableName)
		if database != "" {
			stmt.OnTable.Qualifier = stringValue(database)
		}
		return stmt, nil
	}

	// Handle SHOW statements (keep custom parsing for these simple cases)
	if strings.HasPrefix(sqlUpper, "SHOW DATABASES") || strings.HasPrefix(sqlUpper, "SHOW SCHEMAS") {
		return &ShowStatement{Type: "databases"}, nil
	}
	if strings.HasPrefix(sqlUpper, "SHOW TABLES") {
		stmt := &ShowStatement{Type: "tables"}
		// Handle "SHOW TABLES FROM database" syntax
		if strings.Contains(sqlUpper, "FROM") {
			partsUpper := strings.Fields(sqlUpper)
			partsOriginal := strings.Fields(sql) // Use original casing
			for i, part := range partsUpper {
				if part == "FROM" && i+1 < len(partsOriginal) {
					// Parse the database name properly
					dbName := parseIdentifier(partsOriginal[i+1])
					stmt.Schema = dbName                    // Set the Schema field for the test
					stmt.OnTable.Name = stringValue(dbName) // Keep for compatibility
					break
				}
			}
		}
		return stmt, nil
	}
	if strings.HasPrefix(sqlUpper, "SHOW COLUMNS FROM") {
		// Parse "SHOW COLUMNS FROM table" or "SHOW COLUMNS FROM database.table"
		parts := strings.Fields(sql)
		if len(parts) < 4 {
			return nil, fmt.Errorf("SHOW COLUMNS FROM statement requires a table name")
		}

		// Get the raw table name (before parsing identifiers)
		rawTableName := parts[3]
		var tableName string
		var database string

		// Parse database.table format first, then apply parseIdentifier to each part
		if strings.Contains(rawTableName, ".") {
			// Handle quoted database.table like "db"."table"
			if strings.HasPrefix(rawTableName, "\"") || strings.HasPrefix(rawTableName, "`") {
				// Find the closing quote and the dot
				var quoteChar byte = '"'
				if rawTableName[0] == '`' {
					quoteChar = '`'
				}

				// Find the matching closing quote
				closingIndex := -1
				for i := 1; i < len(rawTableName); i++ {
					if rawTableName[i] == quoteChar {
						closingIndex = i
						break
					}
				}

				if closingIndex != -1 && closingIndex+1 < len(rawTableName) && rawTableName[closingIndex+1] == '.' {
					// Valid quoted database name
					database = parseIdentifier(rawTableName[:closingIndex+1])
					tableName = parseIdentifier(rawTableName[closingIndex+2:])
				} else {
					// Fall back to simple split then parse
					dbTableParts := strings.SplitN(rawTableName, ".", 2)
					database = parseIdentifier(dbTableParts[0])
					tableName = parseIdentifier(dbTableParts[1])
				}
			} else {
				// Simple case: no quotes, just split then parse
				dbTableParts := strings.SplitN(rawTableName, ".", 2)
				database = parseIdentifier(dbTableParts[0])
				tableName = parseIdentifier(dbTableParts[1])
			}
		} else {
			// No database.table format, just parse the table name
			tableName = parseIdentifier(rawTableName)
		}

		stmt := &ShowStatement{Type: "columns"}
		stmt.OnTable.Name = stringValue(tableName)
		if database != "" {
			stmt.OnTable.Qualifier = stringValue(database)
		}
		return stmt, nil
	}

	// Use CockroachDB parser for SELECT statements
	if strings.HasPrefix(sqlUpper, "SELECT") {
		parser := NewCockroachSQLParser()
		return parser.ParseSQL(sql)
	}

	return nil, UnsupportedFeatureError{
		Feature: fmt.Sprintf("statement type: %s", strings.Fields(sqlUpper)[0]),
		Reason:  "statement parsing not implemented",
	}
}

// extractFunctionArguments extracts the arguments from a function call expression using CockroachDB parser
func extractFunctionArguments(expr string) ([]SelectExpr, error) {
	// Find the parentheses
	startParen := strings.Index(expr, "(")
	endParen := strings.LastIndex(expr, ")")
	if startParen == -1 || endParen == -1 || endParen <= startParen {
		return nil, fmt.Errorf("invalid function syntax")
	}

	// Extract arguments string
	argsStr := strings.TrimSpace(expr[startParen+1 : endParen])

	// Handle empty arguments
	if argsStr == "" {
		return []SelectExpr{}, nil
	}

	// Handle single * argument (for COUNT(*))
	if argsStr == "*" {
		return []SelectExpr{&StarExpr{}}, nil
	}

	// Parse multiple arguments separated by commas
	args := []SelectExpr{}
	argParts := strings.Split(argsStr, ",")

	// Use CockroachDB parser to parse each argument as a SELECT expression
	cockroachParser := NewCockroachSQLParser()
	for _, argPart := range argParts {
		argPart = strings.TrimSpace(argPart)
		if argPart == "*" {
			args = append(args, &StarExpr{})
		} else {
			// Create a dummy SELECT statement to parse the argument expression
			dummySelect := fmt.Sprintf("SELECT %s", argPart)

			// Parse using CockroachDB parser
			stmt, err := cockroachParser.ParseSQL(dummySelect)
			if err != nil {
				// If CockroachDB parser fails, fall back to simple column name
				args = append(args, &AliasedExpr{
					Expr: &ColName{Name: stringValue(argPart)},
				})
				continue
			}

			// Extract the expression from the parsed SELECT statement
			if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 {
				args = append(args, selectStmt.SelectExprs[0])
			} else {
				// Fallback to column name if parsing fails
				args = append(args, &AliasedExpr{
					Expr: &ColName{Name: stringValue(argPart)},
				})
			}
		}
	}

	return args, nil
}

// debugModeKey is used to store debug mode flag in context
type debugModeKey struct{}

// isDebugMode checks if we're in debug/explain mode
func isDebugMode(ctx context.Context) bool {
	debug, ok := ctx.Value(debugModeKey{}).(bool)
	return ok && debug
}

// withDebugMode returns a context with debug mode enabled
func withDebugMode(ctx context.Context) context.Context {
	return context.WithValue(ctx, debugModeKey{}, true)
}

// LogBufferStart tracks the starting buffer index for a file
// Buffer indexes are monotonically increasing, count = len(chunks)
type LogBufferStart struct {
	StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
}
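// exampleParseSQLDispatch sketches (hypothetically) how ParseSQL routes
// different statement kinds; the SQL strings are illustrative only.
func exampleParseSQLDispatch() {
	stmt, _ := ParseSQL("SHOW TABLES FROM `my_db`")
	if show, ok := stmt.(*ShowStatement); ok {
		_ = show.Schema // "my_db"
	}
	stmt, _ = ParseSQL("USE my_db;")
	if use, ok := stmt.(*UseStatement); ok {
		_ = use.Database // "my_db"
	}
}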
// SQLEngine provides SQL query execution capabilities for SeaweedFS
// Assumptions:
// 1. MQ namespaces map directly to SQL databases
// 2. MQ topics map directly to SQL tables
// 3. Schema evolution is handled transparently with backward compatibility
// 4. Queries run against Parquet-stored MQ messages
type SQLEngine struct {
	catalog *SchemaCatalog
}

// NewSQLEngine creates a new SQL execution engine
// Uses master address for service discovery and initialization
func NewSQLEngine(masterAddress string) *SQLEngine {
	// Initialize global HTTP client if not already done
	// This is needed for reading partition data from the filer
	if util_http.GetGlobalHttpClient() == nil {
		util_http.InitGlobalHttpClient()
	}

	return &SQLEngine{
		catalog: NewSchemaCatalog(masterAddress),
	}
}

// NewSQLEngineWithCatalog creates a new SQL execution engine with a custom catalog
// Used for testing or when you want to provide a pre-configured catalog
func NewSQLEngineWithCatalog(catalog *SchemaCatalog) *SQLEngine {
	// Initialize global HTTP client if not already done
	// This is needed for reading partition data from the filer
	if util_http.GetGlobalHttpClient() == nil {
		util_http.InitGlobalHttpClient()
	}

	return &SQLEngine{
		catalog: catalog,
	}
}

// GetCatalog returns the schema catalog for external access
func (e *SQLEngine) GetCatalog() *SchemaCatalog {
	return e.catalog
}

// ExecuteSQL parses and executes a SQL statement
// Assumptions:
// 1. All SQL statements are PostgreSQL-compatible and parsed via the CockroachDB parser
// 2. DDL operations (CREATE/ALTER/DROP) modify underlying MQ topics
// 3. DML operations (SELECT) query Parquet files directly
// 4. Error handling follows PostgreSQL conventions
func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
	startTime := time.Now()

	// Handle EXPLAIN as a special case
	sqlTrimmed := strings.TrimSpace(sql)
	sqlUpper := strings.ToUpper(sqlTrimmed)
	if strings.HasPrefix(sqlUpper, "EXPLAIN") {
		// Extract the actual query after EXPLAIN
		actualSQL := strings.TrimSpace(sqlTrimmed[7:]) // Remove "EXPLAIN"
		return e.executeExplain(ctx, actualSQL, startTime)
	}

	// Parse the SQL statement using PostgreSQL-compatible parser
	stmt, err := ParseSQL(sql)
	if err != nil {
		return &QueryResult{
			Error: fmt.Errorf("SQL parse error: %v", err),
		}, err
	}

	// Route to appropriate handler based on statement type
	switch stmt := stmt.(type) {
	case *ShowStatement:
		return e.executeShowStatementWithDescribe(ctx, stmt)
	case *UseStatement:
		return e.executeUseStatement(ctx, stmt)
	case *DDLStatement:
		return e.executeDDLStatement(ctx, stmt)
	case *SelectStatement:
		return e.executeSelectStatement(ctx, stmt)
	default:
		err := fmt.Errorf("unsupported SQL statement type: %T", stmt)
		return &QueryResult{Error: err}, err
	}
}
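// exampleExecuteSQL sketches (hypothetically) typical engine usage; the
// master address and query are placeholders, not real deployment values.
func exampleExecuteSQL(ctx context.Context) error {
	engine := NewSQLEngine("localhost:9333")
	result, err := engine.ExecuteSQL(ctx, "SELECT COUNT(*) FROM user_events")
	if err != nil {
		return err
	}
	_ = result.Rows // one row containing the count
	return nil
}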
// executeExplain handles EXPLAIN statements by executing the query with plan tracking
func (e *SQLEngine) executeExplain(ctx context.Context, actualSQL string, startTime time.Time) (*QueryResult, error) {
	// Enable debug mode for EXPLAIN queries
	ctx = withDebugMode(ctx)

	// Parse the actual SQL statement
	stmt, err := ParseSQL(actualSQL)
	if err != nil {
		return &QueryResult{
			Error: fmt.Errorf("SQL parse error in EXPLAIN query: %v", err),
		}, err
	}

	// Create execution plan
	plan := &QueryExecutionPlan{
		QueryType:         strings.ToUpper(strings.Fields(actualSQL)[0]),
		DataSources:       []string{},
		OptimizationsUsed: []string{},
		Details:           make(map[string]interface{}),
	}

	var result *QueryResult

	// Route to appropriate handler based on statement type (with plan tracking)
	switch stmt := stmt.(type) {
	case *SelectStatement:
		result, err = e.executeSelectStatementWithPlan(ctx, stmt, plan)
		if err != nil {
			plan.Details["error"] = err.Error()
		}
	case *ShowStatement:
		plan.QueryType = "SHOW"
		plan.ExecutionStrategy = "metadata_only"
		result, err = e.executeShowStatementWithDescribe(ctx, stmt)
	default:
		err := fmt.Errorf("EXPLAIN not supported for statement type: %T", stmt)
		return &QueryResult{Error: err}, err
	}

	// Calculate execution time
	plan.ExecutionTimeMs = float64(time.Since(startTime).Nanoseconds()) / 1e6

	// Format execution plan as result
	return e.formatExecutionPlan(plan, result, err)
}

// formatExecutionPlan converts execution plan to a hierarchical tree format for display
func (e *SQLEngine) formatExecutionPlan(plan *QueryExecutionPlan, originalResult *QueryResult, originalErr error) (*QueryResult, error) {
	columns := []string{"Query Execution Plan"}
	rows := [][]sqltypes.Value{}

	// Build hierarchical plan display
	planLines := e.buildHierarchicalPlan(plan, originalErr)
	for _, line := range planLines {
		rows = append(rows, []sqltypes.Value{
			sqltypes.NewVarChar(line),
		})
	}

	if originalErr != nil {
		return &QueryResult{
			Columns:       columns,
			Rows:          rows,
			ExecutionPlan: plan,
			Error:         originalErr,
		}, originalErr
	}

	return &QueryResult{
		Columns:       columns,
		Rows:          rows,
		ExecutionPlan: plan,
	}, nil
}
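// For reference, the rendered plan is a unicode tree along these lines
// (illustrative output for a hypothetical aggregation query):
//
//	SELECT Query (hybrid_fast_path)
//	├── Aggregations
//	│   └── COUNT(*)
//	├── Data Sources
//	│   ├── Parquet Statistics (fast path)
//	│   └── Live Log Files
//	└── Performance
//	    └── Execution Time: 1.234ms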
// buildHierarchicalPlan creates a tree-like structure for the execution plan
func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) []string {
	var lines []string

	// Root node - Query type and strategy
	lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy))

	// Aggregations section (if present)
	if len(plan.Aggregations) > 0 {
		lines = append(lines, "├── Aggregations")
		for i, agg := range plan.Aggregations {
			if i == len(plan.Aggregations)-1 {
				lines = append(lines, fmt.Sprintf("│   └── %s", agg))
			} else {
				lines = append(lines, fmt.Sprintf("│   ├── %s", agg))
			}
		}
	}

	// Data Sources section
	if len(plan.DataSources) > 0 {
		hasMore := len(plan.OptimizationsUsed) > 0 || plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil
		if hasMore {
			lines = append(lines, "├── Data Sources")
		} else {
			lines = append(lines, "└── Data Sources")
		}
		for i, source := range plan.DataSources {
			prefix := "│   "
			if !hasMore && i == len(plan.DataSources)-1 {
				prefix = "    "
			}
			if i == len(plan.DataSources)-1 {
				lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatDataSource(source)))
			} else {
				lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatDataSource(source)))
			}
		}
	}

	// Optimizations section
	if len(plan.OptimizationsUsed) > 0 {
		hasMore := plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil
		if hasMore {
			lines = append(lines, "├── Optimizations")
		} else {
			lines = append(lines, "└── Optimizations")
		}
		for i, opt := range plan.OptimizationsUsed {
			prefix := "│   "
			if !hasMore && i == len(plan.OptimizationsUsed)-1 {
				prefix = "    "
			}
			if i == len(plan.OptimizationsUsed)-1 {
				lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatOptimization(opt)))
			} else {
				lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatOptimization(opt)))
			}
		}
	}

	// Check for data sources tree availability
	partitionPaths, hasPartitions := plan.Details["partition_paths"].([]string)
	parquetFiles, _ := plan.Details["parquet_files"].([]string)
	liveLogFiles, _ := plan.Details["live_log_files"].([]string)

	// Statistics section
	statisticsPresent := plan.PartitionsScanned > 0 || plan.ParquetFilesScanned > 0 || plan.LiveLogFilesScanned > 0 || plan.TotalRowsProcessed > 0
	if statisticsPresent {
		// The Performance section is always rendered later, so Statistics is
		// never the last section and always gets the "├──" connector.
		lines = append(lines, "├── Statistics")

		stats := []string{}
		if plan.PartitionsScanned > 0 {
			stats = append(stats, fmt.Sprintf("Partitions Scanned: %d", plan.PartitionsScanned))
		}
		if plan.ParquetFilesScanned > 0 {
			stats = append(stats, fmt.Sprintf("Parquet Files: %d", plan.ParquetFilesScanned))
		}
		if plan.LiveLogFilesScanned > 0 {
			stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned))
		}

		// Always show row statistics for aggregations, even if 0 (to show fast path efficiency)
		if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults {
			stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed))
			stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned))

			// Add fast path explanation when no rows were scanned
			if plan.TotalRowsProcessed == 0 {
				// Use the actual scan method from Details instead of hardcoding
				if scanMethod, exists := plan.Details["scan_method"].(string); exists {
					stats = append(stats, fmt.Sprintf("Scan Method: %s", scanMethod))
				} else {
					stats = append(stats, "Scan Method: Metadata Only")
				}
			}
		} else if plan.TotalRowsProcessed > 0 {
			stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
		}

		// Broker buffer information
		if plan.BrokerBufferQueried {
			stats = append(stats, fmt.Sprintf("Broker Buffer Queried: Yes (%d messages)", plan.BrokerBufferMessages))
			if plan.BufferStartIndex > 0 {
				stats = append(stats, fmt.Sprintf("Buffer Start Index: %d (deduplication enabled)", plan.BufferStartIndex))
			}
		}

		for i, stat := range stats {
			if i == len(stats)-1 {
				lines = append(lines, fmt.Sprintf("│   └── %s", stat))
			} else {
				lines = append(lines, fmt.Sprintf("│   ├── %s", stat))
			}
		}
	}

	// Data Sources Tree section (if file paths are available)
	if hasPartitions && len(partitionPaths) > 0 {
		// Performance always comes after, so this section always gets "├──".
		lines = append(lines, "├── Data Sources Tree")

		// Build a tree structure for each partition
		for i, partition := range partitionPaths {
			isLastPartition := i == len(partitionPaths)-1

			// Show partition directory
			partitionPrefix := "├── "
			if isLastPartition {
				partitionPrefix = "└── "
			}
			lines = append(lines, fmt.Sprintf("│   %s%s/", partitionPrefix, partition))

			// Collect parquet files in this partition
			partitionParquetFiles := make([]string, 0)
			for _, file := range parquetFiles {
				if strings.HasPrefix(file, partition+"/") {
					fileName := file[len(partition)+1:]
					partitionParquetFiles = append(partitionParquetFiles, fileName)
				}
			}

			// Collect live log files in this partition
			partitionLiveLogFiles := make([]string, 0)
			for _, file := range liveLogFiles {
				if strings.HasPrefix(file, partition+"/") {
					fileName := file[len(partition)+1:]
					partitionLiveLogFiles = append(partitionLiveLogFiles, fileName)
				}
			}

			// Display files with proper tree formatting
			totalFiles := len(partitionParquetFiles) + len(partitionLiveLogFiles)
			fileIndex := 0

			// Display parquet files
			for _, fileName := range partitionParquetFiles {
				fileIndex++
				isLastFile := fileIndex == totalFiles && isLastPartition
				var filePrefix string
				if isLastPartition {
					if isLastFile {
						filePrefix = "    └── "
					} else {
						filePrefix = "    ├── "
					}
				} else {
					if isLastFile {
						filePrefix = "│   └── "
					} else {
						filePrefix = "│   ├── "
					}
				}
				lines = append(lines, fmt.Sprintf("│   %s%s (parquet)", filePrefix, fileName))
			}

			// Display live log files
			for _, fileName := range partitionLiveLogFiles {
				fileIndex++
				isLastFile := fileIndex == totalFiles && isLastPartition
				var filePrefix string
				if isLastPartition {
					if isLastFile {
						filePrefix = "    └── "
					} else {
						filePrefix = "    ├── "
					}
				} else {
					if isLastFile {
						filePrefix = "│   └── "
					} else {
						filePrefix = "│   ├── "
					}
				}
				lines = append(lines, fmt.Sprintf("│   %s%s (live log)", filePrefix, fileName))
			}
		}
	}

	// Details section
	// Filter out details that are shown elsewhere
	filteredDetails := make([]string, 0)
	for key, value := range plan.Details {
		// Skip keys that are already formatted and displayed in the Statistics section
		if key != "results_returned" && key != "partition_paths" && key != "parquet_files" && key != "live_log_files" {
			filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value))
		}
	}
	if len(filteredDetails) > 0 {
		// Performance always comes after, so Details always gets "├──".
		lines = append(lines, "├── Details")
		for i, detail := range filteredDetails {
			if i == len(filteredDetails)-1 {
				lines = append(lines, fmt.Sprintf("│   └── %s", detail))
			} else {
				lines = append(lines, fmt.Sprintf("│   ├── %s", detail))
			}
		}
	}

	// Performance section (always present)
	if err != nil {
		lines = append(lines, "├── Performance")
		lines = append(lines, fmt.Sprintf("│   └── Execution Time: %.3fms", plan.ExecutionTimeMs))
		lines = append(lines, "└── Error")
		lines = append(lines, fmt.Sprintf("    └── %s", err.Error()))
	} else {
		lines = append(lines, "└── Performance")
		lines = append(lines, fmt.Sprintf("    └── Execution Time: %.3fms", plan.ExecutionTimeMs))
	}

	return lines
}

// formatDataSource provides user-friendly names for data sources
func (e *SQLEngine) formatDataSource(source string) string {
	switch source {
	case "parquet_stats":
		return "Parquet Statistics (fast path)"
	case "parquet_files":
		return "Parquet Files (full scan)"
	case "live_logs":
		return "Live Log Files"
	case "broker_buffer":
		return "Broker Buffer (real-time)"
	default:
		return source
	}
}

// formatOptimization provides user-friendly names for optimizations
func (e *SQLEngine) formatOptimization(opt string) string {
	switch opt {
	case "parquet_statistics":
		return "Parquet Statistics Usage"
	case "live_log_counting":
		return "Live Log Row Counting"
	case "deduplication":
		return "Duplicate Data Avoidance"
	case "predicate_pushdown":
		return "WHERE Clause Pushdown"
	case "column_projection":
		return "Column Selection"
	case "limit_pushdown":
		return "LIMIT Optimization"
	default:
		return opt
	}
}
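// exampleFormatLabels shows (hypothetically) the friendly names produced by
// the two formatters above; unknown keys pass through unchanged.
func exampleFormatLabels(e *SQLEngine) []string {
	return []string{
		e.formatDataSource("parquet_stats"),        // "Parquet Statistics (fast path)"
		e.formatOptimization("predicate_pushdown"), // "WHERE Clause Pushdown"
		e.formatDataSource("custom_source"),        // "custom_source" (pass-through)
	}
}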
// executeUseStatement handles USE database statements to switch current database context
func (e *SQLEngine) executeUseStatement(ctx context.Context, stmt *UseStatement) (*QueryResult, error) {
	// Validate database name
	if stmt.Database == "" {
		err := fmt.Errorf("database name cannot be empty")
		return &QueryResult{Error: err}, err
	}

	// Set the current database in the catalog
	e.catalog.SetCurrentDatabase(stmt.Database)

	// Return success message
	result := &QueryResult{
		Columns: []string{"message"},
		Rows: [][]sqltypes.Value{
			{sqltypes.MakeString([]byte(fmt.Sprintf("Database changed to: %s", stmt.Database)))},
		},
		Error: nil,
	}

	return result, nil
}

// executeDDLStatement handles CREATE operations only
// Note: ALTER TABLE and DROP TABLE are not supported to protect topic data
func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement) (*QueryResult, error) {
	switch stmt.Action {
	case CreateStr:
		return e.createTable(ctx, stmt)
	case AlterStr:
		err := fmt.Errorf("ALTER TABLE is not supported")
		return &QueryResult{Error: err}, err
	case DropStr:
		err := fmt.Errorf("DROP TABLE is not supported")
		return &QueryResult{Error: err}, err
	default:
		err := fmt.Errorf("unsupported DDL action: %s", stmt.Action)
		return &QueryResult{Error: err}, err
	}
}

// executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// Parse aggregations to populate plan
	var aggregations []AggregationSpec
	hasAggregations := false
	selectAll := false

	for _, selectExpr := range stmt.SelectExprs {
		switch expr := selectExpr.(type) {
		case *StarExpr:
			selectAll = true
		case *AliasedExpr:
			switch col := expr.Expr.(type) {
			case *FuncExpr:
				// This is an aggregation function
				aggSpec, err := e.parseAggregationFunction(col, expr)
				if err != nil {
					return &QueryResult{Error: err}, err
				}
				if aggSpec != nil {
					aggregations = append(aggregations, *aggSpec)
					hasAggregations = true
					plan.Aggregations = append(plan.Aggregations, aggSpec.Function+"("+aggSpec.Column+")")
				}
			}
		}
	}

	// Execute the query (handle aggregations specially for plan tracking)
	var result *QueryResult
	var err error

	if hasAggregations {
		// Extract table information for aggregation execution
		var database, tableName string
		if len(stmt.From) == 1 {
			if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
				if tableExpr, ok := table.Expr.(TableName); ok {
					tableName = tableExpr.Name.String()
					if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
						database = tableExpr.Qualifier.String()
					}
				}
			}
		}

		// Use current database if not specified
		if database == "" {
			database = e.catalog.currentDatabase
			if database == "" {
				database = "default"
			}
		}

		// Create hybrid scanner for aggregation execution
		var filerClient filer_pb.FilerClient
		if e.catalog.brokerClient != nil {
			filerClient, err = e.catalog.brokerClient.GetFilerClient()
			if err != nil {
				return &QueryResult{Error: err}, err
			}
		}

		// Avoid shadowing the outer err here: a shadowed err would leave the
		// outer err nil even when aggregation execution fails.
		hybridScanner, scanErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
		if scanErr != nil {
			return &QueryResult{Error: scanErr}, scanErr
		}

		// Execute aggregation query with plan tracking
		result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
	} else {
		// Regular SELECT query with plan tracking
		result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
	}

	if err == nil && result != nil {
		// Extract table identity for use in statistics and strategy determination
		var database, tableName string
		if len(stmt.From) == 1 {
			if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
				if tableExpr, ok := table.Expr.(TableName); ok {
					tableName = tableExpr.Name.String()
					if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
						database = tableExpr.Qualifier.String()
					}
				}
			}
		}
		if database == "" {
			database = e.catalog.currentDatabase
			if database == "" {
				database = "default"
			}
		}

		// Try to get topic information for partition count and row processing stats
		if tableName != "" {
			// Try to discover partitions for statistics
			if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
				plan.PartitionsScanned = len(partitions)
			}

			// For aggregations, determine actual processing based on execution strategy
			if hasAggregations {
				plan.Details["results_returned"] = len(result.Rows)

				// Determine actual work done based on execution strategy
				if stmt.Where == nil {
					// Use the same logic as actual execution to determine if fast path was used
					var filerClient filer_pb.FilerClient
					if e.catalog.brokerClient != nil {
						filerClient, _ = e.catalog.brokerClient.GetFilerClient()
					}
					hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)

					var canUseFastPath bool
					if scannerErr == nil {
						// Test if fast path can be used (same as actual execution)
						_, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
						canUseFastPath = canOptimize
					} else {
						// Fallback to simple check
						canUseFastPath = true
						for _, spec := range aggregations {
							if !e.canUseParquetStatsForAggregation(spec) {
								canUseFastPath = false
								break
							}
						}
					}

					if canUseFastPath {
						// Fast path: minimal scanning (only live logs that weren't converted)
						if actualScanCount, countErr := e.getActualRowsScannedForFastPath(ctx, database, tableName); countErr == nil {
							plan.TotalRowsProcessed = actualScanCount
						} else {
							plan.TotalRowsProcessed = 0 // Parquet stats only, no scanning
						}
					} else {
						// Full scan: count all rows
						if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, database, tableName); countErr == nil {
							plan.TotalRowsProcessed = actualRowCount
						} else {
							plan.TotalRowsProcessed = int64(len(result.Rows))
							plan.Details["note"] = "scan_count_unavailable"
						}
					}
				} else {
					// With WHERE clause: full scan required
					if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, database, tableName); countErr == nil {
						plan.TotalRowsProcessed = actualRowCount
					} else {
						plan.TotalRowsProcessed = int64(len(result.Rows))
						plan.Details["note"] = "scan_count_unavailable"
					}
				}
			} else {
				// For non-aggregations, result count is meaningful
				plan.TotalRowsProcessed = int64(len(result.Rows))
			}
		}

		// Determine execution strategy based on query type (reuse fast path detection from above)
		if hasAggregations {
			// Skip execution strategy determination if plan was already populated by aggregation execution
			// This prevents overwriting the correctly built plan from BuildAggregationPlan
			if plan.ExecutionStrategy == "" {
				// For aggregations, determine if fast path conditions are met
				if stmt.Where == nil {
					// Reuse the same logic used above for row counting
					var canUseFastPath bool
					if tableName != "" {
						var filerClient filer_pb.FilerClient
						if e.catalog.brokerClient != nil {
							filerClient, _ = e.catalog.brokerClient.GetFilerClient()
						}
						if filerClient != nil {
							hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
							if scannerErr == nil {
								// Test if fast path can be used (same as actual execution)
								_, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
								canUseFastPath = canOptimize
							} else {
								canUseFastPath = false
							}
						} else {
							// Fallback check
							canUseFastPath = true
							for _, spec := range aggregations {
								if !e.canUseParquetStatsForAggregation(spec) {
									canUseFastPath = false
									break
								}
							}
						}
					} else {
						canUseFastPath = false
					}

					if canUseFastPath {
						plan.ExecutionStrategy = "hybrid_fast_path"
						plan.OptimizationsUsed = append(plan.OptimizationsUsed, "parquet_statistics", "live_log_counting", "deduplication")
						plan.DataSources = []string{"parquet_stats", "live_logs"}
					} else {
						plan.ExecutionStrategy = "full_scan"
						plan.DataSources = []string{"live_logs", "parquet_files"}
					}
				} else {
					plan.ExecutionStrategy = "full_scan"
					plan.DataSources = []string{"live_logs", "parquet_files"}
					plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
				}
			}
		} else {
			// For regular SELECT queries
			if selectAll {
				plan.ExecutionStrategy = "hybrid_scan"
				plan.DataSources = []string{"live_logs", "parquet_files"}
			} else {
				plan.ExecutionStrategy = "column_projection"
				plan.DataSources = []string{"live_logs", "parquet_files"}
				plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_projection")
			}
		}

		// Add WHERE clause information
		if stmt.Where != nil {
			// Only add predicate_pushdown if not already added
			alreadyHasPredicate := false
			for _, opt := range plan.OptimizationsUsed {
				if opt == "predicate_pushdown" {
					alreadyHasPredicate = true
					break
				}
			}
			if !alreadyHasPredicate {
				plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
			}
			plan.Details["where_clause"] = "present"
		}

		// Add LIMIT information
		if stmt.Limit != nil {
			plan.OptimizationsUsed = append(plan.OptimizationsUsed, "limit_pushdown")
			if stmt.Limit.Rowcount != nil {
				if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
					plan.Details["limit"] = string(limitExpr.Val)
				}
			}
		}
	}

	return result, err
}
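// exampleExplainUsage sketches (hypothetically) how the plan produced above
// surfaces to a client: an EXPLAIN query returns the rendered tree as rows.
// The query text is illustrative.
func exampleExplainUsage(ctx context.Context, e *SQLEngine) ([]string, error) {
	result, err := e.ExecuteSQL(ctx, "EXPLAIN SELECT MAX(id) FROM user_events")
	if err != nil {
		return nil, err
	}
	treeLines := make([]string, 0, len(result.Rows))
	for _, row := range result.Rows {
		treeLines = append(treeLines, fmt.Sprintf("%v", row[0]))
	}
	return treeLines, nil
}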
// executeSelectStatement handles SELECT queries
// Assumptions:
// 1. Queries run against Parquet files in MQ topics
// 2. Predicate pushdown is used for efficiency
// 3. Only single-table queries are supported; joins are not implemented
func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStatement) (*QueryResult, error) {
	// Parse FROM clause to get table (topic) information
	if len(stmt.From) != 1 {
		err := fmt.Errorf("SELECT supports single table queries only")
		return &QueryResult{Error: err}, err
	}

	// Extract table reference
	var database, tableName string
	switch table := stmt.From[0].(type) {
	case *AliasedTableExpr:
		switch tableExpr := table.Expr.(type) {
		case TableName:
			tableName = tableExpr.Name.String()
			if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
				database = tableExpr.Qualifier.String()
			}
		default:
			err := fmt.Errorf("unsupported table expression: %T", tableExpr)
			return &QueryResult{Error: err}, err
		}
	default:
		err := fmt.Errorf("unsupported FROM clause: %T", table)
		return &QueryResult{Error: err}, err
	}

	// Use current database context if not specified
	if database == "" {
		database = e.catalog.GetCurrentDatabase()
		if database == "" {
			database = "default"
		}
	}

	// Auto-discover and register topic if not already in catalog
	if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
		// Topic not in catalog, try to discover and register it
		if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
			// Return error immediately for non-existent topics instead of falling back to sample data
			return &QueryResult{Error: regErr}, regErr
		}
	}

	// Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
	// Get filerClient from broker connection (works with both real and mock brokers)
	var filerClient filer_pb.FilerClient
	var filerClientErr error
	filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
	if filerClientErr != nil {
		// Return error if filer client is not available for topic access
		return &QueryResult{Error: filerClientErr}, filerClientErr
	}

	hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
	if err != nil {
		// Handle quiet topics gracefully: topics exist but have no active schema/brokers
		if IsNoSchemaError(err) {
			// Return empty result for quiet topics (normal in production environments)
			return &QueryResult{
				Columns:  []string{},
				Rows:     [][]sqltypes.Value{},
				Database: database,
				Table:    tableName,
			}, nil
		}
		// Return error for other access issues (truly non-existent topics, etc.)
		topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err)
		return &QueryResult{Error: topicErr}, topicErr
	}

	// Parse SELECT columns and detect aggregation functions
	var columns []string
	var aggregations []AggregationSpec
	selectAll := false
	hasAggregations := false
	_ = hasAggregations // Used later in aggregation routing

	// Track required base columns for arithmetic expressions
	baseColumnsSet := make(map[string]bool)

	for _, selectExpr := range stmt.SelectExprs {
		switch expr := selectExpr.(type) {
		case *StarExpr:
			selectAll = true
		case *AliasedExpr:
			switch col := expr.Expr.(type) {
			case *ColName:
				colName := col.Name.String()
				// Check if this "column" is actually an arithmetic expression with functions
				if arithmeticExpr := e.parseColumnLevelCalculation(colName); arithmeticExpr != nil {
					columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr))
					e.extractBaseColumns(arithmeticExpr, baseColumnsSet)
				} else {
					columns = append(columns, colName)
					baseColumnsSet[colName] = true
				}
			case *ArithmeticExpr:
				// Handle arithmetic expressions like id+user_id and string concatenation like name||suffix
				columns = append(columns, e.getArithmeticExpressionAlias(col))
				// Extract base columns needed for this arithmetic expression
				e.extractBaseColumns(col, baseColumnsSet)
			case *SQLVal:
				// Handle string/numeric literals like 'good', 123, etc.
				columns = append(columns, e.getSQLValAlias(col))
			case *FuncExpr:
				// Distinguish between aggregation functions and string functions
				funcName := strings.ToUpper(col.Name.String())
				if e.isAggregationFunction(funcName) {
					// Handle aggregation functions
					aggSpec, err := e.parseAggregationFunction(col, expr)
					if err != nil {
						return &QueryResult{Error: err}, err
					}
					aggregations = append(aggregations, *aggSpec)
					hasAggregations = true
				} else if e.isStringFunction(funcName) {
					// Handle string functions like UPPER, LENGTH, etc.
					columns = append(columns, e.getStringFunctionAlias(col))
					// Extract base columns needed for this string function
					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
				} else if e.isDateTimeFunction(funcName) {
					// Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
					columns = append(columns, e.getDateTimeFunctionAlias(col))
					// Extract base columns needed for this datetime function
					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
				} else {
					return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
				}
			default:
				err := fmt.Errorf("unsupported SELECT expression: %T", col)
				return &QueryResult{Error: err}, err
			}
		default:
			err := fmt.Errorf("unsupported SELECT expression: %T", expr)
			return &QueryResult{Error: err}, err
		}
	}

	// If we have aggregations, use aggregation query path
	if hasAggregations {
		return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
	}

	// Parse WHERE clause for predicate pushdown
	var predicate func(*schema_pb.RecordValue) bool
	if stmt.Where != nil {
		predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Parse LIMIT and OFFSET clauses
	// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
	limit := -1
	offset := 0
	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
		switch limitExpr := stmt.Limit.Rowcount.(type) {
		case *SQLVal:
			if limitExpr.Type == IntVal {
				limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
				if parseErr != nil {
					return &QueryResult{Error: parseErr}, parseErr
				}
				if limit64 > math.MaxInt32 || limit64 < 0 {
					return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
				}
				limit = int(limit64)
			}
		}
	}

	// Parse OFFSET clause if present
	if stmt.Limit != nil && stmt.Limit.Offset != nil {
		switch offsetExpr := stmt.Limit.Offset.(type) {
		case *SQLVal:
			if offsetExpr.Type == IntVal {
				offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
				if parseErr != nil {
					return &QueryResult{Error: parseErr}, parseErr
				}
				if offset64 > math.MaxInt32 || offset64 < 0 {
					return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
				}
				offset = int(offset64)
			}
		}
	}

	// Build hybrid scan options
	// Extract time filters from WHERE clause to optimize scanning
	startTimeNs, stopTimeNs := int64(0), int64(0)
	if stmt.Where != nil {
		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
	}

	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
		StopTimeNs:  stopTimeNs,  // Extracted from WHERE clause time comparisons
		Limit:       limit,
		Offset:      offset,
		Predicate:   predicate,
	}

	if !selectAll {
		// Convert baseColumnsSet to slice for hybrid scan options
		baseColumns := make([]string, 0, len(baseColumnsSet))
		for columnName := range baseColumnsSet {
			baseColumns = append(baseColumns, columnName)
		}

		// Use base columns (not expression aliases) for data retrieval
		if len(baseColumns) > 0 {
			hybridScanOptions.Columns = baseColumns
		} else {
			// If no base columns found (shouldn't happen), use original columns
			hybridScanOptions.Columns = columns
		}
	}

	// Execute the hybrid scan (live logs + Parquet files)
	results, err := hybridScanner.Scan(ctx, hybridScanOptions)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	// Convert to SQL result format
	if selectAll {
		if len(columns) > 0 {
			// SELECT *, specific_columns - include both auto-discovered and explicit columns
			return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
		}
		// SELECT * only - let converter determine all columns (excludes system columns)
		return hybridScanner.ConvertToSQLResult(results, nil), nil
	}

	// Handle custom column expressions (including arithmetic)
	return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
}
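// exampleHybridScanOptions sketches (hypothetically) the scan options built
// for "SELECT id, name FROM t WHERE _ts > 1700000000000000000 LIMIT 5": the
// time bound is pushed down, LIMIT becomes Limit=5, and only the base columns
// are fetched. Field values are illustrative.
func exampleHybridScanOptions() HybridScanOptions {
	return HybridScanOptions{
		StartTimeNs: 1700000000000000000, // from the WHERE clause
		Limit:       5,
		Offset:      0,
		Columns:     []string{"id", "name"},
	}
}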
// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
// This is used by EXPLAIN queries to capture complete data source information including broker memory
func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// Parse FROM clause to get table (topic) information
	if len(stmt.From) != 1 {
		err := fmt.Errorf("SELECT supports single table queries only")
		return &QueryResult{Error: err}, err
	}

	// Extract table reference
	var database, tableName string
	switch table := stmt.From[0].(type) {
	case *AliasedTableExpr:
		switch tableExpr := table.Expr.(type) {
		case TableName:
			tableName = tableExpr.Name.String()
			if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
				database = tableExpr.Qualifier.String()
			}
		default:
			err := fmt.Errorf("unsupported table expression: %T", tableExpr)
			return &QueryResult{Error: err}, err
		}
	default:
		err := fmt.Errorf("unsupported FROM clause: %T", table)
		return &QueryResult{Error: err}, err
	}

	// Use current database context if not specified
	if database == "" {
		database = e.catalog.GetCurrentDatabase()
		if database == "" {
			database = "default"
		}
	}

	// Auto-discover and register topic if not already in catalog
	if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
		// Topic not in catalog, try to discover and register it
		if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
			// Return error immediately for non-existent topics instead of falling back to sample data
			return &QueryResult{Error: regErr}, regErr
		}
	}

	// Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
	// Get filerClient from broker connection (works with both real and mock brokers)
	var filerClient filer_pb.FilerClient
	var filerClientErr error
	filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
	if filerClientErr != nil {
		// Return error if filer client is not available for topic access
		return &QueryResult{Error: filerClientErr}, filerClientErr
	}

	hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
	if err != nil {
		// Handle quiet topics gracefully: topics exist but have no active schema/brokers
		if IsNoSchemaError(err) {
			// Return empty result for quiet topics (normal in production environments)
			return &QueryResult{
				Columns:  []string{},
				Rows:     [][]sqltypes.Value{},
				Database: database,
				Table:    tableName,
			}, nil
		}
		// Return error for other access issues (truly non-existent topics, etc.)
		topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err)
		return &QueryResult{Error: topicErr}, topicErr
	}

	// Parse SELECT columns and detect aggregation functions
	var columns []string
	var aggregations []AggregationSpec
	selectAll := false
	hasAggregations := false
	_ = hasAggregations // Used later in aggregation routing

	// Track required base columns for arithmetic expressions
	baseColumnsSet := make(map[string]bool)

	for _, selectExpr := range stmt.SelectExprs {
		switch expr := selectExpr.(type) {
		case *StarExpr:
			selectAll = true
		case *AliasedExpr:
			switch col := expr.Expr.(type) {
			case *ColName:
				colName := col.Name.String()
				columns = append(columns, colName)
				baseColumnsSet[colName] = true
			case *ArithmeticExpr:
				// Handle arithmetic expressions like id+user_id and string concatenation like name||suffix
				columns = append(columns, e.getArithmeticExpressionAlias(col))
				// Extract base columns needed for this arithmetic expression
				e.extractBaseColumns(col, baseColumnsSet)
			case *SQLVal:
				// Handle string/numeric literals like 'good', 123, etc.
				columns = append(columns, e.getSQLValAlias(col))
			case *FuncExpr:
				// Distinguish between aggregation functions and string functions
				funcName := strings.ToUpper(col.Name.String())
				if e.isAggregationFunction(funcName) {
					// Handle aggregation functions
					aggSpec, err := e.parseAggregationFunction(col, expr)
					if err != nil {
						return &QueryResult{Error: err}, err
					}
					aggregations = append(aggregations, *aggSpec)
					hasAggregations = true
				} else if e.isStringFunction(funcName) {
					// Handle string functions like UPPER, LENGTH, etc.
					columns = append(columns, e.getStringFunctionAlias(col))
					// Extract base columns needed for this string function
					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
				} else if e.isDateTimeFunction(funcName) {
					// Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
					columns = append(columns, e.getDateTimeFunctionAlias(col))
					// Extract base columns needed for this datetime function
					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
				} else {
					return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
				}
			default:
				err := fmt.Errorf("unsupported SELECT expression: %T", col)
				return &QueryResult{Error: err}, err
			}
		default:
			err := fmt.Errorf("unsupported SELECT expression: %T", expr)
			return &QueryResult{Error: err}, err
		}
	}

	// If we have aggregations, use aggregation query path
	if hasAggregations {
		return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
	}

	// Parse WHERE clause for predicate pushdown
	var predicate func(*schema_pb.RecordValue) bool
	if stmt.Where != nil {
		predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Parse LIMIT and OFFSET clauses
	// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
	limit := -1
	offset := 0
	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
		switch limitExpr := stmt.Limit.Rowcount.(type) {
		case *SQLVal:
			if limitExpr.Type == IntVal {
				limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
				if parseErr != nil {
					return &QueryResult{Error: parseErr}, parseErr
				}
				if limit64 > math.MaxInt32 || limit64 < 0 {
					return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
				}
				limit = int(limit64)
			}
		}
	}

	// Parse OFFSET clause if present
	if stmt.Limit != nil && stmt.Limit.Offset != nil {
		switch offsetExpr := stmt.Limit.Offset.(type) {
		case *SQLVal:
			if offsetExpr.Type == IntVal {
				offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
				if parseErr != nil {
					return &QueryResult{Error: parseErr}, parseErr
				}
				if offset64 > math.MaxInt32 || offset64 < 0 {
					return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
				}
				offset = int(offset64)
			}
		}
	}

	// Build hybrid scan options
	// Extract time filters from WHERE clause to optimize scanning
	startTimeNs, stopTimeNs := int64(0), int64(0)
	if stmt.Where != nil {
		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
	}

	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
		StopTimeNs:  stopTimeNs,  // Extracted from WHERE clause time comparisons
		Limit:       limit,
		Offset:      offset,
		Predicate:   predicate,
	}

	if !selectAll {
		// Convert baseColumnsSet to slice for hybrid scan options
		baseColumns := make([]string, 0, len(baseColumnsSet))
		for columnName := range baseColumnsSet {
			baseColumns = append(baseColumns, columnName)
		}

		// Use base columns (not expression aliases) for data retrieval
		if len(baseColumns) > 0 {
			hybridScanOptions.Columns = baseColumns
		} else {
			// If no base columns found (shouldn't happen), use original columns
			hybridScanOptions.Columns = columns
		}
	}

	// Execute the hybrid scan with stats capture for EXPLAIN
	var results []HybridScanResult
	if plan != nil {
		// EXPLAIN mode - capture broker buffer stats
		var stats *HybridScanStats
		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}

		// Populate plan with broker buffer information
		if stats != nil {
			plan.BrokerBufferQueried = stats.BrokerBufferQueried
			plan.BrokerBufferMessages = stats.BrokerBufferMessages
			plan.BufferStartIndex = stats.BufferStartIndex

			// Add broker_buffer to data sources if buffer was queried
			if stats.BrokerBufferQueried {
				// Check if broker_buffer is already in data sources
				hasBrokerBuffer := false
				for _, source := range plan.DataSources {
					if source == "broker_buffer" {
						hasBrokerBuffer = true
						break
					}
				}
				if !hasBrokerBuffer {
					plan.DataSources = append(plan.DataSources, "broker_buffer")
				}
			}
		}
	} else {
		// Normal mode - just get results
		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Convert to SQL result format
	if selectAll {
		if len(columns) > 0 {
			// SELECT *, specific_columns - include both auto-discovered and explicit columns
			return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
		}
		// SELECT * only - let converter determine all columns (excludes system columns)
		return hybridScanner.ConvertToSQLResult(results, nil), nil
	}

	// Handle custom column expressions (including arithmetic)
	return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
}
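// exampleTimeFilterExtraction sketches (hypothetically) what the time-filter
// pushdown below derives from a WHERE tree for `_ts > X AND _ts < Y`:
// startTimeNs=X and stopTimeNs=Y, with exact matching left to the predicate.
// The timestamp literals are illustrative.
func exampleTimeFilterExtraction(e *SQLEngine) (int64, int64) {
	where := &AndExpr{
		Left: &ComparisonExpr{
			Left:     &ColName{Name: stringValue(SW_COLUMN_NAME_TIMESTAMP)},
			Operator: GreaterThanStr,
			Right:    &SQLVal{Type: IntVal, Val: []byte("1700000000000000000")},
		},
		Right: &ComparisonExpr{
			Left:     &ColName{Name: stringValue(SW_COLUMN_NAME_TIMESTAMP)},
			Operator: LessThanStr,
			Right:    &SQLVal{Type: IntVal, Val: []byte("1700000100000000000")},
		},
	}
	return e.extractTimeFilters(where)
}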
premature scan termination. The predicate will handle exact matching. if startTimeNs != 0 && startTimeNs == stopTimeNs { stopTimeNs = 0 } return startTimeNs, stopTimeNs } // extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) { switch exprType := expr.(type) { case *ComparisonExpr: e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs) case *AndExpr: // For AND expressions, combine time filters (intersection) e.extractTimeFiltersRecursive(exprType.Left, startTimeNs, stopTimeNs) e.extractTimeFiltersRecursive(exprType.Right, startTimeNs, stopTimeNs) case *OrExpr: // For OR expressions, we can't easily optimize time ranges // Skip time filter extraction for OR clauses to avoid incorrect results return case *ParenExpr: // Unwrap parentheses and continue e.extractTimeFiltersRecursive(exprType.Expr, startTimeNs, stopTimeNs) } } // extractTimeFromComparison extracts time bounds from comparison expressions // Handles comparisons against timestamp columns (system columns and schema-defined timestamp types) func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) { // Check if this is a time-related column comparison leftCol := e.getColumnName(comp.Left) rightCol := e.getColumnName(comp.Right) var valueExpr ExprNode var reversed bool // Determine which side is the time column (using schema types) if e.isTimestampColumn(leftCol) { valueExpr = comp.Right reversed = false } else if e.isTimestampColumn(rightCol) { valueExpr = comp.Left reversed = true } else { // Not a time comparison return } // Extract the time value timeValue := e.extractTimeValue(valueExpr) if timeValue == 0 { // Couldn't parse time value return } // Apply the comparison operator to determine time bounds operator := comp.Operator if reversed { // Reverse the operator if column and value are swapped operator = e.reverseOperator(operator) } switch operator { case GreaterThanStr: // timestamp > value if *startTimeNs == 0 || timeValue > *startTimeNs { *startTimeNs = timeValue } case GreaterEqualStr: // timestamp >= value if *startTimeNs == 0 || timeValue >= *startTimeNs { *startTimeNs = timeValue } case LessThanStr: // timestamp < value if *stopTimeNs == 0 || timeValue < *stopTimeNs { *stopTimeNs = timeValue } case LessEqualStr: // timestamp <= value if *stopTimeNs == 0 || timeValue <= *stopTimeNs { *stopTimeNs = timeValue } case EqualStr: // timestamp = value (point query) // For exact matches, we set startTimeNs slightly before the target // This works around a scan boundary bug where >= X starts after X instead of at X // The predicate function will handle exact matching *startTimeNs = timeValue - 1 // Do NOT set stopTimeNs - let the predicate handle exact matching } } // isTimestampColumn checks if a column is a timestamp using schema type information func (e *SQLEngine) isTimestampColumn(columnName string) bool { if columnName == "" { return false } // System timestamp columns are always time columns if columnName == SW_COLUMN_NAME_TIMESTAMP { return true } // For user-defined columns, check actual schema type information if e.catalog != nil { currentDB := e.catalog.GetCurrentDatabase() if currentDB == "" { currentDB = "default" } // Get current table context from query execution // Note: This is a limitation - we need table context here // In a full implementation, this would be passed from the query context tableInfo, err := 
// isTimestampColumn checks if a column is a timestamp using schema type information
func (e *SQLEngine) isTimestampColumn(columnName string) bool {
	if columnName == "" {
		return false
	}

	// System timestamp columns are always time columns
	if columnName == SW_COLUMN_NAME_TIMESTAMP {
		return true
	}

	// For user-defined columns, check actual schema type information
	if e.catalog != nil {
		currentDB := e.catalog.GetCurrentDatabase()
		if currentDB == "" {
			currentDB = "default"
		}

		// Get the current table context from query execution.
		// Note: this is a limitation - we need table context here.
		// In a full implementation, this would be passed from the query context.
		tableInfo, err := e.getCurrentTableInfo(currentDB)
		if err == nil && tableInfo != nil {
			for _, col := range tableInfo.Columns {
				if strings.EqualFold(col.Name, columnName) {
					// Use the actual SQL type to determine if this is a timestamp
					return e.isSQLTypeTimestamp(col.Type)
				}
			}
		}
	}

	// Only return true if we have explicit type information;
	// no guessing based on column names.
	return false
}

// isSQLTypeTimestamp checks if a SQL type string represents a timestamp type
func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool {
	upperType := strings.ToUpper(strings.TrimSpace(sqlType))

	// Handle types with precision/length specifications, e.g. TIMESTAMP(6)
	if idx := strings.Index(upperType, "("); idx != -1 {
		upperType = upperType[:idx]
	}

	switch upperType {
	case "TIMESTAMP", "DATETIME":
		return true
	case "BIGINT":
		// A BIGINT could be a timestamp if it follows the pattern for timestamp storage.
		// This is a heuristic - in a better system, we'd have semantic type information.
		return false // Conservative approach - require an explicit TIMESTAMP type
	default:
		return false
	}
}

// getCurrentTableInfo attempts to get table info for the current query context.
// This is a simplified implementation - ideally the table context would be passed explicitly.
func (e *SQLEngine) getCurrentTableInfo(database string) (*TableInfo, error) {
	// This is a limitation of the current architecture: we'd need the table
	// context from the current query. Until then, return an error so callers
	// rely only on explicit type information.
	// TODO: Enhance architecture to pass table context through query execution
	return nil, fmt.Errorf("table context not available in current architecture")
}

// getColumnName extracts a column name from an expression (handles ColName types)
func (e *SQLEngine) getColumnName(expr ExprNode) string {
	switch exprType := expr.(type) {
	case *ColName:
		return exprType.Name.String()
	}
	return ""
}

// resolveColumnAlias tries to resolve a column name that might be an alias
func (e *SQLEngine) resolveColumnAlias(columnName string, selectExprs []SelectExpr) string {
	if selectExprs == nil {
		return columnName
	}

	// Check if this column name is actually an alias in the SELECT list
	for _, selectExpr := range selectExprs {
		if aliasedExpr, ok := selectExpr.(*AliasedExpr); ok && aliasedExpr != nil {
			// Check if the alias matches our column name
			if aliasedExpr.As != nil && !aliasedExpr.As.IsEmpty() && aliasedExpr.As.String() == columnName {
				// If the aliased expression is a column, return the actual column name
				if colExpr, ok := aliasedExpr.Expr.(*ColName); ok && colExpr != nil {
					return colExpr.Name.String()
				}
			}
		}
	}

	// If no alias was found, return the original column name
	return columnName
}

// extractTimeValue parses time values from SQL expressions.
// Supports nanosecond timestamps, ISO dates, and datetime strings.
func (e *SQLEngine) extractTimeValue(expr ExprNode) int64 {
	switch exprType := expr.(type) {
	case *SQLVal:
		switch exprType.Type {
		case IntVal:
			// Parse as a nanosecond timestamp
			if val, err := strconv.ParseInt(string(exprType.Val), 10, 64); err == nil {
				return val
			}
		case StrVal:
			// Parse as an ISO date or other string formats
			timeStr := string(exprType.Val)

			// Try parsing as RFC3339 (ISO 8601)
			if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
				return t.UnixNano()
			}

			// Try parsing as RFC3339 with nanoseconds
			if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil {
				return t.UnixNano()
			}

			// Try parsing as date only (YYYY-MM-DD)
			if t, err := time.Parse("2006-01-02", timeStr); err == nil {
				return t.UnixNano()
			}

			// Try parsing as datetime (YYYY-MM-DD HH:MM:SS)
			if t, err := time.Parse("2006-01-02 15:04:05", timeStr); err == nil {
				return t.UnixNano()
			}
		}
	}
	return 0 // Couldn't parse
}
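// Minimal sketch of the literal forms extractTimeValue understands; the values
// are hypothetical. The first two literals denote the same instant, while the
// date-only form resolves to midnight UTC of that day.
func exampleExtractTimeValueForms(e *SQLEngine) {
	nanos := e.extractTimeValue(&SQLVal{Type: IntVal, Val: []byte("1700000000000000000")}) // raw nanoseconds
	iso := e.extractTimeValue(&SQLVal{Type: StrVal, Val: []byte("2023-11-14T22:13:20Z")})  // RFC3339
	day := e.extractTimeValue(&SQLVal{Type: StrVal, Val: []byte("2023-11-14")})            // date only
	_, _, _ = nanos, iso, day
}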
time.Parse("2006-01-02 15:04:05", timeStr); err == nil { return t.UnixNano() } } } return 0 // Couldn't parse } // reverseOperator reverses comparison operators when column and value are swapped func (e *SQLEngine) reverseOperator(op string) string { switch op { case GreaterThanStr: return LessThanStr case GreaterEqualStr: return LessEqualStr case LessThanStr: return GreaterThanStr case LessEqualStr: return GreaterEqualStr case EqualStr: return EqualStr case NotEqualStr: return NotEqualStr default: return op } } // buildPredicate creates a predicate function from a WHERE clause expression // This is a simplified implementation - a full implementation would be much more complex func (e *SQLEngine) buildPredicate(expr ExprNode) (func(*schema_pb.RecordValue) bool, error) { return e.buildPredicateWithContext(expr, nil) } // buildPredicateWithContext creates a predicate function with SELECT context for alias resolution func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { switch exprType := expr.(type) { case *ComparisonExpr: return e.buildComparisonPredicateWithContext(exprType, selectExprs) case *BetweenExpr: return e.buildBetweenPredicateWithContext(exprType, selectExprs) case *IsNullExpr: return e.buildIsNullPredicateWithContext(exprType, selectExprs) case *IsNotNullExpr: return e.buildIsNotNullPredicateWithContext(exprType, selectExprs) case *AndExpr: leftPred, err := e.buildPredicateWithContext(exprType.Left, selectExprs) if err != nil { return nil, err } rightPred, err := e.buildPredicateWithContext(exprType.Right, selectExprs) if err != nil { return nil, err } return func(record *schema_pb.RecordValue) bool { return leftPred(record) && rightPred(record) }, nil case *OrExpr: leftPred, err := e.buildPredicateWithContext(exprType.Left, selectExprs) if err != nil { return nil, err } rightPred, err := e.buildPredicateWithContext(exprType.Right, selectExprs) if err != nil { return nil, err } return func(record *schema_pb.RecordValue) bool { return leftPred(record) || rightPred(record) }, nil default: return nil, fmt.Errorf("unsupported WHERE expression: %T", expr) } } // buildPredicateWithAliases creates a predicate function with alias resolution support func (e *SQLEngine) buildPredicateWithAliases(expr ExprNode, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { switch exprType := expr.(type) { case *ComparisonExpr: return e.buildComparisonPredicateWithAliases(exprType, aliases) case *BetweenExpr: return e.buildBetweenPredicateWithAliases(exprType, aliases) case *IsNullExpr: return e.buildIsNullPredicateWithAliases(exprType, aliases) case *IsNotNullExpr: return e.buildIsNotNullPredicateWithAliases(exprType, aliases) case *AndExpr: leftPred, err := e.buildPredicateWithAliases(exprType.Left, aliases) if err != nil { return nil, err } rightPred, err := e.buildPredicateWithAliases(exprType.Right, aliases) if err != nil { return nil, err } return func(record *schema_pb.RecordValue) bool { return leftPred(record) && rightPred(record) }, nil case *OrExpr: leftPred, err := e.buildPredicateWithAliases(exprType.Left, aliases) if err != nil { return nil, err } rightPred, err := e.buildPredicateWithAliases(exprType.Right, aliases) if err != nil { return nil, err } return func(record *schema_pb.RecordValue) bool { return leftPred(record) || rightPred(record) }, nil default: return nil, fmt.Errorf("unsupported WHERE expression: %T", expr) } } // buildComparisonPredicateWithAliases creates a predicate for 
// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support
func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
	var columnName string
	var compareValue interface{}
	var operator string

	// Extract the comparison details, resolving aliases if needed
	leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
	rightCol := e.getColumnNameWithAliases(expr.Right, aliases)
	operator = e.normalizeOperator(expr.Operator)

	if leftCol != "" && rightCol == "" {
		// Left side is a column, right side is a value
		columnName = e.getSystemColumnInternalName(leftCol)
		val, err := e.extractValueFromExpr(expr.Right)
		if err != nil {
			return nil, err
		}
		compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right)
	} else if rightCol != "" && leftCol == "" {
		// Right side is a column, left side is a value
		columnName = e.getSystemColumnInternalName(rightCol)
		val, err := e.extractValueFromExpr(expr.Left)
		if err != nil {
			return nil, err
		}
		compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left)
		// Reverse the operator when the column is on the right
		operator = e.reverseOperator(operator)
	} else if leftCol != "" && rightCol != "" {
		return nil, fmt.Errorf("column-to-column comparisons not yet supported")
	} else {
		return nil, fmt.Errorf("at least one side of comparison must be a column")
	}

	return func(record *schema_pb.RecordValue) bool {
		fieldValue, exists := record.Fields[columnName]
		if !exists {
			return false
		}
		return e.evaluateComparison(fieldValue, operator, compareValue)
	}, nil
}

// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.).
// Handles column names on both the left and right sides of the comparison.
func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) {
	return e.buildComparisonPredicateWithContext(expr, nil)
}
// buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support
func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
	var columnName string
	var compareValue interface{}
	var operator string

	// Check if the column is on the left side (normal case: column > value)
	if colName, ok := expr.Left.(*ColName); ok {
		rawColumnName := colName.Name.String()
		// Resolve a potential alias to the actual column name
		columnName = e.resolveColumnAlias(rawColumnName, selectExprs)
		// Map display names to internal names for system columns
		columnName = e.getSystemColumnInternalName(columnName)
		operator = expr.Operator

		// Extract the comparison value from the right side
		val, err := e.extractComparisonValue(expr.Right)
		if err != nil {
			return nil, fmt.Errorf("failed to extract right-side value: %v", err)
		}
		compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right)
	} else if colName, ok := expr.Right.(*ColName); ok {
		// Column is on the right side (reversed case: value < column)
		rawColumnName := colName.Name.String()
		// Resolve a potential alias to the actual column name
		columnName = e.resolveColumnAlias(rawColumnName, selectExprs)
		// Map display names to internal names for system columns
		columnName = e.getSystemColumnInternalName(columnName)
		// Reverse the operator when the column is on the right side
		operator = e.reverseOperator(expr.Operator)

		// Extract the comparison value from the left side
		val, err := e.extractComparisonValue(expr.Left)
		if err != nil {
			return nil, fmt.Errorf("failed to extract left-side value: %v", err)
		}
		compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left)
	} else {
		// Handle literal-only comparisons like 1 = 0, 'a' = 'b', etc.
		leftVal, leftErr := e.extractComparisonValue(expr.Left)
		rightVal, rightErr := e.extractComparisonValue(expr.Right)
		if leftErr != nil || rightErr != nil {
			return nil, fmt.Errorf("no column name found in comparison expression, left: %T, right: %T", expr.Left, expr.Right)
		}
		// Evaluate the literal comparison once and return a constant predicate
		result := e.compareLiteralValues(leftVal, rightVal, expr.Operator)
		return func(record *schema_pb.RecordValue) bool {
			return result
		}, nil
	}

	// Return the predicate function
	return func(record *schema_pb.RecordValue) bool {
		fieldValue, exists := record.Fields[columnName]
		if !exists {
			return false // Column doesn't exist in the record
		}
		// Use the comparison evaluation function
		return e.evaluateComparison(fieldValue, operator, compareValue)
	}, nil
}

// buildBetweenPredicateWithContext creates a predicate for BETWEEN operations
func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
	var columnName string
	var fromValue, toValue interface{}

	// Check if the left side is a column name
	if colName, ok := expr.Left.(*ColName); ok {
		rawColumnName := colName.Name.String()
		// Resolve a potential alias to the actual column name
		columnName = e.resolveColumnAlias(rawColumnName, selectExprs)
		// Map display names to internal names for system columns
		columnName = e.getSystemColumnInternalName(columnName)

		// Extract the FROM value
		fromVal, err := e.extractComparisonValue(expr.From)
		if err != nil {
			return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err)
		}
		fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From)

		// Extract the TO value
		toVal, err := e.extractComparisonValue(expr.To)
		if err != nil {
			return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err)
		}
		toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To)
	} else {
		return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left)
	}

	// Return the predicate function
	return func(record *schema_pb.RecordValue) bool {
		fieldValue, exists := record.Fields[columnName]
		if !exists {
			return false
		}
		// Evaluate: fieldValue >= fromValue AND fieldValue <= toValue
		greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue)
		lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue)
		result := greaterThanOrEqualFrom && lessThanOrEqualTo
		// Handle NOT BETWEEN
		if expr.Not {
			result = !result
		}
		return result
	}, nil
}
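// Worked example (illustrative): "age BETWEEN 18 AND 30" is evaluated per record
// as age >= 18 AND age <= 30, with both bounds inclusive as in standard SQL,
// and NOT BETWEEN simply negates that result.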
// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support
func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
	var columnName string
	var fromValue, toValue interface{}

	// Extract the column name from the left side with alias resolution
	leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
	if leftCol == "" {
		return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left)
	}
	columnName = e.getSystemColumnInternalName(leftCol)

	// Extract the FROM value
	fromVal, err := e.extractValueFromExpr(expr.From)
	if err != nil {
		return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err)
	}
	fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From)

	// Extract the TO value
	toVal, err := e.extractValueFromExpr(expr.To)
	if err != nil {
		return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err)
	}
	toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To)

	// Return the predicate function
	return func(record *schema_pb.RecordValue) bool {
		fieldValue, exists := record.Fields[columnName]
		if !exists {
			return false
		}
		// Evaluate: fieldValue >= fromValue AND fieldValue <= toValue
		greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue)
		lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue)
		result := greaterThanOrEqualFrom && lessThanOrEqualTo
		// Handle NOT BETWEEN
		if expr.Not {
			result = !result
		}
		return result
	}, nil
}

// buildIsNullPredicateWithContext creates a predicate for IS NULL operations
func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
	// Check if the expression is a column name
	if colName, ok := expr.Expr.(*ColName); ok {
		rawColumnName := colName.Name.String()
		// Resolve a potential alias to the actual column name
		columnName := e.resolveColumnAlias(rawColumnName, selectExprs)
		// Map display names to internal names for system columns
		columnName = e.getSystemColumnInternalName(columnName)

		// Return the predicate function
		return func(record *schema_pb.RecordValue) bool {
			// Check if the field exists and whether it is null or missing
			fieldValue, exists := record.Fields[columnName]
			if !exists {
				return true // Field doesn't exist = NULL
			}
			// Check if the field value itself is null/empty
			return e.isValueNull(fieldValue)
		}, nil
	} else {
		return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr)
	}
}

// buildIsNotNullPredicateWithContext creates a predicate for IS NOT NULL operations
func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
	// Check if the expression is a column name
	if colName, ok := expr.Expr.(*ColName); ok {
		rawColumnName := colName.Name.String()
		// Resolve a potential alias to the actual column name
		columnName := e.resolveColumnAlias(rawColumnName, selectExprs)
		// Map display names to internal names for system columns
		columnName = e.getSystemColumnInternalName(columnName)

		// Return the predicate function
		return func(record *schema_pb.RecordValue) bool {
			// Check if the field exists and whether it is not null
			fieldValue, exists := record.Fields[columnName]
			if !exists {
				return false // Field doesn't exist = NULL, so NOT NULL is false
			}
			// Check if the field value itself is not null/empty
			return !e.isValueNull(fieldValue)
		}, nil
	} else {
		return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr)
	}
}

// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support
func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
	// Extract the column name from the expression with alias resolution
	columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
	if columnName == "" {
		return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr)
	}
	columnName = e.getSystemColumnInternalName(columnName)

	// Return the predicate function
	return func(record *schema_pb.RecordValue) bool {
		// Check if the field exists and whether it is null or missing
		fieldValue, exists := record.Fields[columnName]
		if !exists {
			return true // Field doesn't exist = NULL
		}
		// Check if the field value itself is null/empty
		return e.isValueNull(fieldValue)
	}, nil
}
// buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support
func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
	// Extract the column name from the expression with alias resolution
	columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
	if columnName == "" {
		return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr)
	}
	columnName = e.getSystemColumnInternalName(columnName)

	// Return the predicate function
	return func(record *schema_pb.RecordValue) bool {
		// Check if the field exists and whether it is not null
		fieldValue, exists := record.Fields[columnName]
		if !exists {
			return false // Field doesn't exist = NULL, so NOT NULL is false
		}
		// Check if the field value itself is not null/empty
		return !e.isValueNull(fieldValue)
	}, nil
}

// isValueNull checks if a schema_pb.Value is null or represents a null value
func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
	if value == nil {
		return true
	}

	// Check the Kind field to see if it represents a null value
	if value.Kind == nil {
		return true
	}

	// For different value types, check if they represent null/empty values
	switch kind := value.Kind.(type) {
	case *schema_pb.Value_StringValue:
		// An empty string could be considered null depending on semantics.
		// For now, treat empty string as not null (SQL standard behavior).
		return false
	case *schema_pb.Value_BoolValue:
		return false // Boolean values are never null
	case *schema_pb.Value_Int32Value, *schema_pb.Value_Int64Value:
		return false // Integer values are never null
	case *schema_pb.Value_FloatValue, *schema_pb.Value_DoubleValue:
		return false // Numeric values are never null
	case *schema_pb.Value_BytesValue:
		// Bytes could be null if empty, but for now treat as not null
		return false
	case *schema_pb.Value_TimestampValue:
		// Check if the timestamp is zero/uninitialized
		return kind.TimestampValue == nil
	case *schema_pb.Value_DateValue:
		return kind.DateValue == nil
	case *schema_pb.Value_TimeValue:
		return kind.TimeValue == nil
	default:
		// Unknown type, consider it null to be safe
		return true
	}
}

// getColumnNameWithAliases extracts a column name from an expression, resolving aliases if needed
func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string {
	switch exprType := expr.(type) {
	case *ColName:
		colName := exprType.Name.String()
		// Check if this is an alias that should be resolved
		if aliases != nil {
			if actualExpr, exists := aliases[colName]; exists {
				// Recursively resolve the aliased expression (don't recurse into aliases again)
				return e.getColumnNameWithAliases(actualExpr, nil)
			}
		}
		return colName
	}
	return ""
}
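// Minimal sketch (hypothetical names): with SELECT ts AS event_time, a WHERE
// clause may reference the alias; getColumnNameWithAliases resolves it back to
// the underlying column.
func exampleAliasResolution(e *SQLEngine) string {
	aliases := map[string]ExprNode{
		"event_time": &ColName{Name: stringValue("ts")},
	}
	return e.getColumnNameWithAliases(&ColName{Name: stringValue("event_time")}, aliases) // "ts"
}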
// extractValueFromExpr extracts a value from an expression node (for alias support)
func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) {
	return e.extractComparisonValue(expr)
}

// normalizeOperator normalizes comparison operators
func (e *SQLEngine) normalizeOperator(op string) string {
	return op // For now, just return as-is
}

// extractSelectAliases builds a map of aliases to their underlying expressions
func (e *SQLEngine) extractSelectAliases(selectExprs []SelectExpr) map[string]ExprNode {
	aliases := make(map[string]ExprNode)
	if selectExprs == nil {
		return aliases
	}
	for _, selectExpr := range selectExprs {
		if selectExpr == nil {
			continue
		}
		if aliasedExpr, ok := selectExpr.(*AliasedExpr); ok && aliasedExpr != nil {
			// Additional safety checks
			if aliasedExpr.As != nil && !aliasedExpr.As.IsEmpty() && aliasedExpr.Expr != nil {
				// Map the alias name to the underlying expression
				aliases[aliasedExpr.As.String()] = aliasedExpr.Expr
			}
		}
	}
	return aliases
}

// extractComparisonValue extracts the comparison value from a SQL expression
func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) {
	switch val := expr.(type) {
	case *SQLVal:
		switch val.Type {
		case IntVal:
			intVal, err := strconv.ParseInt(string(val.Val), 10, 64)
			if err != nil {
				return nil, err
			}
			return intVal, nil
		case StrVal:
			return string(val.Val), nil
		case FloatVal:
			floatVal, err := strconv.ParseFloat(string(val.Val), 64)
			if err != nil {
				return nil, err
			}
			return floatVal, nil
		default:
			return nil, fmt.Errorf("unsupported SQL value type: %v", val.Type)
		}
	case *ArithmeticExpr:
		// Handle arithmetic expressions like CURRENT_TIMESTAMP - INTERVAL '1 hour'
		return e.evaluateArithmeticExpressionForComparison(val)
	case *FuncExpr:
		// Handle function calls like NOW(), CURRENT_TIMESTAMP
		return e.evaluateFunctionExpressionForComparison(val)
	case *IntervalExpr:
		// Handle standalone INTERVAL expressions
		nanos, err := e.evaluateInterval(val.Value)
		if err != nil {
			return nil, err
		}
		return nanos, nil
	case ValTuple:
		// Handle IN expressions with multiple values: column IN (value1, value2, value3)
		var inValues []interface{}
		for _, tupleVal := range val {
			switch v := tupleVal.(type) {
			case *SQLVal:
				switch v.Type {
				case IntVal:
					intVal, err := strconv.ParseInt(string(v.Val), 10, 64)
					if err != nil {
						return nil, err
					}
					inValues = append(inValues, intVal)
				case StrVal:
					inValues = append(inValues, string(v.Val))
				case FloatVal:
					floatVal, err := strconv.ParseFloat(string(v.Val), 64)
					if err != nil {
						return nil, err
					}
					inValues = append(inValues, floatVal)
				}
			}
		}
		return inValues, nil
	default:
		return nil, fmt.Errorf("unsupported comparison value type: %T", expr)
	}
}

// evaluateArithmeticExpressionForComparison evaluates an arithmetic expression for WHERE clause comparisons
func (e *SQLEngine) evaluateArithmeticExpressionForComparison(expr *ArithmeticExpr) (interface{}, error) {
	// Check if this is timestamp arithmetic with intervals
	if e.isTimestampArithmetic(expr.Left, expr.Right) && (expr.Operator == "+" || expr.Operator == "-") {
		// Evaluate the timestamp arithmetic and return the result as nanoseconds
		result, err := e.evaluateTimestampArithmetic(expr.Left, expr.Right, expr.Operator)
		if err != nil {
			return nil, err
		}
		// Extract the timestamp value as nanoseconds for comparison
		if result.Kind != nil {
			switch resultKind := result.Kind.(type) {
			case *schema_pb.Value_Int64Value:
				return resultKind.Int64Value, nil
			case *schema_pb.Value_StringValue:
				// If it's a formatted timestamp string, parse it back to nanoseconds
				if timestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", resultKind.StringValue); err == nil {
					return timestamp.UnixNano(), nil
				}
				return nil, fmt.Errorf("could not parse timestamp string: %s", resultKind.StringValue)
			}
		}
		return nil, fmt.Errorf("invalid timestamp arithmetic result")
	}

	// Other arithmetic operations would need to be evaluated differently;
	// for now, return an error for unsupported arithmetic.
	return nil, fmt.Errorf("unsupported arithmetic expression in WHERE clause: %s", expr.Operator)
}
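// Illustrative note: in a clause like ts > CURRENT_TIMESTAMP - INTERVAL '1 hour',
// the right-hand side reaches extractComparisonValue as an ArithmeticExpr. The
// timestamp-arithmetic path above evaluates it once to an absolute nanosecond
// value, so the per-row predicate is reduced to a plain int64 comparison.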
// evaluateFunctionExpressionForComparison evaluates a function expression for WHERE clause comparisons
func (e *SQLEngine) evaluateFunctionExpressionForComparison(expr *FuncExpr) (interface{}, error) {
	funcName := strings.ToUpper(expr.Name.String())
	switch funcName {
	case "NOW", "CURRENT_TIMESTAMP":
		result, err := e.Now()
		if err != nil {
			return nil, err
		}
		// Return as nanoseconds for comparison
		if result.Kind != nil {
			if resultKind, ok := result.Kind.(*schema_pb.Value_TimestampValue); ok {
				// Convert microseconds to nanoseconds
				return resultKind.TimestampValue.TimestampMicros * 1000, nil
			}
		}
		return nil, fmt.Errorf("invalid NOW() result: expected TimestampValue, got %T", result.Kind)
	case "CURRENT_DATE":
		result, err := e.CurrentDate()
		if err != nil {
			return nil, err
		}
		// Convert the date to nanoseconds (start of day)
		if result.Kind != nil {
			if resultKind, ok := result.Kind.(*schema_pb.Value_StringValue); ok {
				if date, err := time.Parse("2006-01-02", resultKind.StringValue); err == nil {
					return date.UnixNano(), nil
				}
			}
		}
		return nil, fmt.Errorf("invalid CURRENT_DATE result")
	case "CURRENT_TIME":
		result, err := e.CurrentTime()
		if err != nil {
			return nil, err
		}
		// Time comparison might need special handling; for now, just return the string value
		if result.Kind != nil {
			if resultKind, ok := result.Kind.(*schema_pb.Value_StringValue); ok {
				return resultKind.StringValue, nil
			}
		}
		return nil, fmt.Errorf("invalid CURRENT_TIME result")
	default:
		return nil, fmt.Errorf("unsupported function in WHERE clause: %s", funcName)
	}
}

// evaluateComparison performs the actual comparison.
// This is a simplified implementation - a full implementation would handle
// type coercion and all comparison operators.
func (e *SQLEngine) evaluateComparison(fieldValue *schema_pb.Value, operator string, compareValue interface{}) bool {
	switch operator {
	case "=":
		return e.valuesEqual(fieldValue, compareValue)
	case "<":
		return e.valueLessThan(fieldValue, compareValue)
	case ">":
		return e.valueGreaterThan(fieldValue, compareValue)
	case "<=":
		return e.valuesEqual(fieldValue, compareValue) || e.valueLessThan(fieldValue, compareValue)
	case ">=":
		return e.valuesEqual(fieldValue, compareValue) || e.valueGreaterThan(fieldValue, compareValue)
	case "!=", "<>":
		return !e.valuesEqual(fieldValue, compareValue)
	case "LIKE", "like":
		return e.valueLike(fieldValue, compareValue)
	case "IN", "in":
		return e.valueIn(fieldValue, compareValue)
	default:
		return false
	}
}
// Helper functions for value comparison with proper type coercion

func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interface{}) bool {
	// Handle string comparisons first
	if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
		if strVal, ok := compareValue.(string); ok {
			return strField.StringValue == strVal
		}
		return false
	}

	// Handle boolean comparisons
	if boolField, ok := fieldValue.Kind.(*schema_pb.Value_BoolValue); ok {
		if boolVal, ok := compareValue.(bool); ok {
			return boolField.BoolValue == boolVal
		}
		return false
	}

	// Handle logical type comparisons
	if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
		if timestampVal, ok := compareValue.(int64); ok {
			return timestampField.TimestampValue.TimestampMicros == timestampVal
		}
		return false
	}
	if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
		if dateVal, ok := compareValue.(int32); ok {
			return dateField.DateValue.DaysSinceEpoch == dateVal
		}
		return false
	}

	// Handle DecimalValue comparison (convert to string for comparison)
	if decimalField, ok := fieldValue.Kind.(*schema_pb.Value_DecimalValue); ok {
		if decimalStr, ok := compareValue.(string); ok {
			// Convert the decimal bytes back to a string for comparison
			decimalValue := e.decimalToString(decimalField.DecimalValue)
			return decimalValue == decimalStr
		}
		return false
	}
	if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
		if timeVal, ok := compareValue.(int64); ok {
			return timeField.TimeValue.TimeMicros == timeVal
		}
		return false
	}

	// Handle direct int64 comparisons for timestamp precision (before float64 conversion)
	if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok {
		if int64Val, ok := compareValue.(int64); ok {
			return int64Field.Int64Value == int64Val
		}
		if intVal, ok := compareValue.(int); ok {
			return int64Field.Int64Value == int64(intVal)
		}
	}

	// Handle direct int32 comparisons
	if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
		if int32Val, ok := compareValue.(int32); ok {
			return int32Field.Int32Value == int32Val
		}
		if intVal, ok := compareValue.(int); ok {
			return int32Field.Int32Value == int32(intVal)
		}
		if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
			return int32Field.Int32Value == int32(int64Val)
		}
	}

	// Handle numeric comparisons with type coercion (fallback for other numeric types)
	fieldNum := e.convertToNumber(fieldValue)
	compareNum := e.convertCompareValueToNumber(compareValue)
	if fieldNum != nil && compareNum != nil {
		return *fieldNum == *compareNum
	}
	return false
}

// convertCompareValueToNumber converts compare values from SQL queries to float64
func (e *SQLEngine) convertCompareValueToNumber(compareValue interface{}) *float64 {
	switch v := compareValue.(type) {
	case int:
		result := float64(v)
		return &result
	case int32:
		result := float64(v)
		return &result
	case int64:
		result := float64(v)
		return &result
	case float32:
		result := float64(v)
		return &result
	case float64:
		return &v
	case string:
		// Try to parse the string as a number for flexible comparisons
		if parsed, err := strconv.ParseFloat(v, 64); err == nil {
			return &parsed
		}
	}
	return nil
}
// decimalToString converts a DecimalValue back to its string representation
func (e *SQLEngine) decimalToString(decimalValue *schema_pb.DecimalValue) string {
	if decimalValue == nil || decimalValue.Value == nil {
		return "0"
	}

	// Convert the bytes back to a big.Int.
	// NOTE: SetBytes interprets the bytes as an unsigned big-endian integer;
	// negative decimal values are not handled here.
	intValue := new(big.Int).SetBytes(decimalValue.Value)
	str := intValue.String()

	// Handle decimal placement based on scale
	scale := int(decimalValue.Scale)
	if scale > 0 {
		if len(str) <= scale {
			// Zero-pad so the decimal point can be placed (e.g. 5 with scale 3 -> 0.005)
			str = strings.Repeat("0", scale-len(str)+1) + str
		}
		// Insert the decimal point
		decimalPos := len(str) - scale
		return str[:decimalPos] + "." + str[decimalPos:]
	}
	return str
}
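// Worked examples (illustrative): an unscaled value of 12345 with Scale=2
// renders as "123.45", and 5 with Scale=3 is zero-padded to "0.005".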
func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue interface{}) bool {
	// Handle string comparisons lexicographically
	if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
		if strVal, ok := compareValue.(string); ok {
			return strField.StringValue < strVal
		}
		return false
	}

	// Handle logical type comparisons
	if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
		if timestampVal, ok := compareValue.(int64); ok {
			return timestampField.TimestampValue.TimestampMicros < timestampVal
		}
		return false
	}
	if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
		if dateVal, ok := compareValue.(int32); ok {
			return dateField.DateValue.DaysSinceEpoch < dateVal
		}
		return false
	}
	if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
		if timeVal, ok := compareValue.(int64); ok {
			return timeField.TimeValue.TimeMicros < timeVal
		}
		return false
	}

	// Handle direct int64 comparisons for timestamp precision (before float64 conversion)
	if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok {
		if int64Val, ok := compareValue.(int64); ok {
			return int64Field.Int64Value < int64Val
		}
		if intVal, ok := compareValue.(int); ok {
			return int64Field.Int64Value < int64(intVal)
		}
	}

	// Handle direct int32 comparisons
	if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
		if int32Val, ok := compareValue.(int32); ok {
			return int32Field.Int32Value < int32Val
		}
		if intVal, ok := compareValue.(int); ok {
			return int32Field.Int32Value < int32(intVal)
		}
		if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
			return int32Field.Int32Value < int32(int64Val)
		}
	}

	// Handle numeric comparisons with type coercion (fallback for other numeric types)
	fieldNum := e.convertToNumber(fieldValue)
	compareNum := e.convertCompareValueToNumber(compareValue)
	if fieldNum != nil && compareNum != nil {
		return *fieldNum < *compareNum
	}
	return false
}

func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue interface{}) bool {
	// Handle string comparisons lexicographically
	if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
		if strVal, ok := compareValue.(string); ok {
			return strField.StringValue > strVal
		}
		return false
	}

	// Handle logical type comparisons
	if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
		if timestampVal, ok := compareValue.(int64); ok {
			return timestampField.TimestampValue.TimestampMicros > timestampVal
		}
		return false
	}
	if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
		if dateVal, ok := compareValue.(int32); ok {
			return dateField.DateValue.DaysSinceEpoch > dateVal
		}
		return false
	}
	if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
		if timeVal, ok := compareValue.(int64); ok {
			return timeField.TimeValue.TimeMicros > timeVal
		}
		return false
	}

	// Handle direct int64 comparisons for timestamp precision (before float64 conversion)
	if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok {
		if int64Val, ok := compareValue.(int64); ok {
			return int64Field.Int64Value > int64Val
		}
		if intVal, ok := compareValue.(int); ok {
			return int64Field.Int64Value > int64(intVal)
		}
	}

	// Handle direct int32 comparisons
	if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
		if int32Val, ok := compareValue.(int32); ok {
			return int32Field.Int32Value > int32Val
		}
		if intVal, ok := compareValue.(int); ok {
			return int32Field.Int32Value > int32(intVal)
		}
		if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
			return int32Field.Int32Value > int32(int64Val)
		}
	}

	// Handle numeric comparisons with type coercion (fallback for other numeric types)
	fieldNum := e.convertToNumber(fieldValue)
	compareNum := e.convertCompareValueToNumber(compareValue)
	if fieldNum != nil && compareNum != nil {
		return *fieldNum > *compareNum
	}
	return false
}

// valueLike implements SQL LIKE pattern matching with % and _ wildcards
func (e *SQLEngine) valueLike(fieldValue *schema_pb.Value, compareValue interface{}) bool {
	// Only support LIKE for string values
	stringVal, ok := fieldValue.Kind.(*schema_pb.Value_StringValue)
	if !ok {
		return false
	}
	pattern, ok := compareValue.(string)
	if !ok {
		return false
	}

	// Convert the SQL LIKE pattern to an anchored Go regex:
	// % matches any sequence of characters (.*), _ matches a single character (.),
	// and all other characters are escaped so that regex metacharacters in the
	// pattern (e.g. '.' or '+') are matched literally.
	var sb strings.Builder
	sb.WriteString("^")
	for _, ch := range pattern {
		switch ch {
		case '%':
			sb.WriteString(".*")
		case '_':
			sb.WriteString(".")
		default:
			sb.WriteString(regexp.QuoteMeta(string(ch)))
		}
	}
	sb.WriteString("$") // Anchor to match the entire string

	// Compile and match the regex
	regex, err := regexp.Compile(sb.String())
	if err != nil {
		return false // Invalid pattern
	}
	return regex.MatchString(stringVal.StringValue)
}

// valueIn implements the SQL IN operator for checking if a value exists in a list
func (e *SQLEngine) valueIn(fieldValue *schema_pb.Value, compareValue interface{}) bool {
	// For now, handle the simple case where compareValue is a slice of values.
	// A full implementation would handle SQL IN expressions more completely.
	values, ok := compareValue.([]interface{})
	if !ok {
		return false
	}

	// Check if fieldValue matches any value in the list
	for _, value := range values {
		if e.valuesEqual(fieldValue, value) {
			return true
		}
	}
	return false
}
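// Minimal sketch (hypothetical values): LIKE matching as implemented above.
// The pattern "us_r%" compiles to the anchored regex ^us.r.*$ and matches
// "user_admin".
func exampleLikeMatch(e *SQLEngine) bool {
	field := &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "user_admin"}}
	return e.valueLike(field, "us_r%") // true
}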
// Helper methods for specific operations

func (e *SQLEngine) showDatabases(ctx context.Context) (*QueryResult, error) {
	databases := e.catalog.ListDatabases()

	result := &QueryResult{
		Columns: []string{"Database"},
		Rows:    make([][]sqltypes.Value, len(databases)),
	}

	for i, db := range databases {
		result.Rows[i] = []sqltypes.Value{
			sqltypes.NewVarChar(db),
		}
	}

	return result, nil
}

func (e *SQLEngine) showTables(ctx context.Context, dbName string) (*QueryResult, error) {
	// Use the current database context if no database is specified
	if dbName == "" {
		dbName = e.catalog.GetCurrentDatabase()
		if dbName == "" {
			dbName = "default"
		}
	}

	tables, err := e.catalog.ListTables(dbName)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	result := &QueryResult{
		Columns: []string{"Tables_in_" + dbName},
		Rows:    make([][]sqltypes.Value, len(tables)),
	}

	for i, table := range tables {
		result.Rows[i] = []sqltypes.Value{
			sqltypes.NewVarChar(table),
		}
	}

	return result, nil
}

// compareLiteralValues compares two literal values with the given operator
func (e *SQLEngine) compareLiteralValues(left, right interface{}, operator string) bool {
	switch operator {
	case "=", "==":
		return e.literalValuesEqual(left, right)
	case "!=", "<>":
		return !e.literalValuesEqual(left, right)
	case "<":
		return e.compareLiteralNumber(left, right) < 0
	case "<=":
		return e.compareLiteralNumber(left, right) <= 0
	case ">":
		return e.compareLiteralNumber(left, right) > 0
	case ">=":
		return e.compareLiteralNumber(left, right) >= 0
	default:
		// For unsupported operators, default to false
		return false
	}
}

// literalValuesEqual checks if two literal values are equal
func (e *SQLEngine) literalValuesEqual(left, right interface{}) bool {
	// Convert both to strings for comparison
	leftStr := fmt.Sprintf("%v", left)
	rightStr := fmt.Sprintf("%v", right)
	return leftStr == rightStr
}

// compareLiteralNumber compares two values as numbers
func (e *SQLEngine) compareLiteralNumber(left, right interface{}) int {
	leftNum, leftOk := e.convertToFloat64(left)
	rightNum, rightOk := e.convertToFloat64(right)

	if !leftOk || !rightOk {
		// Fall back to string comparison if not numeric
		leftStr := fmt.Sprintf("%v", left)
		rightStr := fmt.Sprintf("%v", right)
		if leftStr < rightStr {
			return -1
		} else if leftStr > rightStr {
			return 1
		}
		return 0
	}

	if leftNum < rightNum {
		return -1
	} else if leftNum > rightNum {
		return 1
	}
	return 0
}

// convertToFloat64 attempts to convert a value to float64
func (e *SQLEngine) convertToFloat64(value interface{}) (float64, bool) {
	switch v := value.(type) {
	case int64:
		return float64(v), true
	case int32:
		return float64(v), true
	case int:
		return float64(v), true
	case float64:
		return v, true
	case float32:
		return float64(v), true
	case string:
		if num, err := strconv.ParseFloat(v, 64); err == nil {
			return num, true
		}
		return 0, false
	default:
		return 0, false
	}
}

func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*QueryResult, error) {
	// Parse the CREATE TABLE statement.
	// Assumption: the table name format is [database.]table_name
	tableName := stmt.NewName.Name.String()
	database := ""

	// Check if a database is specified in the table name
	if stmt.NewName.Qualifier.String() != "" {
		database = stmt.NewName.Qualifier.String()
	} else {
		// Use the current database context or the default
		database = e.catalog.GetCurrentDatabase()
		if database == "" {
			database = "default"
		}
	}

	// Parse the column definitions from CREATE TABLE.
	// Assumption: stmt.TableSpec contains the column definitions
	if stmt.TableSpec == nil || len(stmt.TableSpec.Columns) == 0 {
		err := fmt.Errorf("CREATE TABLE requires column definitions")
		return &QueryResult{Error: err}, err
	}

	// Convert SQL columns to MQ schema fields
	fields := make([]*schema_pb.Field, len(stmt.TableSpec.Columns))
	for i, col := range stmt.TableSpec.Columns {
		fieldType, err := e.convertSQLTypeToMQ(col.Type)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
		fields[i] = &schema_pb.Field{
			Name: col.Name.String(),
			Type: fieldType,
		}
	}

	// Create the record type for the topic
	recordType := &schema_pb.RecordType{
		Fields: fields,
	}

	// Create the topic via the broker using the configurable partition count
	partitionCount := e.catalog.GetDefaultPartitionCount()
	err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	// Register the new topic in the catalog
	mqSchema := &schema.Schema{
		Namespace:  database,
		Name:       tableName,
		RecordType: recordType,
		RevisionId: 1, // Initial revision
	}
	err = e.catalog.RegisterTopic(database, tableName, mqSchema)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	// Return a success result
	result := &QueryResult{
		Columns: []string{"Result"},
		Rows: [][]sqltypes.Value{
			{sqltypes.NewVarChar(fmt.Sprintf("Table '%s.%s' created successfully", database, tableName))},
		},
	}
	return result, nil
}

// ExecutionPlanBuilder handles building execution plans for queries
type ExecutionPlanBuilder struct {
	engine *SQLEngine
}

// NewExecutionPlanBuilder creates a new execution plan builder
func NewExecutionPlanBuilder(engine *SQLEngine) *ExecutionPlanBuilder {
	return &ExecutionPlanBuilder{engine: engine}
}
// BuildAggregationPlan builds an execution plan for aggregation queries
func (builder *ExecutionPlanBuilder) BuildAggregationPlan(
	stmt *SelectStatement,
	aggregations []AggregationSpec,
	strategy AggregationStrategy,
	dataSources *TopicDataSources,
) *QueryExecutionPlan {
	plan := &QueryExecutionPlan{
		QueryType:           "SELECT",
		ExecutionStrategy:   builder.determineExecutionStrategy(stmt, strategy),
		DataSources:         builder.buildDataSourcesList(strategy, dataSources),
		PartitionsScanned:   dataSources.PartitionsCount,
		ParquetFilesScanned: builder.countParquetFiles(dataSources),
		LiveLogFilesScanned: builder.countLiveLogFiles(dataSources),
		OptimizationsUsed:   builder.buildOptimizationsList(stmt, strategy, dataSources),
		Aggregations:        builder.buildAggregationsList(aggregations),
		Details:             make(map[string]interface{}),
	}

	// Set row counts based on the strategy
	if strategy.CanUseFastPath {
		plan.TotalRowsProcessed = dataSources.LiveLogRowCount // Only live logs are scanned, parquet uses metadata
		// Set the scan method based on which data sources actually exist
		if dataSources.ParquetRowCount > 0 && dataSources.LiveLogRowCount > 0 {
			plan.Details["scan_method"] = "Parquet Metadata + Live Log Counting"
		} else if dataSources.ParquetRowCount > 0 {
			plan.Details["scan_method"] = "Parquet Metadata Only"
		} else {
			plan.Details["scan_method"] = "Live Log Counting Only"
		}
	} else {
		plan.TotalRowsProcessed = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
		plan.Details["scan_method"] = "Full Data Scan"
	}

	return plan
}
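// Illustrative outcome (numbers hypothetical): for SELECT COUNT(*) with no WHERE
// clause over a topic with 1,000,000 parquet-archived rows and 2,500 live-log
// rows, the fast path sets TotalRowsProcessed to 2,500 (only live logs are
// scanned; parquet contributes through its metadata) and records
// Details["scan_method"] = "Parquet Metadata + Live Log Counting".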
// determineExecutionStrategy determines the execution strategy based on query characteristics
func (builder *ExecutionPlanBuilder) determineExecutionStrategy(stmt *SelectStatement, strategy AggregationStrategy) string {
	if stmt.Where != nil {
		return "full_scan"
	}
	if strategy.CanUseFastPath {
		return "hybrid_fast_path"
	}
	return "full_scan"
}

// buildDataSourcesList builds the list of data sources used
func (builder *ExecutionPlanBuilder) buildDataSourcesList(strategy AggregationStrategy, dataSources *TopicDataSources) []string {
	sources := []string{}
	if strategy.CanUseFastPath {
		// Only show parquet stats if there are actual parquet files
		if dataSources.ParquetRowCount > 0 {
			sources = append(sources, "parquet_stats")
		}
		if dataSources.LiveLogRowCount > 0 {
			sources = append(sources, "live_logs")
		}
	} else {
		sources = append(sources, "live_logs", "parquet_files")
	}
	// Note: broker_buffer is added dynamically during execution when the broker is queried.
	// See aggregations.go lines 397-409 for the broker buffer data source addition logic.
	return sources
}

// countParquetFiles counts the total number of parquet files across all partitions
func (builder *ExecutionPlanBuilder) countParquetFiles(dataSources *TopicDataSources) int {
	count := 0
	for _, fileStats := range dataSources.ParquetFiles {
		count += len(fileStats)
	}
	return count
}

// countLiveLogFiles returns the total number of live log files across all partitions
func (builder *ExecutionPlanBuilder) countLiveLogFiles(dataSources *TopicDataSources) int {
	return dataSources.LiveLogFilesCount
}

// buildOptimizationsList builds the list of optimizations used
func (builder *ExecutionPlanBuilder) buildOptimizationsList(stmt *SelectStatement, strategy AggregationStrategy, dataSources *TopicDataSources) []string {
	optimizations := []string{}
	if strategy.CanUseFastPath {
		// Only include parquet statistics if there are actual parquet files
		if dataSources.ParquetRowCount > 0 {
			optimizations = append(optimizations, "parquet_statistics")
		}
		if dataSources.LiveLogRowCount > 0 {
			optimizations = append(optimizations, "live_log_counting")
		}
		// Always include deduplication when using the fast path
		optimizations = append(optimizations, "deduplication")
	}
	if stmt.Where != nil {
		// Check if "predicate_pushdown" is already in the list
		found := false
		for _, opt := range optimizations {
			if opt == "predicate_pushdown" {
				found = true
				break
			}
		}
		if !found {
			optimizations = append(optimizations, "predicate_pushdown")
		}
	}
	return optimizations
}

// buildAggregationsList builds the list of aggregations for display
func (builder *ExecutionPlanBuilder) buildAggregationsList(aggregations []AggregationSpec) []string {
	aggList := make([]string, len(aggregations))
	for i, spec := range aggregations {
		aggList[i] = fmt.Sprintf("%s(%s)", spec.Function, spec.Column)
	}
	return aggList
}

// parseAggregationFunction parses an aggregation function expression
func (e *SQLEngine) parseAggregationFunction(funcExpr *FuncExpr, aliasExpr *AliasedExpr) (*AggregationSpec, error) {
	funcName := strings.ToUpper(funcExpr.Name.String())

	spec := &AggregationSpec{
		Function: funcName,
	}

	// Parse the function arguments
	switch funcName {
	case FuncCOUNT:
		if len(funcExpr.Exprs) != 1 {
			return nil, fmt.Errorf("COUNT function expects exactly 1 argument")
		}
		switch arg := funcExpr.Exprs[0].(type) {
		case *StarExpr:
			spec.Column = "*"
			spec.Alias = "COUNT(*)"
		case *AliasedExpr:
			if colName, ok := arg.Expr.(*ColName); ok {
				spec.Column = colName.Name.String()
				spec.Alias = fmt.Sprintf("COUNT(%s)", spec.Column)
			} else {
				return nil, fmt.Errorf("COUNT argument must be a column name or *")
			}
		default:
			return nil, fmt.Errorf("unsupported COUNT argument: %T", arg)
		}
	case FuncSUM, FuncAVG, FuncMIN, FuncMAX:
		if len(funcExpr.Exprs) != 1 {
			return nil, fmt.Errorf("%s function expects exactly 1 argument", funcName)
		}
		switch arg := funcExpr.Exprs[0].(type) {
		case *AliasedExpr:
			if colName, ok := arg.Expr.(*ColName); ok {
				spec.Column = colName.Name.String()
				spec.Alias = fmt.Sprintf("%s(%s)", funcName, spec.Column)
			} else {
				return nil, fmt.Errorf("%s argument must be a column name", funcName)
			}
		default:
			return nil, fmt.Errorf("unsupported %s argument: %T", funcName, arg)
		}
	default:
		return nil, fmt.Errorf("unsupported aggregation function: %s", funcName)
	}

	// Override with a user-specified alias if provided
	if aliasExpr != nil && aliasExpr.As != nil && !aliasExpr.As.IsEmpty() {
		spec.Alias = aliasExpr.As.String()
	}

	return spec, nil
}
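// Minimal sketch: parsing MAX(amount) into an AggregationSpec; the column name
// is hypothetical. Without a user alias the spec comes back as
// {Function: "MAX", Column: "amount", Alias: "MAX(amount)"}; a non-empty
// AliasedExpr.As would override the Alias.
func exampleParseAggregation(e *SQLEngine) (*AggregationSpec, error) {
	f := &FuncExpr{
		Name:  stringValue("MAX"),
		Exprs: []SelectExpr{&AliasedExpr{Expr: &ColName{Name: stringValue("amount")}}},
	}
	return e.parseAggregationFunction(f, nil)
}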
// computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column
func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) {
	if e.catalog.brokerClient == nil {
		return nil, nil, fmt.Errorf("no broker client available")
	}

	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get filer client: %v", err)
	}

	var minValue, maxValue interface{}
	var minSchemaValue, maxSchemaValue *schema_pb.Value

	// Process each live log file
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Skip parquet files and directories
		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}
		// Skip files that have been converted to parquet (deduplication)
		if parquetSourceFiles[entry.Name] {
			return nil
		}

		filePath := partitionPath + "/" + entry.Name
		// Scan this log file for MIN/MAX values
		fileMin, fileMax, err := e.computeFileMinMax(filerClient, filePath, columnName)
		if err != nil {
			fmt.Printf("Warning: failed to compute min/max for file %s: %v\n", filePath, err)
			return nil // Continue with other files
		}

		// Update the global min/max
		if fileMin != nil {
			if minSchemaValue == nil || e.compareValues(fileMin, minSchemaValue) < 0 {
				minSchemaValue = fileMin
				minValue = e.extractRawValue(fileMin)
			}
		}
		if fileMax != nil {
			if maxSchemaValue == nil || e.compareValues(fileMax, maxSchemaValue) > 0 {
				maxSchemaValue = fileMax
				maxValue = e.extractRawValue(fileMax)
			}
		}
		return nil
	})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to process partition directory %s: %v", partitionPath, err)
	}

	return minValue, maxValue, nil
}

// computeFileMinMax scans a single log file to find MIN/MAX values for a specific column
func (e *SQLEngine) computeFileMinMax(filerClient filer_pb.FilerClient, filePath string, columnName string) (*schema_pb.Value, *schema_pb.Value, error) {
	var minValue, maxValue *schema_pb.Value

	err := e.eachLogEntryInFile(filerClient, filePath, func(logEntry *filer_pb.LogEntry) error {
		// Convert the log entry to a record value
		recordValue, _, err := e.convertLogEntryToRecordValue(logEntry)
		if err != nil {
			return err // This will stop processing this file but not fail the overall query
		}

		// Extract the requested column value
		var columnValue *schema_pb.Value
		if e.isSystemColumn(columnName) {
			// Handle system columns
			switch strings.ToLower(columnName) {
			case SW_COLUMN_NAME_TIMESTAMP:
				columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}}
			case SW_COLUMN_NAME_KEY:
				columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}}
			case SW_COLUMN_NAME_SOURCE:
				columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}}
			}
		} else {
			// Handle regular data columns
			if value, exists := recordValue.Fields[columnName]; exists {
				columnValue = value
			}
		}

		if columnValue == nil {
			return nil // Skip this record
		}

		// Update min/max
		if minValue == nil || e.compareValues(columnValue, minValue) < 0 {
			minValue = columnValue
		}
		if maxValue == nil || e.compareValues(columnValue, maxValue) > 0 {
			maxValue = columnValue
		}
		return nil
	})

	return minValue, maxValue, err
}
// eachLogEntryInFile reads a log file and calls the provided function for each log entry
func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePath string, fn func(*filer_pb.LogEntry) error) error {
	// Extract the directory and filename; filePath is like "partitionPath/filename"
	lastSlash := strings.LastIndex(filePath, "/")
	if lastSlash == -1 {
		return fmt.Errorf("invalid file path: %s", filePath)
	}
	dirPath := filePath[:lastSlash]
	fileName := filePath[lastSlash+1:]

	// Get the file entry
	var fileEntry *filer_pb.Entry
	err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.Name == fileName {
			fileEntry = entry
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to find file %s: %v", filePath, err)
	}
	if fileEntry == nil {
		return fmt.Errorf("file not found: %s", filePath)
	}

	lookupFileIdFn := filer.LookupFn(filerClient)

	// eachChunkFn processes each chunk's data (pattern from countRowsInLogFile)
	eachChunkFn := func(buf []byte) error {
		for pos := 0; pos+4 < len(buf); {
			size := util.BytesToUint32(buf[pos : pos+4])
			if pos+4+int(size) > len(buf) {
				break
			}

			entryData := buf[pos+4 : pos+4+int(size)]
			logEntry := &filer_pb.LogEntry{}
			if err := proto.Unmarshal(entryData, logEntry); err != nil {
				pos += 4 + int(size)
				continue // Skip corrupted entries
			}

			// Call the provided function for each log entry
			if err := fn(logEntry); err != nil {
				return err
			}

			pos += 4 + int(size)
		}
		return nil
	}

	// Read the file chunks and process them (pattern from countRowsInLogFile)
	fileSize := filer.FileSize(fileEntry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, fileEntry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))

	for x := chunkViews.Front(); x != nil; x = x.Next {
		chunk := x.Value
		urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
		if err != nil {
			fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
			continue
		}
		if len(urlStrings) == 0 {
			continue
		}

		// Read the chunk data; urlStrings[0] is already a complete URL (http://server:port/fileId)
		data, _, err := util_http.Get(urlStrings[0])
		if err != nil {
			fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
			continue
		}

		// Process this chunk
		if err := eachChunkFn(data); err != nil {
			return err
		}
	}

	return nil
}

// convertLogEntryToRecordValue helper method (reuses existing logic)
func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	// Parse the log entry data as Protocol Buffer (not JSON!)
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil {
		return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err)
	}

	// Ensure the Fields map exists
	if recordValue.Fields == nil {
		recordValue.Fields = make(map[string]*schema_pb.Value)
	}

	// Add system columns
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
	}
	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
	}

	// User data fields are already present in the protobuf-deserialized recordValue;
	// proto.Unmarshal has already populated the Fields map.
	return recordValue, "live_log", nil
}
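// Layout sketch (illustrative): each live log file is a sequence of
// length-prefixed protobuf records,
//
//	[4-byte big-endian size][LogEntry bytes][4-byte size][LogEntry bytes]...
//
// which is exactly what eachChunkFn above walks, advancing pos by 4+size per
// entry and skipping entries that fail to unmarshal.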
// extractTimestampFromFilename extracts a timestamp from a parquet filename.
// Format: YYYY-MM-DD-HH-MM-SS.parquet
func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
	// Remove the .parquet extension
	filename = strings.TrimSuffix(filename, ".parquet")

	// Parse the timestamp format: 2006-01-02-15-04-05
	t, err := time.Parse("2006-01-02-15-04-05", filename)
	if err != nil {
		return 0
	}
	return t.UnixNano()
}

// hasLiveLogFiles checks if there are any live log files (non-parquet files) in a partition
func (e *SQLEngine) hasLiveLogFiles(partitionPath string) (bool, error) {
	// Get the FilerClient from the BrokerClient
	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
		return false, err
	}

	hasLiveLogs := false
	// Read all files in the partition directory
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Skip directories and parquet files
		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}
		// Found a non-parquet file (live log)
		hasLiveLogs = true
		return nil // Could also stop early; it doesn't matter for an existence check
	})
	return hasLiveLogs, err
}

// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition
func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
		return 0, err
	}

	totalRows := int64(0)
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
			return nil // Skip directories and parquet files
		}
		// Count the rows in this live log file
		rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
		if err != nil {
			fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
			return nil // Continue with other files
		}
		totalRows += rowCount
		return nil
	})
	return totalRows, err
}

// extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication
func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
	sourceFiles := make(map[string]bool)

	for _, fileStat := range fileStats {
		// Each ParquetFileStats should have a reference to the original file entry,
		// but we'd need to go through the hybrid scanner to access Extended metadata.
		// This is a simplified approach - in practice we'd need to access the filer entry.
		// For now, use filename-based deduplication as a fallback.

		// Extract the timestamp from the parquet filename (YYYY-MM-DD-HH-MM-SS.parquet)
		if strings.HasSuffix(fileStat.FileName, ".parquet") {
			timeStr := strings.TrimSuffix(fileStat.FileName, ".parquet")
			// Mark this timestamp range as covered by parquet
			sourceFiles[timeStr] = true
		}
	}

	return sourceFiles
}
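// Metadata sketch (file names illustrative): a parquet file produced from live
// logs records its inputs in Extended["sources"] as a JSON string array, e.g.
//
//	"sources": ["2025-01-15-10-30-00", "2025-01-15-10-31-00"]
//
// getParquetSourceFilesFromMetadata below folds those names into the
// deduplication set so rows already compacted into parquet are not counted a
// second time from the original log files.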
// getParquetSourceFilesFromMetadata reads parquet file metadata to get actual source log files
func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map[string]bool, error) {
	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
		return nil, err
	}

	sourceFiles := make(map[string]bool)

	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}

		// Read source files from Extended metadata
		if entry.Extended != nil && entry.Extended["sources"] != nil {
			var sources []string
			if err := json.Unmarshal(entry.Extended["sources"], &sources); err == nil {
				for _, source := range sources {
					sourceFiles[source] = true
				}
			}
		}
		return nil
	})

	return sourceFiles, err
}

// getLogBufferStartFromFile reads buffer start from file extended attributes
func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
	if entry.Extended == nil {
		return nil, nil
	}

	// Only support binary buffer_start format
	if startData, exists := entry.Extended["buffer_start"]; exists {
		if len(startData) == 8 {
			startIndex := int64(binary.BigEndian.Uint64(startData))
			if startIndex > 0 {
				return &LogBufferStart{StartIndex: startIndex}, nil
			}
		} else {
			return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
		}
	}

	return nil, nil
}
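// Example (hypothetical sketch): the "buffer_start" extended attribute holds an
// 8-byte big-endian start index, so encoding and decoding round-trip like this:
//
//	raw := make([]byte, 8)
//	binary.BigEndian.PutUint64(raw, 42)        // what a writer would store
//	idx := int64(binary.BigEndian.Uint64(raw)) // what getLogBufferStartFromFile reads: 42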
// buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient)
func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
	if e.catalog.brokerClient == nil {
		return make(map[string]bool), nil
	}
	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
		return make(map[string]bool), nil // Don't fail the query, just skip deduplication
	}

	// Track buffer ranges instead of individual indexes (much more efficient)
	type BufferRange struct {
		start, end int64
	}
	processedRanges := make([]BufferRange, 0)
	duplicateFiles := make(map[string]bool)

	// Use the caller's context so cancellation propagates to the directory walk
	err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
			return nil // Skip directories and parquet files
		}

		// Get buffer start for this file (most efficient)
		bufferStart, err := e.getLogBufferStartFromFile(entry)
		if err != nil || bufferStart == nil {
			return nil // No buffer info, can't deduplicate
		}

		// Calculate range for this file: [start, start + chunkCount - 1]
		chunkCount := int64(len(entry.GetChunks()))
		if chunkCount == 0 {
			return nil // Empty file, skip
		}
		fileRange := BufferRange{
			start: bufferStart.StartIndex,
			end:   bufferStart.StartIndex + chunkCount - 1,
		}

		// Check if this range overlaps with any processed range
		isDuplicate := false
		for _, processedRange := range processedRanges {
			if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
				// Ranges overlap - this file contains duplicate buffer indexes
				isDuplicate = true
				if isDebugMode(ctx) {
					fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
						entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
				}
				break
			}
		}

		if isDuplicate {
			duplicateFiles[entry.Name] = true
		} else {
			// Add this range to processed ranges
			processedRanges = append(processedRanges, fileRange)
		}

		return nil
	})
	if err != nil {
		return make(map[string]bool), nil // Don't fail the query
	}

	return duplicateFiles, nil
}

// countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
	lookupFileIdFn := filer.LookupFn(filerClient)

	rowCount := int64(0)

	// eachChunkFn processes each chunk's data (pattern from read_log_from_disk.go)
	eachChunkFn := func(buf []byte) error {
		for pos := 0; pos+4 < len(buf); {
			size := util.BytesToUint32(buf[pos : pos+4])
			if pos+4+int(size) > len(buf) {
				break
			}

			entryData := buf[pos+4 : pos+4+int(size)]
			logEntry := &filer_pb.LogEntry{}
			if err := proto.Unmarshal(entryData, logEntry); err != nil {
				pos += 4 + int(size)
				continue // Skip corrupted entries
			}

			rowCount++
			pos += 4 + int(size)
		}
		return nil
	}

	// Read file chunks and process them (pattern from read_log_from_disk.go)
	fileSize := filer.FileSize(entry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))

	for x := chunkViews.Front(); x != nil; x = x.Next {
		chunk := x.Value
		urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
		if err != nil {
			fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
			continue
		}
		if len(urlStrings) == 0 {
			continue
		}

		// Read chunk data
		// urlStrings[0] is already a complete URL (http://server:port/fileId)
		data, _, err := util_http.Get(urlStrings[0])
		if err != nil {
			fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
			continue
		}

		// Process this chunk
		if err := eachChunkFn(data); err != nil {
			return rowCount, err
		}
	}

	return rowCount, nil
}
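// Example (hypothetical sketch): live log chunks store records as
// [4-byte big-endian length][protobuf-encoded LogEntry], repeated. Building a
// buffer the way countRowsInLogFile's eachChunkFn expects to walk it:
//
//	payload, _ := proto.Marshal(&filer_pb.LogEntry{TsNs: 1})
//	hdr := make([]byte, 4)
//	binary.BigEndian.PutUint32(hdr, uint32(len(payload)))
//	buf := append(hdr, payload...) // one framed record; eachChunkFn counts it as one row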
// discoverTopicPartitions discovers all partitions for a given topic using centralized logic
func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
	// Use centralized topic partition discovery
	t := topic.NewTopic(namespace, topicName)

	// Get FilerClient from BrokerClient
	filerClient, err := e.catalog.brokerClient.GetFilerClient()
	if err != nil {
		return nil, err
	}

	return t.DiscoverPartitions(context.Background(), filerClient)
}

// getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs)
func (e *SQLEngine) getTopicTotalRowCount(ctx context.Context, namespace, topicName string) (int64, error) {
	// Create a hybrid scanner to access parquet statistics
	var filerClient filer_pb.FilerClient
	if e.catalog.brokerClient != nil {
		var filerClientErr error
		filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
		if filerClientErr != nil {
			return 0, filerClientErr
		}
	}

	hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, namespace, topicName, e)
	if err != nil {
		return 0, err
	}

	// Get all partitions for this topic
	// Note: discoverTopicPartitions always returns absolute paths
	partitions, err := e.discoverTopicPartitions(namespace, topicName)
	if err != nil {
		return 0, err
	}

	totalRowCount := int64(0)

	// For each partition, count both parquet and live log rows
	for _, partition := range partitions {
		// Count parquet rows
		parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition)
		if parquetErr == nil {
			for _, stats := range parquetStats {
				totalRowCount += stats.RowCount
			}
		}

		// Count live log rows (with deduplication)
		parquetSourceFiles := make(map[string]bool)
		if parquetErr == nil {
			parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
		}
		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles)
		if liveLogErr == nil {
			totalRowCount += liveLogCount
		}
	}

	return totalRowCount, nil
}

// getActualRowsScannedForFastPath returns only the rows that need to be scanned for fast path aggregations
// (i.e., live log rows that haven't been converted to parquet - parquet uses metadata only)
func (e *SQLEngine) getActualRowsScannedForFastPath(ctx context.Context, namespace, topicName string) (int64, error) {
	// Create a hybrid scanner to access parquet statistics
	var filerClient filer_pb.FilerClient
	if e.catalog.brokerClient != nil {
		var filerClientErr error
		filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
		if filerClientErr != nil {
			return 0, filerClientErr
		}
	}

	hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, namespace, topicName, e)
	if err != nil {
		return 0, err
	}

	// Get all partitions for this topic
	// Note: discoverTopicPartitions always returns absolute paths
	partitions, err := e.discoverTopicPartitions(namespace, topicName)
	if err != nil {
		return 0, err
	}

	totalScannedRows := int64(0)

	// For each partition, count ONLY the live log rows that need scanning
	// (parquet files use metadata/statistics, so they contribute 0 to scan count)
	for _, partition := range partitions {
		// Get parquet files to determine what was converted
		parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition)
		parquetSourceFiles := make(map[string]bool)
		if parquetErr == nil {
			parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
		}

		// Count only live log rows that haven't been converted to parquet
		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles)
		if liveLogErr == nil {
			totalScannedRows += liveLogCount
		}
		// Note: Parquet files contribute 0 to scan count since we use their metadata/statistics
	}

	return totalScannedRows, nil
}
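// Example (hypothetical sketch): how the two counters above relate. For a
// partition whose parquet statistics report 1000 rows and whose unconverted
// live logs hold 50 rows, getTopicTotalRowCount sees 1000 + 50 = 1050, while
// getActualRowsScannedForFastPath reports only the 50 live rows that a
// fast-path aggregation must actually scan; the 1000 parquet rows come from
// metadata alone.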
// findColumnValue performs case-insensitive lookup of column values.
// Also supports the system columns stored separately in HybridScanResult.
func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value {
	// Check system columns first (stored separately in HybridScanResult)
	lowerColumnName := strings.ToLower(columnName)
	switch lowerColumnName {
	case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
		// For timestamp column, format as proper timestamp instead of raw nanoseconds
		timestamp := time.Unix(result.Timestamp/1e9, result.Timestamp%1e9)
		timestampStr := timestamp.UTC().Format("2006-01-02T15:04:05.000000000Z")
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: timestampStr}}
	case SW_COLUMN_NAME_KEY:
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
	case SW_COLUMN_NAME_SOURCE:
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: result.Source}}
	}

	// Then check regular columns in the Values map.
	// First try exact match
	if value, exists := result.Values[columnName]; exists {
		return value
	}

	// Then try case-insensitive match
	for key, value := range result.Values {
		if strings.ToLower(key) == lowerColumnName {
			return value
		}
	}

	return nil
}

// discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog
func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error {
	// First, check if the topic exists by trying to get its schema from the broker/filer
	recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
	if err != nil {
		return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err)
	}

	// Create a schema object from the discovered record type
	mqSchema := &schema.Schema{
		Namespace:  database,
		Name:       tableName,
		RecordType: recordType,
		RevisionId: 1, // Default to revision 1 for discovered topics
	}

	// Register the topic in the SQL catalog
	err = e.catalog.RegisterTopic(database, tableName, mqSchema)
	if err != nil {
		return fmt.Errorf("failed to register discovered topic %s.%s: %v", database, tableName, err)
	}

	return nil
}

// getArithmeticExpressionAlias generates a display alias for arithmetic expressions
func (e *SQLEngine) getArithmeticExpressionAlias(expr *ArithmeticExpr) string {
	leftAlias := e.getExpressionAlias(expr.Left)
	rightAlias := e.getExpressionAlias(expr.Right)
	return leftAlias + expr.Operator + rightAlias
}

// getExpressionAlias generates an alias for any expression node
func (e *SQLEngine) getExpressionAlias(expr ExprNode) string {
	switch exprType := expr.(type) {
	case *ColName:
		return exprType.Name.String()
	case *ArithmeticExpr:
		return e.getArithmeticExpressionAlias(exprType)
	case *SQLVal:
		return e.getSQLValAlias(exprType)
	default:
		return "expr"
	}
}

// evaluateArithmeticExpression evaluates an arithmetic expression for a given record
func (e *SQLEngine) evaluateArithmeticExpression(expr *ArithmeticExpr, result HybridScanResult) (*schema_pb.Value, error) {
	// Check for timestamp arithmetic with intervals first
	if e.isTimestampArithmetic(expr.Left, expr.Right) && (expr.Operator == "+" || expr.Operator == "-") {
		return e.evaluateTimestampArithmetic(expr.Left, expr.Right, expr.Operator)
	}

	// Get left operand value
	leftValue, err := e.evaluateExpressionValue(expr.Left, result)
	if err != nil {
		return nil, fmt.Errorf("error evaluating left operand: %v", err)
	}

	// Get right operand value
	rightValue, err := e.evaluateExpressionValue(expr.Right, result)
	if err != nil {
		return nil, fmt.Errorf("error evaluating right operand: %v", err)
	}

	// Handle string concatenation operator
	if expr.Operator == "||" {
		return e.Concat(leftValue, rightValue)
	}

	// Perform arithmetic operation
	var op ArithmeticOperator
	switch expr.Operator {
	case "+":
		op = OpAdd
	case "-":
		op = OpSub
	case "*":
		op = OpMul
	case "/":
		op = OpDiv
	case "%":
		op = OpMod
	default:
		return nil, fmt.Errorf("unsupported arithmetic operator: %s", expr.Operator)
	}

	return e.EvaluateArithmeticExpression(leftValue, rightValue, op)
}
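// Example (hypothetical sketch): evaluating "id + user_id" for one scanned row.
// eng and res are illustrative; res must carry both columns in its Values map.
//
//	expr := &ArithmeticExpr{
//		Left:     &ColName{Name: stringValue("id")},
//		Right:    &ColName{Name: stringValue("user_id")},
//		Operator: "+",
//	}
//	sum, err := eng.evaluateArithmeticExpression(expr, res) // Int64Value of id+user_id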
// isTimestampArithmetic checks if an arithmetic operation involves timestamps and intervals
func (e *SQLEngine) isTimestampArithmetic(left, right ExprNode) bool {
	// Check if left is a timestamp function (NOW, CURRENT_TIMESTAMP, etc.)
	leftIsTimestamp := e.isTimestampFunction(left)

	// Check if right is an interval
	rightIsInterval := e.isIntervalExpression(right)

	return leftIsTimestamp && rightIsInterval
}

// isTimestampFunction checks if an expression is a timestamp function
func (e *SQLEngine) isTimestampFunction(expr ExprNode) bool {
	if funcExpr, ok := expr.(*FuncExpr); ok {
		funcName := strings.ToUpper(funcExpr.Name.String())
		return funcName == FuncNOW || funcName == FuncCURRENT_TIMESTAMP ||
			funcName == FuncCURRENT_DATE || funcName == FuncCURRENT_TIME
	}
	return false
}

// isIntervalExpression checks if an expression is an interval
func (e *SQLEngine) isIntervalExpression(expr ExprNode) bool {
	_, ok := expr.(*IntervalExpr)
	return ok
}

// evaluateExpressionValue evaluates any expression to get its value from a record
func (e *SQLEngine) evaluateExpressionValue(expr ExprNode, result HybridScanResult) (*schema_pb.Value, error) {
	switch exprType := expr.(type) {
	case *ColName:
		columnName := exprType.Name.String()
		upperColumnName := strings.ToUpper(columnName)

		// Check if this is actually a string literal that was parsed as ColName
		if (strings.HasPrefix(columnName, "'") && strings.HasSuffix(columnName, "'")) ||
			(strings.HasPrefix(columnName, "\"") && strings.HasSuffix(columnName, "\"")) {
			// This is a string literal that was incorrectly parsed as a column name
			literal := strings.Trim(strings.Trim(columnName, "'"), "\"")
			return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: literal}}, nil
		}

		// Check if this is actually a function call that was parsed as ColName
		if strings.Contains(columnName, "(") && strings.Contains(columnName, ")") {
			// This is a function call that was parsed incorrectly as a column name;
			// we need to manually evaluate it as a function
			return e.evaluateColumnNameAsFunction(columnName, result)
		}

		// Check if this is a datetime constant
		if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
			upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
			switch upperColumnName {
			case FuncCURRENT_DATE:
				return e.CurrentDate()
			case FuncCURRENT_TIME:
				return e.CurrentTime()
			case FuncCURRENT_TIMESTAMP:
				return e.CurrentTimestamp()
			case FuncNOW:
				return e.Now()
			}
		}

		// Check if this is actually a numeric literal disguised as a column name
		if val, err := strconv.ParseInt(columnName, 10, 64); err == nil {
			return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: val}}, nil
		}
		if val, err := strconv.ParseFloat(columnName, 64); err == nil {
			return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}}, nil
		}

		// Otherwise, treat as a regular column lookup
		value := e.findColumnValue(result, columnName)
		if value == nil {
			return nil, nil
		}
		return value, nil

	case *ArithmeticExpr:
		return e.evaluateArithmeticExpression(exprType, result)

	case *SQLVal:
		// Handle literal values
		return e.convertSQLValToSchemaValue(exprType), nil

	case *FuncExpr:
		// Handle function calls that are part of arithmetic expressions;
		// route to the appropriate evaluator based on function type
		funcName := strings.ToUpper(exprType.Name.String())
		if e.isDateTimeFunction(funcName) {
			// Use datetime function evaluator
			return e.evaluateDateTimeFunction(exprType, result)
		}
		// Use string function evaluator
		return e.evaluateStringFunction(exprType, result)

	case *IntervalExpr:
		// Handle interval expressions - evaluate as duration in nanoseconds
		nanos, err := e.evaluateInterval(exprType.Value)
		if err != nil {
			return nil, err
		}
		return &schema_pb.Value{
			Kind: &schema_pb.Value_Int64Value{Int64Value: nanos},
		}, nil

	default:
		return nil, fmt.Errorf("unsupported expression type: %T", expr)
	}
}
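// Example (hypothetical sketch): "NOW() - INTERVAL '1 hour'" arrives as a
// FuncExpr minus an IntervalExpr; isTimestampArithmetic spots the pattern and
// evaluateArithmeticExpression routes it to evaluateTimestampArithmetic:
//
//	expr := &ArithmeticExpr{
//		Left:     &FuncExpr{Name: stringValue("NOW")},
//		Right:    &IntervalExpr{Value: "1 hour"},
//		Operator: "-",
//	}
//	oneHourAgo, err := eng.evaluateArithmeticExpression(expr, HybridScanResult{})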
// convertSQLValToSchemaValue converts a SQLVal literal to a schema_pb.Value
func (e *SQLEngine) convertSQLValToSchemaValue(sqlVal *SQLVal) *schema_pb.Value {
	switch sqlVal.Type {
	case IntVal:
		if val, err := strconv.ParseInt(string(sqlVal.Val), 10, 64); err == nil {
			return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: val}}
		}
	case FloatVal:
		if val, err := strconv.ParseFloat(string(sqlVal.Val), 64); err == nil {
			return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}}
		}
	case StrVal:
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(sqlVal.Val)}}
	}
	// Default to string if parsing fails
	return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(sqlVal.Val)}}
}
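// Example (hypothetical sketch): numeric literals parse to typed values, and
// anything unparseable falls back to a string value rather than failing:
//
//	n := eng.convertSQLValToSchemaValue(&SQLVal{Type: IntVal, Val: []byte("42")})  // Int64Value 42
//	s := eng.convertSQLValToSchemaValue(&SQLVal{Type: IntVal, Val: []byte("4x2")}) // StringValue "4x2"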
// ConvertToSQLResultWithExpressions converts HybridScanResults to SQL query results with expression evaluation
func (e *SQLEngine) ConvertToSQLResultWithExpressions(hms *HybridMessageScanner, results []HybridScanResult, selectExprs []SelectExpr) *QueryResult {
	if len(results) == 0 {
		columns := make([]string, 0, len(selectExprs))
		for _, selectExpr := range selectExprs {
			switch expr := selectExpr.(type) {
			case *AliasedExpr:
				// Check if alias is available and use it
				if expr.As != nil && !expr.As.IsEmpty() {
					columns = append(columns, expr.As.String())
				} else {
					// Fall back to expression-based column naming
					switch col := expr.Expr.(type) {
					case *ColName:
						columnName := col.Name.String()
						upperColumnName := strings.ToUpper(columnName)
						// Check if this is an arithmetic expression embedded in a ColName
						if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil {
							columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr))
						} else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
							upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
							// Use lowercase for datetime constants in column headers
							columns = append(columns, strings.ToLower(columnName))
						} else {
							// Use display name for system columns
							displayName := e.getSystemColumnDisplayName(columnName)
							columns = append(columns, displayName)
						}
					case *ArithmeticExpr:
						columns = append(columns, e.getArithmeticExpressionAlias(col))
					case *FuncExpr:
						columns = append(columns, e.getStringFunctionAlias(col))
					case *SQLVal:
						columns = append(columns, e.getSQLValAlias(col))
					default:
						columns = append(columns, "expr")
					}
				}
			}
		}
		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Build columns from SELECT expressions
	columns := make([]string, 0, len(selectExprs))
	for _, selectExpr := range selectExprs {
		switch expr := selectExpr.(type) {
		case *AliasedExpr:
			// Check if alias is available and use it
			if expr.As != nil && !expr.As.IsEmpty() {
				columns = append(columns, expr.As.String())
			} else {
				// Fall back to expression-based column naming
				switch col := expr.Expr.(type) {
				case *ColName:
					columnName := col.Name.String()
					upperColumnName := strings.ToUpper(columnName)
					// Check if this is an arithmetic expression embedded in a ColName
					if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil {
						columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr))
					} else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
						upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
						// Use lowercase for datetime constants in column headers
						columns = append(columns, strings.ToLower(columnName))
					} else {
						columns = append(columns, columnName)
					}
				case *ArithmeticExpr:
					columns = append(columns, e.getArithmeticExpressionAlias(col))
				case *FuncExpr:
					columns = append(columns, e.getStringFunctionAlias(col))
				case *SQLVal:
					columns = append(columns, e.getSQLValAlias(col))
				default:
					columns = append(columns, "expr")
				}
			}
		}
	}

	// Convert to SQL rows with expression evaluation
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(selectExprs))
		for j, selectExpr := range selectExprs {
			switch expr := selectExpr.(type) {
			case *AliasedExpr:
				switch col := expr.Expr.(type) {
				case *ColName:
					// Handle regular column, datetime constants, or arithmetic expressions
					columnName := col.Name.String()
					upperColumnName := strings.ToUpper(columnName)

					// Check if this is an arithmetic expression embedded in a ColName
					if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil {
						// Handle as arithmetic expression
						if value, err := e.evaluateArithmeticExpression(arithmeticExpr, result); err == nil && value != nil {
							row[j] = convertSchemaValueToSQL(value)
						} else {
							row[j] = sqltypes.NULL
						}
					} else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
						upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
						// Handle as datetime function
						var value *schema_pb.Value
						var err error
						switch upperColumnName {
						case FuncCURRENT_DATE:
							value, err = e.CurrentDate()
						case FuncCURRENT_TIME:
							value, err = e.CurrentTime()
						case FuncCURRENT_TIMESTAMP:
							value, err = e.CurrentTimestamp()
						case FuncNOW:
							value, err = e.Now()
						}
						if err == nil && value != nil {
							row[j] = convertSchemaValueToSQL(value)
						} else {
							row[j] = sqltypes.NULL
						}
					} else {
						// Handle as regular column
						if value := e.findColumnValue(result, columnName); value != nil {
							row[j] = convertSchemaValueToSQL(value)
						} else {
							row[j] = sqltypes.NULL
						}
					}
				case *ArithmeticExpr:
					// Handle arithmetic expression
					if value, err := e.evaluateArithmeticExpression(col, result); err == nil && value != nil {
						row[j] = convertSchemaValueToSQL(value)
					} else {
						row[j] = sqltypes.NULL
					}
				case *FuncExpr:
					// Handle function - route to appropriate evaluator
					funcName := strings.ToUpper(col.Name.String())
					var value *schema_pb.Value
					var err error
					// Check if it's a datetime function
					if e.isDateTimeFunction(funcName) {
						value, err = e.evaluateDateTimeFunction(col, result)
					} else {
						// Default to string function evaluator
						value, err = e.evaluateStringFunction(col, result)
					}
					if err == nil && value != nil {
						row[j] = convertSchemaValueToSQL(value)
					} else {
						row[j] = sqltypes.NULL
					}
				case *SQLVal:
					// Handle literal value
					value := e.convertSQLValToSchemaValue(col)
					row[j] = convertSchemaValueToSQL(value)
				default:
					row[j] = sqltypes.NULL
				}
			default:
				row[j] = sqltypes.NULL
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}

// extractBaseColumns recursively extracts base column names from arithmetic expressions
func (e *SQLEngine) extractBaseColumns(expr *ArithmeticExpr, baseColumnsSet map[string]bool) {
	// Extract columns from left operand
	e.extractBaseColumnsFromExpression(expr.Left, baseColumnsSet)
	// Extract columns from right operand
	e.extractBaseColumnsFromExpression(expr.Right, baseColumnsSet)
}
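// Example (hypothetical sketch): collecting the underlying columns needed to
// evaluate "(id + user_id) * 2"; the literal 2 is ignored because SQLVal has
// no case in extractBaseColumnsFromExpression:
//
//	cols := make(map[string]bool)
//	eng.extractBaseColumns(&ArithmeticExpr{
//		Left: &ArithmeticExpr{
//			Left:     &ColName{Name: stringValue("id")},
//			Right:    &ColName{Name: stringValue("user_id")},
//			Operator: "+",
//		},
//		Right:    &SQLVal{Type: IntVal, Val: []byte("2")},
//		Operator: "*",
//	}, cols) // cols now holds {"id": true, "user_id": true}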
// extractBaseColumnsFromExpression extracts base column names from any expression node
func (e *SQLEngine) extractBaseColumnsFromExpression(expr ExprNode, baseColumnsSet map[string]bool) {
	switch exprType := expr.(type) {
	case *ColName:
		columnName := exprType.Name.String()
		// Check if it's a literal number disguised as a column name
		if _, err := strconv.ParseInt(columnName, 10, 64); err != nil {
			if _, err := strconv.ParseFloat(columnName, 64); err != nil {
				// Not a numeric literal, treat as actual column name
				baseColumnsSet[columnName] = true
			}
		}
	case *ArithmeticExpr:
		// Recursively handle nested arithmetic expressions
		e.extractBaseColumns(exprType, baseColumnsSet)
	}
}

// isAggregationFunction checks if a function name is an aggregation function
func (e *SQLEngine) isAggregationFunction(funcName string) bool {
	// Convert to uppercase for case-insensitive comparison
	switch strings.ToUpper(funcName) {
	case FuncCOUNT, FuncSUM, FuncAVG, FuncMIN, FuncMAX:
		return true
	default:
		return false
	}
}

// isStringFunction checks if a function name is a string function
func (e *SQLEngine) isStringFunction(funcName string) bool {
	// Normalize case so callers may pass mixed-case names
	switch strings.ToUpper(funcName) {
	case FuncUPPER, FuncLOWER, FuncLENGTH, FuncTRIM, FuncBTRIM, FuncLTRIM, FuncRTRIM,
		FuncSUBSTRING, FuncLEFT, FuncRIGHT, FuncCONCAT:
		return true
	default:
		return false
	}
}

// isDateTimeFunction checks if a function name is a datetime function
func (e *SQLEngine) isDateTimeFunction(funcName string) bool {
	switch strings.ToUpper(funcName) {
	case FuncCURRENT_DATE, FuncCURRENT_TIME, FuncCURRENT_TIMESTAMP, FuncNOW, FuncEXTRACT, FuncDATE_TRUNC:
		return true
	default:
		return false
	}
}

// getStringFunctionAlias generates an alias for string functions
func (e *SQLEngine) getStringFunctionAlias(funcExpr *FuncExpr) string {
	funcName := funcExpr.Name.String()
	if len(funcExpr.Exprs) == 1 {
		if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
			if colName, ok := aliasedExpr.Expr.(*ColName); ok {
				return fmt.Sprintf("%s(%s)", funcName, colName.Name.String())
			}
		}
	}
	return fmt.Sprintf("%s(...)", funcName)
}

// getDateTimeFunctionAlias generates an alias for datetime functions
func (e *SQLEngine) getDateTimeFunctionAlias(funcExpr *FuncExpr) string {
	funcName := funcExpr.Name.String()

	// Handle zero-argument functions like CURRENT_DATE, NOW
	if len(funcExpr.Exprs) == 0 {
		// Use lowercase for datetime constants in column headers
		return strings.ToLower(funcName)
	}

	// Handle EXTRACT specially to create unique, part-specific aliases
	if strings.ToUpper(funcName) == FuncEXTRACT && len(funcExpr.Exprs) == 2 {
		// Try to extract the date part to make the alias unique
		if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
			if sqlVal, ok := aliasedExpr.Expr.(*SQLVal); ok && sqlVal.Type == StrVal {
				datePart := strings.ToLower(string(sqlVal.Val))
				return fmt.Sprintf("extract_%s", datePart)
			}
		}
	}

	// Generic alias for everything else, including multi-argument functions
	// like DATE_TRUNC and EXTRACT calls whose date part can't be recovered
	return fmt.Sprintf("%s(...)", funcName)
}

// extractBaseColumnsFromFunction extracts base columns needed by a string function
func (e *SQLEngine) extractBaseColumnsFromFunction(funcExpr *FuncExpr, baseColumnsSet map[string]bool) {
	for _, expr := range funcExpr.Exprs {
		if aliasedExpr, ok := expr.(*AliasedExpr); ok {
			e.extractBaseColumnsFromExpression(aliasedExpr.Expr, baseColumnsSet)
		}
	}
}
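// Example (hypothetical sketch): EXTRACT gets a part-specific alias so that
// several EXTRACTs in one SELECT produce distinct column headers:
//
//	f := &FuncExpr{
//		Name: stringValue("EXTRACT"),
//		Exprs: []SelectExpr{
//			&AliasedExpr{Expr: &SQLVal{Type: StrVal, Val: []byte("YEAR")}},
//			&AliasedExpr{Expr: &ColName{Name: stringValue("ts")}},
//		},
//	}
//	alias := eng.getDateTimeFunctionAlias(f) // "extract_year"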
// getSQLValAlias generates an alias for SQL literal values
func (e *SQLEngine) getSQLValAlias(sqlVal *SQLVal) string {
	switch sqlVal.Type {
	case StrVal:
		// Escape single quotes by replacing ' with '' (SQL standard escaping)
		escapedVal := strings.ReplaceAll(string(sqlVal.Val), "'", "''")
		return fmt.Sprintf("'%s'", escapedVal)
	case IntVal:
		return string(sqlVal.Val)
	case FloatVal:
		return string(sqlVal.Val)
	default:
		return "literal"
	}
}

// evaluateStringFunction evaluates a string function for a given record
func (e *SQLEngine) evaluateStringFunction(funcExpr *FuncExpr, result HybridScanResult) (*schema_pb.Value, error) {
	funcName := strings.ToUpper(funcExpr.Name.String())

	// The string functions handled here all take exactly 1 argument
	if len(funcExpr.Exprs) != 1 {
		return nil, fmt.Errorf("function %s expects exactly 1 argument", funcName)
	}

	// Get the argument value
	var argValue *schema_pb.Value
	if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
		var err error
		argValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
		if err != nil {
			return nil, fmt.Errorf("error evaluating function argument: %v", err)
		}
	} else {
		return nil, fmt.Errorf("unsupported function argument type")
	}

	if argValue == nil {
		return nil, nil // NULL input produces NULL output
	}

	// Call the appropriate string function
	switch funcName {
	case FuncUPPER:
		return e.Upper(argValue)
	case FuncLOWER:
		return e.Lower(argValue)
	case FuncLENGTH:
		return e.Length(argValue)
	case FuncTRIM, FuncBTRIM: // CockroachDB converts TRIM to BTRIM
		return e.Trim(argValue)
	case FuncLTRIM:
		return e.LTrim(argValue)
	case FuncRTRIM:
		return e.RTrim(argValue)
	default:
		return nil, fmt.Errorf("unsupported string function: %s", funcName)
	}
}
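// Example (hypothetical sketch): UPPER applied to a string literal argument.
//
//	f := &FuncExpr{
//		Name:  stringValue("UPPER"),
//		Exprs: []SelectExpr{&AliasedExpr{Expr: &SQLVal{Type: StrVal, Val: []byte("seaweed")}}},
//	}
//	v, err := eng.evaluateStringFunction(f, HybridScanResult{}) // StringValue "SEAWEED"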
// evaluateDateTimeFunction evaluates a datetime function for a given record
func (e *SQLEngine) evaluateDateTimeFunction(funcExpr *FuncExpr, result HybridScanResult) (*schema_pb.Value, error) {
	funcName := strings.ToUpper(funcExpr.Name.String())

	switch funcName {
	case FuncEXTRACT:
		// EXTRACT requires exactly 2 arguments: date part and value
		if len(funcExpr.Exprs) != 2 {
			return nil, fmt.Errorf("EXTRACT function expects exactly 2 arguments (date_part, value), got %d", len(funcExpr.Exprs))
		}

		// Get the first argument (date part)
		var datePartValue *schema_pb.Value
		if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
			var err error
			datePartValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
			if err != nil {
				return nil, fmt.Errorf("error evaluating EXTRACT date part argument: %v", err)
			}
		} else {
			return nil, fmt.Errorf("unsupported EXTRACT date part argument type")
		}

		if datePartValue == nil {
			return nil, fmt.Errorf("EXTRACT date part cannot be NULL")
		}

		// Convert date part to string
		var datePart string
		if stringVal, ok := datePartValue.Kind.(*schema_pb.Value_StringValue); ok {
			datePart = strings.ToUpper(stringVal.StringValue)
		} else {
			return nil, fmt.Errorf("EXTRACT date part must be a string")
		}

		// Get the second argument (value to extract from)
		var extractValue *schema_pb.Value
		if aliasedExpr, ok := funcExpr.Exprs[1].(*AliasedExpr); ok {
			var err error
			extractValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
			if err != nil {
				return nil, fmt.Errorf("error evaluating EXTRACT value argument: %v", err)
			}
		} else {
			return nil, fmt.Errorf("unsupported EXTRACT value argument type")
		}

		if extractValue == nil {
			return nil, nil // NULL input produces NULL output
		}

		// Call the Extract function
		return e.Extract(DatePart(datePart), extractValue)

	case FuncDATE_TRUNC:
		// DATE_TRUNC requires exactly 2 arguments: precision and value
		if len(funcExpr.Exprs) != 2 {
			return nil, fmt.Errorf("DATE_TRUNC function expects exactly 2 arguments (precision, value), got %d", len(funcExpr.Exprs))
		}

		// Get the first argument (precision)
		var precisionValue *schema_pb.Value
		if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
			var err error
			precisionValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
			if err != nil {
				return nil, fmt.Errorf("error evaluating DATE_TRUNC precision argument: %v", err)
			}
		} else {
			return nil, fmt.Errorf("unsupported DATE_TRUNC precision argument type")
		}

		if precisionValue == nil {
			return nil, fmt.Errorf("DATE_TRUNC precision cannot be NULL")
		}

		// Convert precision to string
		var precision string
		if stringVal, ok := precisionValue.Kind.(*schema_pb.Value_StringValue); ok {
			precision = stringVal.StringValue
		} else {
			return nil, fmt.Errorf("DATE_TRUNC precision must be a string")
		}

		// Get the second argument (value to truncate)
		var truncateValue *schema_pb.Value
		if aliasedExpr, ok := funcExpr.Exprs[1].(*AliasedExpr); ok {
			var err error
			truncateValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
			if err != nil {
				return nil, fmt.Errorf("error evaluating DATE_TRUNC value argument: %v", err)
			}
		} else {
			return nil, fmt.Errorf("unsupported DATE_TRUNC value argument type")
		}

		if truncateValue == nil {
			return nil, nil // NULL input produces NULL output
		}

		// Call the DateTrunc function
		return e.DateTrunc(precision, truncateValue)

	case FuncCURRENT_DATE:
		// CURRENT_DATE is a zero-argument function
		if len(funcExpr.Exprs) != 0 {
			return nil, fmt.Errorf("CURRENT_DATE function expects no arguments, got %d", len(funcExpr.Exprs))
		}
		return e.CurrentDate()

	case FuncCURRENT_TIME:
		// CURRENT_TIME is a zero-argument function
		if len(funcExpr.Exprs) != 0 {
			return nil, fmt.Errorf("CURRENT_TIME function expects no arguments, got %d", len(funcExpr.Exprs))
		}
		return e.CurrentTime()

	case FuncCURRENT_TIMESTAMP:
		// CURRENT_TIMESTAMP is a zero-argument function
		if len(funcExpr.Exprs) != 0 {
			return nil, fmt.Errorf("CURRENT_TIMESTAMP function expects no arguments, got %d", len(funcExpr.Exprs))
		}
		return e.CurrentTimestamp()

	case FuncNOW:
		// NOW is a zero-argument function (but often used with () syntax)
		if len(funcExpr.Exprs) != 0 {
			return nil, fmt.Errorf("NOW function expects no arguments, got %d", len(funcExpr.Exprs))
		}
		return e.Now()

	// PostgreSQL uses EXTRACT(part FROM date) instead of convenience functions like YEAR(date)
	default:
		return nil, fmt.Errorf("unsupported datetime function: %s", funcName)
	}
}
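// Example (hypothetical sketch): DATE_TRUNC with a precision literal and a
// timestamp column, mirroring SQL's DATE_TRUNC('hour', ts). eng and res are
// illustrative values, not part of this package:
//
//	f := &FuncExpr{
//		Name: stringValue("DATE_TRUNC"),
//		Exprs: []SelectExpr{
//			&AliasedExpr{Expr: &SQLVal{Type: StrVal, Val: []byte("hour")}},
//			&AliasedExpr{Expr: &ColName{Name: stringValue("ts")}},
//		},
//	}
//	truncated, err := eng.evaluateDateTimeFunction(f, res) // ts rounded down to the hour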
interval unit: %s", unit) } return value * multiplier, nil } // convertValueForTimestampColumn converts string timestamp values to nanoseconds for system timestamp columns func (e *SQLEngine) convertValueForTimestampColumn(columnName string, value interface{}, expr ExprNode) interface{} { // Special handling for timestamp system columns if columnName == SW_COLUMN_NAME_TIMESTAMP { if _, ok := value.(string); ok { if timeNanos := e.extractTimeValue(expr); timeNanos != 0 { return timeNanos } } } return value } // evaluateTimestampArithmetic performs arithmetic operations with timestamps and intervals func (e *SQLEngine) evaluateTimestampArithmetic(left, right ExprNode, operator string) (*schema_pb.Value, error) { // Handle timestamp arithmetic: NOW() - INTERVAL '1 hour' // For timestamp arithmetic, we don't need the result context, so we pass an empty one emptyResult := HybridScanResult{} leftValue, err := e.evaluateExpressionValue(left, emptyResult) if err != nil { return nil, fmt.Errorf("failed to evaluate left operand: %v", err) } rightValue, err := e.evaluateExpressionValue(right, emptyResult) if err != nil { return nil, fmt.Errorf("failed to evaluate right operand: %v", err) } // Convert left operand (should be timestamp) var leftTimestamp int64 if leftValue.Kind != nil { switch leftKind := leftValue.Kind.(type) { case *schema_pb.Value_Int64Value: leftTimestamp = leftKind.Int64Value case *schema_pb.Value_TimestampValue: // Convert microseconds to nanoseconds leftTimestamp = leftKind.TimestampValue.TimestampMicros * 1000 case *schema_pb.Value_StringValue: // Parse timestamp string if ts, err := time.Parse(time.RFC3339, leftKind.StringValue); err == nil { leftTimestamp = ts.UnixNano() } else if ts, err := time.Parse("2006-01-02 15:04:05", leftKind.StringValue); err == nil { leftTimestamp = ts.UnixNano() } else { return nil, fmt.Errorf("invalid timestamp format: %s", leftKind.StringValue) } default: return nil, fmt.Errorf("left operand must be a timestamp, got: %T", leftKind) } } else { return nil, fmt.Errorf("left operand value is nil") } // Convert right operand (should be interval in nanoseconds) var intervalNanos int64 if rightValue.Kind != nil { switch rightKind := rightValue.Kind.(type) { case *schema_pb.Value_Int64Value: intervalNanos = rightKind.Int64Value default: return nil, fmt.Errorf("right operand must be an interval duration") } } else { return nil, fmt.Errorf("right operand value is nil") } // Perform arithmetic var resultTimestamp int64 switch operator { case "+": resultTimestamp = leftTimestamp + intervalNanos case "-": resultTimestamp = leftTimestamp - intervalNanos default: return nil, fmt.Errorf("unsupported timestamp arithmetic operator: %s", operator) } // Return as timestamp return &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: resultTimestamp}, }, nil } // evaluateColumnNameAsFunction handles function calls that were incorrectly parsed as column names func (e *SQLEngine) evaluateColumnNameAsFunction(columnName string, result HybridScanResult) (*schema_pb.Value, error) { // Simple parser for basic function calls like TRIM('hello world') // Extract function name and argument parenPos := strings.Index(columnName, "(") if parenPos == -1 { return nil, fmt.Errorf("invalid function format: %s", columnName) } funcName := strings.ToUpper(strings.TrimSpace(columnName[:parenPos])) argsString := columnName[parenPos+1:] // Find the closing parenthesis (handling nested quotes) closeParen := strings.LastIndex(argsString, ")") if closeParen == -1 { return nil, 
fmt.Errorf("missing closing parenthesis in function: %s", columnName) } argString := strings.TrimSpace(argsString[:closeParen]) // Parse the argument - for now handle simple cases var argValue *schema_pb.Value var err error if strings.HasPrefix(argString, "'") && strings.HasSuffix(argString, "'") { // String literal argument literal := strings.Trim(argString, "'") argValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: literal}} } else if strings.Contains(argString, "(") && strings.Contains(argString, ")") { // Nested function call - recursively evaluate it argValue, err = e.evaluateColumnNameAsFunction(argString, result) if err != nil { return nil, fmt.Errorf("error evaluating nested function argument: %v", err) } } else { // Column name or other expression return nil, fmt.Errorf("unsupported argument type in function: %s", argString) } if argValue == nil { return nil, nil } // Call the appropriate function switch funcName { case FuncUPPER: return e.Upper(argValue) case FuncLOWER: return e.Lower(argValue) case FuncLENGTH: return e.Length(argValue) case FuncTRIM, FuncBTRIM: // CockroachDB converts TRIM to BTRIM return e.Trim(argValue) case FuncLTRIM: return e.LTrim(argValue) case FuncRTRIM: return e.RTrim(argValue) // PostgreSQL-only: Use EXTRACT(YEAR FROM date) instead of YEAR(date) default: return nil, fmt.Errorf("unsupported function in column name: %s", funcName) } } // parseColumnLevelCalculation detects and parses arithmetic expressions that contain function calls // This handles cases where the SQL parser incorrectly treats "LENGTH('hello') + 10" as a single ColName func (e *SQLEngine) parseColumnLevelCalculation(expression string) *ArithmeticExpr { // First check if this looks like an arithmetic expression if !e.containsArithmeticOperator(expression) { return nil } // Build AST for the arithmetic expression return e.buildArithmeticAST(expression) } // containsArithmeticOperator checks if the expression contains arithmetic operators outside of function calls func (e *SQLEngine) containsArithmeticOperator(expr string) bool { operators := []string{"+", "-", "*", "/", "%", "||"} parenLevel := 0 quoteLevel := false for i, char := range expr { switch char { case '(': if !quoteLevel { parenLevel++ } case ')': if !quoteLevel { parenLevel-- } case '\'': quoteLevel = !quoteLevel default: // Only check for operators outside of parentheses and quotes if parenLevel == 0 && !quoteLevel { for _, op := range operators { if strings.HasPrefix(expr[i:], op) { return true } } } } } return false } // buildArithmeticAST builds an Abstract Syntax Tree for arithmetic expressions containing function calls func (e *SQLEngine) buildArithmeticAST(expr string) *ArithmeticExpr { // Remove leading/trailing spaces expr = strings.TrimSpace(expr) // Find the main operator (outside of parentheses) operators := []string{"||", "+", "-", "*", "/", "%"} // Order matters for precedence for _, op := range operators { opPos := e.findMainOperator(expr, op) if opPos != -1 { leftExpr := strings.TrimSpace(expr[:opPos]) rightExpr := strings.TrimSpace(expr[opPos+len(op):]) if leftExpr != "" && rightExpr != "" { return &ArithmeticExpr{ Left: e.parseASTExpressionNode(leftExpr), Right: e.parseASTExpressionNode(rightExpr), Operator: op, } } } } return nil } // findMainOperator finds the position of an operator that's not inside parentheses or quotes func (e *SQLEngine) findMainOperator(expr string, operator string) int { parenLevel := 0 quoteLevel := false for i := 0; i <= len(expr)-len(operator); i++ { char := 
// findMainOperator finds the position of an operator occurrence that's not inside
// parentheses or quotes. It returns the LAST top-level occurrence so that
// same-precedence chains like "a - b - c" split into (a - b) - c, matching
// SQL's left-associativity.
func (e *SQLEngine) findMainOperator(expr string, operator string) int {
	parenLevel := 0
	inQuotes := false
	lastMatch := -1

	for i := 0; i <= len(expr)-len(operator); i++ {
		char := expr[i]
		switch char {
		case '(':
			if !inQuotes {
				parenLevel++
			}
		case ')':
			if !inQuotes {
				parenLevel--
			}
		case '\'':
			inQuotes = !inQuotes
		default:
			// Check for operator only at top level (not inside parentheses or quotes)
			if parenLevel == 0 && !inQuotes && strings.HasPrefix(expr[i:], operator) {
				if operator == "+" || operator == "-" {
					// Skip unary signs: a +/- at the start of the expression, or
					// right after another operator or '(', is a sign, not a binary operator
					prev := strings.TrimRight(expr[:i], " \t")
					if prev == "" || strings.ContainsAny(prev[len(prev)-1:], "+-*/%(") {
						continue
					}
				}
				lastMatch = i
			}
		}
	}

	return lastMatch
}

// parseASTExpressionNode parses an expression into the appropriate ExprNode type
func (e *SQLEngine) parseASTExpressionNode(expr string) ExprNode {
	expr = strings.TrimSpace(expr)

	// Check if it's a function call (contains parentheses)
	if strings.Contains(expr, "(") && strings.Contains(expr, ")") {
		// This should be parsed as a function expression, but since our SQL parser
		// has limitations, we create a special ColName that represents the function
		return &ColName{Name: stringValue(expr)}
	}

	// Check if it's a numeric literal
	if _, err := strconv.ParseInt(expr, 10, 64); err == nil {
		return &SQLVal{Type: IntVal, Val: []byte(expr)}
	}
	if _, err := strconv.ParseFloat(expr, 64); err == nil {
		return &SQLVal{Type: FloatVal, Val: []byte(expr)}
	}

	// Check if it's a string literal
	if strings.HasPrefix(expr, "'") && strings.HasSuffix(expr, "'") {
		return &SQLVal{Type: StrVal, Val: []byte(strings.Trim(expr, "'"))}
	}

	// Check for nested arithmetic expressions
	if nestedArithmetic := e.buildArithmeticAST(expr); nestedArithmetic != nil {
		return nestedArithmetic
	}

	// Default to column name
	return &ColName{Name: stringValue(expr)}
}