@@ -1491,221 +1491,6 @@ func (e *SQLEngine) dropTable(ctx context.Context, stmt *sqlparser.DDL) (*QueryR

	return result, nil
}

// AggregationComputer handles the computation of aggregations using fast path
type AggregationComputer struct {
	engine *SQLEngine
}

// NewAggregationComputer creates a new aggregation computer
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
	return &AggregationComputer{engine: engine}
}

// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
func (comp *AggregationComputer) ComputeFastPathAggregations(
	ctx context.Context,
	aggregations []AggregationSpec,
	dataSources *TopicDataSources,
	partitions []string,
) ([]AggregationResult, error) {

	aggResults := make([]AggregationResult, len(aggregations))

	for i, spec := range aggregations {
		switch spec.Function {
		case "COUNT":
			if spec.Column == "*" {
				aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
			} else {
				// For specific columns, we might need to account for NULLs in the future
				aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
			}

		case "MIN":
			globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			aggResults[i].Min = globalMin

		case "MAX":
			globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			aggResults[i].Max = globalMax

		default:
			return nil, OptimizationError{
				Strategy: "fast_path_aggregation",
				Reason:   fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
			}
		}
	}

	return aggResults, nil
}
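For orientation, a minimal usage sketch of the computer above; the spec literals and the engine, dataSources, and partitions variables are hypothetical stand-ins for what the fast-path optimizer (shown further below) produces:

	// Hypothetical wiring; AggregationSpec fields match their use above.
	computer := NewAggregationComputer(engine)
	specs := []AggregationSpec{
		{Function: "COUNT", Column: "*", Alias: "COUNT(*)"},
		{Function: "MAX", Column: "user_id", Alias: "MAX(user_id)"},
	}
	aggResults, err := computer.ComputeFastPathAggregations(ctx, specs, dataSources, partitions)
	if err != nil {
		// Callers treat a fast-path error as a cue to fall back to a full scan.
	}
	_ = aggResults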
// computeGlobalMin computes the global minimum value across all data sources
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMin interface{}
	var globalMinValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get minimum from parquet statistics
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Try case-insensitive column lookup
			var colStats *ParquetColumnStats
			var found bool

			// First try exact match
			if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
				colStats = stats
				found = true
			} else {
				// Try case-insensitive lookup
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						found = true
						break
					}
				}
			}

			if found && colStats != nil && colStats.MinValue != nil {
				if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
					globalMinValue = colStats.MinValue
					extractedValue := comp.engine.extractRawValue(colStats.MinValue)
					if extractedValue != nil {
						globalMin = extractedValue
						hasParquetStats = true
					}
				}
			}
		}
	}

	// Step 2: Get minimum from live log data (only if live logs exist)
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}

			liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}

			if liveLogMin != nil {
				if globalMin == nil {
					globalMin = liveLogMin
				} else {
					liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
					if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
						globalMin = liveLogMin
						globalMinValue = liveLogSchemaValue
					}
				}
			}
		}
	}

	// Step 3: Handle system columns if no regular data found
	if globalMin == nil && !hasParquetStats {
		globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
	}

	return globalMin, nil
}

// computeGlobalMax computes the global maximum value across all data sources
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMax interface{}
	var globalMaxValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get maximum from parquet statistics
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Try case-insensitive column lookup
			var colStats *ParquetColumnStats
			var found bool

			// First try exact match
			if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
				colStats = stats
				found = true
			} else {
				// Try case-insensitive lookup
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						found = true
						break
					}
				}
			}

			if found && colStats != nil && colStats.MaxValue != nil {
				if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
					globalMaxValue = colStats.MaxValue
					extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
					if extractedValue != nil {
						globalMax = extractedValue
						hasParquetStats = true
					}
				}
			}
		}
	}

	// Step 2: Get maximum from live log data (only if live logs exist)
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}

			_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}

			if liveLogMax != nil {
				if globalMax == nil {
					globalMax = liveLogMax
				} else {
					liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
					if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
						globalMax = liveLogMax
						globalMaxValue = liveLogSchemaValue
					}
				}
			}
		}
	}

	// Step 3: Handle system columns if no regular data found
	if globalMax == nil && !hasParquetStats {
		globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
	}

	return globalMax, nil
}
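Both helpers share one merge shape: seed the running extremum from parquet column statistics, then fold in each partition's live-log extremum, converting raw Go values back to schema_pb.Value so compareValues can order them. A condensed sketch of that fold, with a hypothetical candidates slice:

	// Hypothetical: candidates gathered from parquet stats and live logs.
	var best *schema_pb.Value
	for _, candidate := range candidates {
		if candidate == nil {
			continue
		}
		// Use > 0 here instead to fold a maximum.
		if best == nil || comp.engine.compareValues(candidate, best) < 0 {
			best = candidate
		}
	}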
// ExecutionPlanBuilder handles building execution plans for queries
type ExecutionPlanBuilder struct {
	engine *SQLEngine
@@ -1879,350 +1664,6 @@ func (e *SQLEngine) parseAggregationFunction(funcExpr *sqlparser.FuncExpr, alias

	return spec, nil
}

// executeAggregationQuery handles SELECT queries with aggregation functions
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *sqlparser.Select) (*QueryResult, error) {
	// Parse WHERE clause for filtering
	var predicate func(*schema_pb.RecordValue) bool
	var err error
	if stmt.Where != nil {
		predicate, err = e.buildPredicate(stmt.Where.Expr)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Extract time filters for optimization
	startTimeNs, stopTimeNs := int64(0), int64(0)
	if stmt.Where != nil {
		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
	}

	// FAST PATH: Try to use parquet statistics for optimization
	// This can be ~130x faster than scanning all data
	if stmt.Where == nil { // Only optimize when there is no WHERE clause
		fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
		if canOptimize {
			fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n")
			return fastResult, nil
		}
	}

	// SLOW PATH: Fall back to full table scan
	fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")

	// Build scan options for full table scan (aggregations need all data)
	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs,
		StopTimeNs:  stopTimeNs,
		Limit:       0, // No limit for aggregations - need all data
		Predicate:   predicate,
	}

	// Execute the hybrid scan to get all matching records
	results, err := hybridScanner.Scan(ctx, hybridScanOptions)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	// Compute aggregations
	aggResults := e.computeAggregations(results, aggregations)

	// Build result set
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))

	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	return &QueryResult{
		Columns: columns,
		Rows:    [][]sqltypes.Value{row},
	}, nil
}
// computeAggregations computes aggregation functions over the scan results
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
	aggResults := make([]AggregationResult, len(aggregations))

	for i, spec := range aggregations {
		switch spec.Function {
		case "COUNT":
			if spec.Column == "*" {
				// COUNT(*) counts all rows
				aggResults[i].Count = int64(len(results))
			} else if spec.Distinct {
				// COUNT(DISTINCT column) counts unique non-null values
				uniqueValues := make(map[string]bool)
				for _, result := range results {
					if value := e.findColumnValue(result, spec.Column); value != nil {
						if !e.isNullValue(value) {
							// Use string representation for uniqueness check
							rawValue := e.extractRawValue(value)
							if rawValue != nil {
								uniqueValues[fmt.Sprintf("%v", rawValue)] = true
							}
						}
					}
				}
				aggResults[i].Count = int64(len(uniqueValues))
			} else {
				// COUNT(column) counts non-null values
				count := int64(0)
				for _, result := range results {
					if value := e.findColumnValue(result, spec.Column); value != nil {
						if !e.isNullValue(value) {
							count++
						}
					}
				}
				aggResults[i].Count = count
			}

		case "SUM":
			sum := float64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
					}
				}
			}
			aggResults[i].Sum = sum

		case "AVG":
			sum := float64(0)
			count := int64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
						count++
					}
				}
			}
			if count > 0 {
				aggResults[i].Sum = sum / float64(count) // Store average in Sum field
				aggResults[i].Count = count
			}

		case "MIN":
			var min interface{}
			var minValue *schema_pb.Value
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if minValue == nil || e.compareValues(value, minValue) < 0 {
						minValue = value
						min = e.extractRawValue(value)
					}
				}
			}
			aggResults[i].Min = min

		case "MAX":
			var max interface{}
			var maxValue *schema_pb.Value
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if maxValue == nil || e.compareValues(value, maxValue) > 0 {
						maxValue = value
						max = e.extractRawValue(value)
					}
				}
			}
			aggResults[i].Max = max
		}
	}

	return aggResults
}
// Helper functions for aggregation processing

func (e *SQLEngine) isNullValue(value *schema_pb.Value) bool {
	return value == nil || value.Kind == nil
}

func (e *SQLEngine) convertToNumber(value *schema_pb.Value) *float64 {
	switch v := value.Kind.(type) {
	case *schema_pb.Value_Int32Value:
		result := float64(v.Int32Value)
		return &result
	case *schema_pb.Value_Int64Value:
		result := float64(v.Int64Value)
		return &result
	case *schema_pb.Value_FloatValue:
		result := float64(v.FloatValue)
		return &result
	case *schema_pb.Value_DoubleValue:
		return &v.DoubleValue
	}
	return nil
}

func (e *SQLEngine) extractRawValue(value *schema_pb.Value) interface{} {
	switch v := value.Kind.(type) {
	case *schema_pb.Value_Int32Value:
		return v.Int32Value
	case *schema_pb.Value_Int64Value:
		return v.Int64Value
	case *schema_pb.Value_FloatValue:
		return v.FloatValue
	case *schema_pb.Value_DoubleValue:
		return v.DoubleValue
	case *schema_pb.Value_StringValue:
		return v.StringValue
	case *schema_pb.Value_BoolValue:
		return v.BoolValue
	case *schema_pb.Value_BytesValue:
		return string(v.BytesValue) // Convert bytes to string for comparison
	}
	return nil
}

func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Value) int {
	if value2 == nil {
		return 1 // value1 > nil
	}
	raw1 := e.extractRawValue(value1)
	raw2 := e.extractRawValue(value2)
	if raw1 == nil {
		return -1
	}
	if raw2 == nil {
		return 1
	}

	// Simple comparison - in a full implementation this would handle type coercion
	switch v1 := raw1.(type) {
	case int32:
		if v2, ok := raw2.(int32); ok {
			if v1 < v2 {
				return -1
			} else if v1 > v2 {
				return 1
			}
			return 0
		}
	case int64:
		if v2, ok := raw2.(int64); ok {
			if v1 < v2 {
				return -1
			} else if v1 > v2 {
				return 1
			}
			return 0
		}
	case float32:
		if v2, ok := raw2.(float32); ok {
			if v1 < v2 {
				return -1
			} else if v1 > v2 {
				return 1
			}
			return 0
		}
	case float64:
		if v2, ok := raw2.(float64); ok {
			if v1 < v2 {
				return -1
			} else if v1 > v2 {
				return 1
			}
			return 0
		}
	case string:
		if v2, ok := raw2.(string); ok {
			if v1 < v2 {
				return -1
			} else if v1 > v2 {
				return 1
			}
			return 0
		}
	case bool:
		if v2, ok := raw2.(bool); ok {
			if v1 == v2 {
				return 0
			} else if v1 && !v2 {
				return 1
			}
			return -1
		}
	}
	return 0
}
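The ordering contract of compareValues, illustrated with hypothetical values: a nil or unextractable operand sorts lowest, and operands of mismatched Go types fall through to 0, which is why the min/max helpers convert live-log values back to schema_pb.Value before comparing:

	// Hypothetical values illustrating the contract.
	a := &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}}
	b := &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}}
	e.compareValues(a, b)   // -1
	e.compareValues(b, a)   // 1
	e.compareValues(a, nil) // 1: any value is greater than nil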
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
	// Use the new modular components
	optimizer := NewFastPathOptimizer(e)
	computer := NewAggregationComputer(e)

	// Step 1: Determine strategy
	strategy := optimizer.DetermineStrategy(aggregations)
	if !strategy.CanUseFastPath {
		return nil, false
	}

	// Step 2: Collect data sources
	dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
	if err != nil {
		return nil, false
	}

	// Build partition list for aggregation computer
	relativePartitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		return nil, false
	}

	topicBasePath := fmt.Sprintf("/topics/%s/%s", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	partitions := make([]string, len(relativePartitions))
	for i, relPartition := range relativePartitions {
		partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
	}

	// Debug: Show the hybrid optimization results
	if dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 {
		partitionsWithLiveLogs := 0
		if dataSources.LiveLogRowCount > 0 {
			partitionsWithLiveLogs = 1 // Simplified for now
		}
		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows from %d partitions\n",
			dataSources.ParquetRowCount, dataSources.LiveLogRowCount, partitionsWithLiveLogs)
	}

	// Step 3: Compute aggregations using fast path
	aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}

	// Step 4: Build final query result

	// Build result using fast parquet statistics
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))

	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    [][]sqltypes.Value{row},
	}

	return result, true
}

// computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column
func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) {
	if e.catalog.brokerClient == nil {
@@ -2456,151 +1897,6 @@ func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*

	return recordValue, "live_log", nil
}

// convertJSONValueToSchemaValue converts JSON values to schema_pb.Value
func (e *SQLEngine) convertJSONValueToSchemaValue(jsonValue interface{}) *schema_pb.Value {
	switch v := jsonValue.(type) {
	case string:
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
	case float64:
		// JSON numbers are always float64, try to detect if it's actually an integer
		if v == float64(int64(v)) {
			return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}}
		}
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
	case bool:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
	case nil:
		return nil
	default:
		// Convert other types to string
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
	}
}

// convertRawValueToSchemaValue converts raw Go values back to schema_pb.Value for comparison
func (e *SQLEngine) convertRawValueToSchemaValue(rawValue interface{}) *schema_pb.Value {
	switch v := rawValue.(type) {
	case int32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: v}}
	case int64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
	case float32:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: v}}
	case float64:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
	case string:
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
	case bool:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: v}}
	case []byte:
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: v}}
	default:
		// Convert other types to string as fallback
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", v)}}
	}
}

// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
	switch spec.Function {
	case "COUNT":
		return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	case "MIN", "MAX":
		return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	case "SUM", "AVG":
		// These require scanning actual values, not just min/max
		return false
	default:
		return false
	}
}
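To make the gate concrete, a few hypothetical specs and the answers the function above would give:

	// Hypothetical specs; comments show the expected return values.
	e.canUseParquetStatsForAggregation(AggregationSpec{Function: "COUNT", Column: "*"})    // true
	e.canUseParquetStatsForAggregation(AggregationSpec{Function: "MIN", Column: "_key"})   // true: system column
	e.canUseParquetStatsForAggregation(AggregationSpec{Function: "SUM", Column: "amount"}) // false: needs actual values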
// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source)
func (e *SQLEngine) isSystemColumn(columnName string) bool {
	lowerName := strings.ToLower(columnName)
	return lowerName == "_timestamp_ns" || lowerName == "timestamp_ns" ||
		lowerName == "_key" || lowerName == "key" ||
		lowerName == "_source" || lowerName == "source"
}

// isRegularColumn checks if a column might be a regular data column (placeholder)
func (e *SQLEngine) isRegularColumn(columnName string) bool {
	// For now, assume any non-system column is a regular column
	return !e.isSystemColumn(columnName)
}

// getSystemColumnGlobalMin computes global min for system columns using file metadata
func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
	lowerName := strings.ToLower(columnName)

	switch lowerName {
	case "_timestamp_ns", "timestamp_ns":
		// For timestamps, find the earliest timestamp across all files
		// This should match what's in the Extended["min"] metadata
		var minTimestamp *int64
		for _, fileStats := range allFileStats {
			for _, fileStat := range fileStats {
				// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
				timestamp := e.extractTimestampFromFilename(fileStat.FileName)
				if timestamp != 0 {
					if minTimestamp == nil || timestamp < *minTimestamp {
						minTimestamp = &timestamp
					}
				}
			}
		}
		if minTimestamp != nil {
			return *minTimestamp
		}

	case "_key", "key":
		// For keys, we'd need to read the actual parquet column stats
		// Fall back to scanning if not available in our current stats
		return nil

	case "_source", "source":
		// Source is always "parquet_archive" for parquet files
		return "parquet_archive"
	}

	return nil
}

// getSystemColumnGlobalMax computes global max for system columns using file metadata
func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
	lowerName := strings.ToLower(columnName)

	switch lowerName {
	case "_timestamp_ns", "timestamp_ns":
		// For timestamps, find the latest timestamp across all files
		var maxTimestamp *int64
		for _, fileStats := range allFileStats {
			for _, fileStat := range fileStats {
				// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
				timestamp := e.extractTimestampFromFilename(fileStat.FileName)
				if timestamp != 0 {
					if maxTimestamp == nil || timestamp > *maxTimestamp {
						maxTimestamp = &timestamp
					}
				}
			}
		}
		if maxTimestamp != nil {
			return *maxTimestamp
		}

	case "_key", "key":
		// For keys, we'd need to read the actual parquet column stats
		return nil

	case "_source", "source":
		// Source is always "parquet_archive" for parquet files
		return "parquet_archive"
	}

	return nil
}

// extractTimestampFromFilename extracts timestamp from parquet filename
// Format: YYYY-MM-DD-HH-MM-SS.parquet
func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
@@ -2979,49 +2275,6 @@ func (e *SQLEngine) getActualRowsScannedForFastPath(namespace, topicName string)

	return totalScannedRows, nil
}

func (e *SQLEngine) formatAggregationResult(spec AggregationSpec, result AggregationResult) sqltypes.Value {
	switch spec.Function {
	case "COUNT":
		return sqltypes.NewInt64(result.Count)
	case "SUM":
		return sqltypes.NewFloat64(result.Sum)
	case "AVG":
		return sqltypes.NewFloat64(result.Sum) // Sum contains the average for AVG
	case "MIN":
		if result.Min != nil {
			return e.convertRawValueToSQL(result.Min)
		}
		return sqltypes.NULL
	case "MAX":
		if result.Max != nil {
			return e.convertRawValueToSQL(result.Max)
		}
		return sqltypes.NULL
	}
	return sqltypes.NULL
}
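Note the field reuse: for AVG, computeAggregations stores the mean in the Sum field, and formatAggregationResult emits Sum unchanged. A hypothetical round trip:

	// Hypothetical AVG result: the mean of 4 values is carried in Sum.
	avg := AggregationResult{Sum: 12.5, Count: 4}
	e.formatAggregationResult(AggregationSpec{Function: "AVG", Alias: "AVG(x)"}, avg) // == sqltypes.NewFloat64(12.5)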
func (e *SQLEngine) convertRawValueToSQL(value interface{}) sqltypes.Value {
	switch v := value.(type) {
	case int32:
		return sqltypes.NewInt32(v)
	case int64:
		return sqltypes.NewInt64(v)
	case float32:
		return sqltypes.NewFloat32(v)
	case float64:
		return sqltypes.NewFloat64(v)
	case string:
		return sqltypes.NewVarChar(v)
	case bool:
		if v {
			return sqltypes.NewVarChar("1")
		}
		return sqltypes.NewVarChar("0")
	}
	return sqltypes.NULL
}

// findColumnValue performs case-insensitive lookup of column values
// Now includes support for system columns stored in HybridScanResult
func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value {