|
|
@ -832,164 +832,6 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. |
|
|
|
return hybridScanner.ConvertToSQLResult(results, columns), nil |
|
|
|
} |
|
|
|
|
|
|
|
// executeSelectWithSampleData provides enhanced sample data that simulates both live and archived messages
|
|
|
|
func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlparser.Select, database, tableName string) (*QueryResult, error) { |
|
|
|
// Create a sample HybridMessageScanner to simulate both data sources
|
|
|
|
now := time.Now().UnixNano() |
|
|
|
|
|
|
|
var sampleResults []HybridScanResult |
|
|
|
|
|
|
|
switch tableName { |
|
|
|
case "user_events": |
|
|
|
sampleResults = []HybridScanResult{ |
|
|
|
// Live log data (recent)
|
|
|
|
{ |
|
|
|
Values: map[string]*schema_pb.Value{ |
|
|
|
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}}, |
|
|
|
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}}, |
|
|
|
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}}, |
|
|
|
}, |
|
|
|
Timestamp: now - 300000000000, // 5 minutes ago
|
|
|
|
Key: []byte("live-1003"), |
|
|
|
Source: "live_log", |
|
|
|
}, |
|
|
|
{ |
|
|
|
Values: map[string]*schema_pb.Value{ |
|
|
|
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}}, |
|
|
|
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_click"}}, |
|
|
|
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"button": "submit", "live": true}`}}, |
|
|
|
}, |
|
|
|
Timestamp: now - 120000000000, // 2 minutes ago
|
|
|
|
Key: []byte("live-1004"), |
|
|
|
Source: "live_log", |
|
|
|
}, |
|
|
|
// Archived Parquet data (older)
|
|
|
|
{ |
|
|
|
Values: map[string]*schema_pb.Value{ |
|
|
|
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}}, |
|
|
|
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}}, |
|
|
|
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}}, |
|
|
|
}, |
|
|
|
Timestamp: now - 3600000000000, // 1 hour ago
|
|
|
|
Key: []byte("archived-1001"), |
|
|
|
Source: "parquet_archive", |
|
|
|
}, |
|
|
|
{ |
|
|
|
Values: map[string]*schema_pb.Value{ |
|
|
|
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}}, |
|
|
|
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}}, |
|
|
|
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}}, |
|
|
|
}, |
|
|
|
Timestamp: now - 1800000000000, // 30 minutes ago
|
|
|
|
Key: []byte("archived-1002"), |
|
|
|
Source: "parquet_archive", |
|
|
|
}, |
|
|
|
} |
|
|
|
case "system_logs": |
|
|
|
sampleResults = []HybridScanResult{ |
|
|
|
// Live system logs
|
|
|
|
{ |
|
|
|
Values: map[string]*schema_pb.Value{ |
|
|
|
"level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}}, |
|
|
|
"message": {Kind: &schema_pb.Value_StringValue{StringValue: "Live service heartbeat"}}, |
|
|
|
"service": {Kind: &schema_pb.Value_StringValue{StringValue: "api-gateway"}}, |
|
|
|
}, |
|
|
|
Timestamp: now - 60000000000, // 1 minute ago
|
|
|
|
Key: []byte("live-log-001"), |
|
|
|
Source: "live_log", |
|
|
|
}, |
|
|
|
// Archived system logs
|
|
|
|
{ |
|
|
|
Values: map[string]*schema_pb.Value{ |
|
|
|
"level": {Kind: &schema_pb.Value_StringValue{StringValue: "ERROR"}}, |
|
|
|
"message": {Kind: &schema_pb.Value_StringValue{StringValue: "Database connection timeout"}}, |
|
|
|
"service": {Kind: &schema_pb.Value_StringValue{StringValue: "user-service"}}, |
|
|
|
}, |
|
|
|
Timestamp: now - 7200000000000, // 2 hours ago
|
|
|
|
Key: []byte("archived-error-001"), |
|
|
|
Source: "parquet_archive", |
|
|
|
}, |
|
|
|
} |
|
|
|
default: |
|
|
|
return &QueryResult{ |
|
|
|
Error: fmt.Errorf("table '%s.%s' not found", database, tableName), |
|
|
|
}, fmt.Errorf("table '%s.%s' not found", database, tableName) |
|
|
|
} |
|
|
|
|
|
|
|
// Apply basic LIMIT if specified
|
|
|
|
if stmt.Limit != nil && stmt.Limit.Rowcount != nil { |
|
|
|
if limitExpr, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok && limitExpr.Type == sqlparser.IntVal { |
|
|
|
if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil { |
|
|
|
if limit64 > math.MaxInt32 || limit64 < 0 { |
|
|
|
return &QueryResult{ |
|
|
|
Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64), |
|
|
|
}, fmt.Errorf("LIMIT value %d is out of valid range", limit64) |
|
|
|
} |
|
|
|
limit := int(limit64) |
|
|
|
if limit > 0 && limit < len(sampleResults) { |
|
|
|
sampleResults = sampleResults[:limit] |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Convert to SQL result format using hybrid scanner logic
|
|
|
|
return convertHybridResultsToSQL(sampleResults, nil), nil |
|
|
|
} |
|
|
|
|
|
|
|
// convertHybridResultsToSQL converts HybridScanResults to SQL format (helper function)
|
|
|
|
func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *QueryResult { |
|
|
|
if len(results) == 0 { |
|
|
|
return &QueryResult{ |
|
|
|
Columns: columns, |
|
|
|
Rows: [][]sqltypes.Value{}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Determine columns if not specified
|
|
|
|
if len(columns) == 0 { |
|
|
|
columnSet := make(map[string]bool) |
|
|
|
for _, result := range results { |
|
|
|
for columnName := range result.Values { |
|
|
|
columnSet[columnName] = true |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
columns = make([]string, 0, len(columnSet)) |
|
|
|
for columnName := range columnSet { |
|
|
|
columns = append(columns, columnName) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Convert to SQL rows
|
|
|
|
rows := make([][]sqltypes.Value, len(results)) |
|
|
|
for i, result := range results { |
|
|
|
row := make([]sqltypes.Value, len(columns)) |
|
|
|
for j, columnName := range columns { |
|
|
|
switch columnName { |
|
|
|
case "_source": |
|
|
|
row[j] = sqltypes.NewVarChar(result.Source) |
|
|
|
case "_timestamp_ns": |
|
|
|
row[j] = sqltypes.NewInt64(result.Timestamp) |
|
|
|
case "_key": |
|
|
|
row[j] = sqltypes.NewVarBinary(string(result.Key)) |
|
|
|
default: |
|
|
|
if value, exists := result.Values[columnName]; exists { |
|
|
|
row[j] = convertSchemaValueToSQL(value) |
|
|
|
} else { |
|
|
|
row[j] = sqltypes.NULL |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
rows[i] = row |
|
|
|
} |
|
|
|
|
|
|
|
return &QueryResult{ |
|
|
|
Columns: columns, |
|
|
|
Rows: rows, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// extractTimeFilters extracts time range filters from WHERE clause for optimization
|
|
|
|
// This allows push-down of time-based queries to improve scan performance
|
|
|
|
// Returns (startTimeNs, stopTimeNs) where 0 means unbounded
|
|
|
@ -1122,12 +964,13 @@ func (e *SQLEngine) getColumnName(expr sqlparser.Expr) string { |
|
|
|
func (e *SQLEngine) extractTimeValue(expr sqlparser.Expr) int64 { |
|
|
|
switch exprType := expr.(type) { |
|
|
|
case *sqlparser.SQLVal: |
|
|
|
if exprType.Type == sqlparser.IntVal { |
|
|
|
switch exprType.Type { |
|
|
|
case sqlparser.IntVal: |
|
|
|
// Parse as nanosecond timestamp
|
|
|
|
if val, err := strconv.ParseInt(string(exprType.Val), 10, 64); err == nil { |
|
|
|
return val |
|
|
|
} |
|
|
|
} else if exprType.Type == sqlparser.StrVal { |
|
|
|
case sqlparser.StrVal: |
|
|
|
// Parse as ISO date or other string formats
|
|
|
|
timeStr := string(exprType.Val) |
|
|
|
|
|
|
|