@@ -2175,361 +2175,6 @@ func (e *SQLEngine) executeRegularSelectWithHybridScanner(ctx context.Context, h
	return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
}

// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
// This is used by EXPLAIN queries to capture complete data source information including broker memory
func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// Parse FROM clause to get table (topic) information
	if len(stmt.From) != 1 {
		err := fmt.Errorf("SELECT supports single table queries only")
		return &QueryResult{Error: err}, err
	}

	// Extract table reference
	var database, tableName string
	switch table := stmt.From[0].(type) {
	case *AliasedTableExpr:
		switch tableExpr := table.Expr.(type) {
		case TableName:
			tableName = tableExpr.Name.String()
			if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
				database = tableExpr.Qualifier.String()
			}
		default:
			err := fmt.Errorf("unsupported table expression: %T", tableExpr)
			return &QueryResult{Error: err}, err
		}
	default:
		err := fmt.Errorf("unsupported FROM clause: %T", table)
		return &QueryResult{Error: err}, err
	}

	// Use current database context if not specified
	if database == "" {
		database = e.catalog.GetCurrentDatabase()
		if database == "" {
			database = "default"
		}
	}

	// Auto-discover and register topic if not already in catalog
	if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
		// Topic not in catalog, try to discover and register it
		if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
			// Return error immediately for non-existent topics instead of falling back to sample data
			return &QueryResult{Error: regErr}, regErr
		}
	}

	// Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
	// Get filerClient from broker connection (works with both real and mock brokers)
	var filerClient filer_pb.FilerClient
	var filerClientErr error
	filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
	if filerClientErr != nil {
		// Return error if filer client is not available for topic access
		return &QueryResult{Error: filerClientErr}, filerClientErr
	}

	hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
	if err != nil {
		// Handle quiet topics gracefully: topics exist but have no active schema/brokers
		if IsNoSchemaError(err) {
			// Return empty result for quiet topics (normal in production environments)
			return &QueryResult{
				Columns:  []string{},
				Rows:     [][]sqltypes.Value{},
				Database: database,
				Table:    tableName,
			}, nil
		}
		// Return error for other access issues (truly non-existent topics, etc.)
		topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err)
		return &QueryResult{Error: topicErr}, topicErr
	}

	// Parse SELECT columns and detect aggregation functions
	var columns []string
	var aggregations []AggregationSpec
	selectAll := false
	hasAggregations := false
	_ = hasAggregations // Used later in aggregation routing

	// Track required base columns for arithmetic expressions
	baseColumnsSet := make(map[string]bool)

	for _, selectExpr := range stmt.SelectExprs {
		switch expr := selectExpr.(type) {
		case *StarExpr:
			selectAll = true
		case *AliasedExpr:
			switch col := expr.Expr.(type) {
			case *ColName:
				colName := col.Name.String()
				columns = append(columns, colName)
				baseColumnsSet[colName] = true
			case *ArithmeticExpr:
				// Handle arithmetic expressions like id+user_id and string concatenation like name||suffix
				columns = append(columns, e.getArithmeticExpressionAlias(col))
				// Extract base columns needed for this arithmetic expression
				e.extractBaseColumns(col, baseColumnsSet)
			case *SQLVal:
				// Handle string/numeric literals like 'good', 123, etc.
				columns = append(columns, e.getSQLValAlias(col))
			case *FuncExpr:
				// Distinguish between aggregation functions and string functions
				funcName := strings.ToUpper(col.Name.String())
				if e.isAggregationFunction(funcName) {
					// Handle aggregation functions
					aggSpec, err := e.parseAggregationFunction(col, expr)
					if err != nil {
						return &QueryResult{Error: err}, err
					}
					aggregations = append(aggregations, *aggSpec)
					hasAggregations = true
				} else if e.isStringFunction(funcName) {
					// Handle string functions like UPPER, LENGTH, etc.
					columns = append(columns, e.getStringFunctionAlias(col))
					// Extract base columns needed for this string function
					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
				} else if e.isDateTimeFunction(funcName) {
					// Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
					columns = append(columns, e.getDateTimeFunctionAlias(col))
					// Extract base columns needed for this datetime function
					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
				} else {
					return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
				}
			default:
				err := fmt.Errorf("unsupported SELECT expression: %T", col)
				return &QueryResult{Error: err}, err
			}
		default:
			err := fmt.Errorf("unsupported SELECT expression: %T", expr)
			return &QueryResult{Error: err}, err
		}
	}
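
	// At this point columns holds one display alias per selected expression, while
	// baseColumnsSet holds the underlying fields that must actually be read from
	// storage (for example, an expression such as UPPER(name) only requires the
	// base column "name" to be scanned).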

	// If we have aggregations, use aggregation query path
	if hasAggregations {
		return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
	}

	// Parse WHERE clause for predicate pushdown
	var predicate func(*schema_pb.RecordValue) bool
	if stmt.Where != nil {
		predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Parse LIMIT and OFFSET clauses
	// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
	limit := -1
	offset := 0
	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
		switch limitExpr := stmt.Limit.Rowcount.(type) {
		case *SQLVal:
			if limitExpr.Type == IntVal {
				var parseErr error
				limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
				if parseErr != nil {
					return &QueryResult{Error: parseErr}, parseErr
				}
				if limit64 > math.MaxInt32 || limit64 < 0 {
					return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
				}
				limit = int(limit64)
			}
		}
	}

	// Parse OFFSET clause if present
	if stmt.Limit != nil && stmt.Limit.Offset != nil {
		switch offsetExpr := stmt.Limit.Offset.(type) {
		case *SQLVal:
			if offsetExpr.Type == IntVal {
				var parseErr error
				offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
				if parseErr != nil {
					return &QueryResult{Error: parseErr}, parseErr
				}
				if offset64 > math.MaxInt32 || offset64 < 0 {
					return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
				}
				offset = int(offset64)
			}
		}
	}
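
	// For example, "LIMIT 10 OFFSET 20" leaves limit=10 and offset=20 here; without a
	// LIMIT clause, limit stays at -1 and is passed through to the scan options as-is.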

	// Build hybrid scan options
	// Extract time filters from WHERE clause to optimize scanning
	startTimeNs, stopTimeNs := int64(0), int64(0)
	if stmt.Where != nil {
		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
	}
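
	// extractTimeFilters leaves a bound at 0 when the WHERE clause does not constrain
	// it, so a zero value here means "unbounded" rather than the Unix epoch.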

	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
		StopTimeNs:  stopTimeNs,  // Extracted from WHERE clause time comparisons
		Limit:       limit,
		Offset:      offset,
		Predicate:   predicate,
	}

	if !selectAll {
		// Convert baseColumnsSet to slice for hybrid scan options
		baseColumns := make([]string, 0, len(baseColumnsSet))
		for columnName := range baseColumnsSet {
			baseColumns = append(baseColumns, columnName)
		}
		// Use base columns (not expression aliases) for data retrieval
		if len(baseColumns) > 0 {
			hybridScanOptions.Columns = baseColumns
		} else {
			// If no base columns found (shouldn't happen), use original columns
			hybridScanOptions.Columns = columns
		}
	}
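
	// For SELECT * queries the Columns option is left unset (nil), so no column
	// projection is requested from the hybrid scanner.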

	// Execute the hybrid scan with stats capture for EXPLAIN
	var results []HybridScanResult
	if plan != nil {
		// EXPLAIN mode - capture broker buffer stats
		var stats *HybridScanStats
		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}

		// Populate plan with broker buffer information
		if stats != nil {
			plan.BrokerBufferQueried = stats.BrokerBufferQueried
			plan.BrokerBufferMessages = stats.BrokerBufferMessages
			plan.BufferStartIndex = stats.BufferStartIndex

			// Add broker_buffer to data sources if buffer was queried
			if stats.BrokerBufferQueried {
				// Check if broker_buffer is already in data sources
				hasBrokerBuffer := false
				for _, source := range plan.DataSources {
					if source == "broker_buffer" {
						hasBrokerBuffer = true
						break
					}
				}
				if !hasBrokerBuffer {
					plan.DataSources = append(plan.DataSources, "broker_buffer")
				}
			}
		}

		// Populate execution plan details with source file information for Data Sources Tree
		if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
			// Add partition paths to execution plan details
			plan.Details["partition_paths"] = partitions
			// Persist time filter details for downstream pruning/diagnostics
			plan.Details[PlanDetailStartTimeNs] = startTimeNs
			plan.Details[PlanDetailStopTimeNs] = stopTimeNs

			// Collect actual file information for each partition
			var parquetFiles []string
			var liveLogFiles []string
			parquetSources := make(map[string]bool)

			var parquetReadErrors []string
			var liveLogListErrors []string
			for _, partitionPath := range partitions {
				// Get parquet files for this partition
				if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
					// Prune files by time range with debug logging
					filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)

					// Further prune by column statistics from WHERE clause
					if stmt.Where != nil {
						beforeColumnPrune := len(filteredStats)
						filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
						columnPrunedCount := beforeColumnPrune - len(filteredStats)

						if columnPrunedCount > 0 {
							// Track column statistics optimization
							if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
								plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
							}
						}
					}
					for _, stats := range filteredStats {
						parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
					}
				} else {
					parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
				}

				// Merge accurate parquet sources from metadata
				if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
					for src := range sources {
						parquetSources[src] = true
					}
				}

				// Get live log files for this partition
				if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
					for _, fileName := range liveFiles {
						// Exclude live log files that have been converted to parquet (deduplicated)
						if parquetSources[fileName] {
							continue
						}
						liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
					}
				} else {
					liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
				}
			}

			if len(parquetFiles) > 0 {
				plan.Details["parquet_files"] = parquetFiles
			}
			if len(liveLogFiles) > 0 {
				plan.Details["live_log_files"] = liveLogFiles
			}
			if len(parquetReadErrors) > 0 {
				plan.Details["error_parquet_statistics"] = parquetReadErrors
			}
			if len(liveLogListErrors) > 0 {
				plan.Details["error_live_log_listing"] = liveLogListErrors
			}

			// Update scan statistics for execution plan display
			plan.PartitionsScanned = len(partitions)
			plan.ParquetFilesScanned = len(parquetFiles)
			plan.LiveLogFilesScanned = len(liveLogFiles)
		} else {
			// Handle partition discovery error
			plan.Details["error_partition_discovery"] = discoverErr.Error()
		}
	} else {
		// Normal mode - just get results
		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Convert to SQL result format
	if selectAll {
		if len(columns) > 0 {
			// SELECT *, specific_columns - include both auto-discovered and explicit columns
			return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
		} else {
			// SELECT * only - let converter determine all columns (excludes system columns)
			columns = nil
			return hybridScanner.ConvertToSQLResult(results, columns), nil
		}
	}

	// Handle custom column expressions (including arithmetic)
	return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
}
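
// Illustrative usage sketch (not part of the original change; the Details map
// construction below is an assumption about QueryExecutionPlan): an EXPLAIN code
// path would pass a pre-initialized plan so broker-buffer statistics and file-level
// data sources are captured alongside the query result, while passing nil skips the
// stats capture and performs a normal scan:
//
//	plan := &QueryExecutionPlan{Details: make(map[string]interface{})}
//	result, err := e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
//	// On success, plan.BrokerBufferQueried, plan.DataSources and plan.Details
//	// record which broker buffers, partitions, parquet files and live logs were read.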

// extractTimeFilters extracts time range filters from WHERE clause for optimization
// This allows push-down of time-based queries to improve scan performance
// Returns (startTimeNs, stopTimeNs) where 0 means unbounded