Browse Source

integer conversion

pull/7185/head
chrislu 1 month ago
parent
commit
675ec42fad
  1. 122
      weed/query/engine/engine.go

122
weed/query/engine/engine.go

@ -3,6 +3,7 @@ package engine
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@ -140,7 +141,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
fmt.Printf("Warning: Failed to get filer client: %v, using sample data\n", filerClientErr) fmt.Printf("Warning: Failed to get filer client: %v, using sample data\n", filerClientErr)
} }
} }
hybridScanner, err := NewHybridMessageScanner(filerClient, database, tableName) hybridScanner, err := NewHybridMessageScanner(filerClient, database, tableName)
if err != nil { if err != nil {
// Fallback to sample data if topic doesn't exist or filer unavailable // Fallback to sample data if topic doesn't exist or filer unavailable
@ -189,6 +190,9 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
if parseErr != nil { if parseErr != nil {
return &QueryResult{Error: parseErr}, parseErr return &QueryResult{Error: parseErr}, parseErr
} }
if limit64 > math.MaxInt32 || limit64 < 0 {
return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
}
limit = int(limit64) limit = int(limit64)
} }
} }
@ -200,7 +204,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
if stmt.Where != nil { if stmt.Where != nil {
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
} }
hybridScanOptions := HybridScanOptions{ hybridScanOptions := HybridScanOptions{
StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons
@ -230,7 +234,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser.
func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlparser.Select, database, tableName string) (*QueryResult, error) { func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlparser.Select, database, tableName string) (*QueryResult, error) {
// Create a sample HybridMessageScanner to simulate both data sources // Create a sample HybridMessageScanner to simulate both data sources
now := time.Now().UnixNano() now := time.Now().UnixNano()
var sampleResults []HybridScanResult var sampleResults []HybridScanResult
switch tableName { switch tableName {
@ -239,9 +243,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
// Live log data (recent) // Live log data (recent)
{ {
Values: map[string]*schema_pb.Value{ Values: map[string]*schema_pb.Value{
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}},
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}},
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}},
}, },
Timestamp: now - 300000000000, // 5 minutes ago Timestamp: now - 300000000000, // 5 minutes ago
Key: []byte("live-1003"), Key: []byte("live-1003"),
@ -249,9 +253,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
}, },
{ {
Values: map[string]*schema_pb.Value{ Values: map[string]*schema_pb.Value{
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}},
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}},
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_click"}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_click"}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"button": "submit", "live": true}`}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"button": "submit", "live": true}`}},
}, },
Timestamp: now - 120000000000, // 2 minutes ago Timestamp: now - 120000000000, // 2 minutes ago
Key: []byte("live-1004"), Key: []byte("live-1004"),
@ -260,9 +264,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
// Archived Parquet data (older) // Archived Parquet data (older)
{ {
Values: map[string]*schema_pb.Value{ Values: map[string]*schema_pb.Value{
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}},
}, },
Timestamp: now - 3600000000000, // 1 hour ago Timestamp: now - 3600000000000, // 1 hour ago
Key: []byte("archived-1001"), Key: []byte("archived-1001"),
@ -270,9 +274,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
}, },
{ {
Values: map[string]*schema_pb.Value{ Values: map[string]*schema_pb.Value{
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
"user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}},
"data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}},
}, },
Timestamp: now - 1800000000000, // 30 minutes ago Timestamp: now - 1800000000000, // 30 minutes ago
Key: []byte("archived-1002"), Key: []byte("archived-1002"),
@ -284,7 +288,7 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
// Live system logs // Live system logs
{ {
Values: map[string]*schema_pb.Value{ Values: map[string]*schema_pb.Value{
"level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}},
"level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}},
"message": {Kind: &schema_pb.Value_StringValue{StringValue: "Live service heartbeat"}}, "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Live service heartbeat"}},
"service": {Kind: &schema_pb.Value_StringValue{StringValue: "api-gateway"}}, "service": {Kind: &schema_pb.Value_StringValue{StringValue: "api-gateway"}},
}, },
@ -295,7 +299,7 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
// Archived system logs // Archived system logs
{ {
Values: map[string]*schema_pb.Value{ Values: map[string]*schema_pb.Value{
"level": {Kind: &schema_pb.Value_StringValue{StringValue: "ERROR"}},
"level": {Kind: &schema_pb.Value_StringValue{StringValue: "ERROR"}},
"message": {Kind: &schema_pb.Value_StringValue{StringValue: "Database connection timeout"}}, "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Database connection timeout"}},
"service": {Kind: &schema_pb.Value_StringValue{StringValue: "user-service"}}, "service": {Kind: &schema_pb.Value_StringValue{StringValue: "user-service"}},
}, },
@ -314,6 +318,11 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa
if stmt.Limit != nil && stmt.Limit.Rowcount != nil { if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
if limitExpr, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok && limitExpr.Type == sqlparser.IntVal { if limitExpr, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok && limitExpr.Type == sqlparser.IntVal {
if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil { if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil {
if limit64 > math.MaxInt32 || limit64 < 0 {
return &QueryResult{
Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64),
}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
}
limit := int(limit64) limit := int(limit64)
if limit > 0 && limit < len(sampleResults) { if limit > 0 && limit < len(sampleResults) {
sampleResults = sampleResults[:limit] sampleResults = sampleResults[:limit]
@ -334,7 +343,7 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu
Rows: [][]sqltypes.Value{}, Rows: [][]sqltypes.Value{},
} }
} }
// Determine columns if not specified // Determine columns if not specified
if len(columns) == 0 { if len(columns) == 0 {
columnSet := make(map[string]bool) columnSet := make(map[string]bool)
@ -343,16 +352,16 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu
columnSet[columnName] = true columnSet[columnName] = true
} }
} }
columns = make([]string, 0, len(columnSet)) columns = make([]string, 0, len(columnSet))
for columnName := range columnSet { for columnName := range columnSet {
columns = append(columns, columnName) columns = append(columns, columnName)
} }
// Add metadata columns showing data source // Add metadata columns showing data source
columns = append(columns, "_source") columns = append(columns, "_source")
} }
// Convert to SQL rows // Convert to SQL rows
rows := make([][]sqltypes.Value, len(results)) rows := make([][]sqltypes.Value, len(results))
for i, result := range results { for i, result := range results {
@ -368,7 +377,7 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu
} }
rows[i] = row rows[i] = row
} }
return &QueryResult{ return &QueryResult{
Columns: columns, Columns: columns,
Rows: rows, Rows: rows,
@ -380,10 +389,10 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu
// Returns (startTimeNs, stopTimeNs) where 0 means unbounded // Returns (startTimeNs, stopTimeNs) where 0 means unbounded
func (e *SQLEngine) extractTimeFilters(expr sqlparser.Expr) (int64, int64) { func (e *SQLEngine) extractTimeFilters(expr sqlparser.Expr) (int64, int64) {
startTimeNs, stopTimeNs := int64(0), int64(0) startTimeNs, stopTimeNs := int64(0), int64(0)
// Recursively extract time filters from expression tree // Recursively extract time filters from expression tree
e.extractTimeFiltersRecursive(expr, &startTimeNs, &stopTimeNs) e.extractTimeFiltersRecursive(expr, &startTimeNs, &stopTimeNs)
return startTimeNs, stopTimeNs return startTimeNs, stopTimeNs
} }
@ -412,10 +421,10 @@ func (e *SQLEngine) extractTimeFromComparison(comp *sqlparser.ComparisonExpr, st
// Check if this is a time-related column comparison // Check if this is a time-related column comparison
leftCol := e.getColumnName(comp.Left) leftCol := e.getColumnName(comp.Left)
rightCol := e.getColumnName(comp.Right) rightCol := e.getColumnName(comp.Right)
var valueExpr sqlparser.Expr var valueExpr sqlparser.Expr
var reversed bool var reversed bool
// Determine which side is the time column // Determine which side is the time column
if e.isTimeColumn(leftCol) { if e.isTimeColumn(leftCol) {
valueExpr = comp.Right valueExpr = comp.Right
@ -427,27 +436,27 @@ func (e *SQLEngine) extractTimeFromComparison(comp *sqlparser.ComparisonExpr, st
// Not a time comparison // Not a time comparison
return return
} }
// Extract the time value // Extract the time value
timeValue := e.extractTimeValue(valueExpr) timeValue := e.extractTimeValue(valueExpr)
if timeValue == 0 { if timeValue == 0 {
// Couldn't parse time value // Couldn't parse time value
return return
} }
// Apply the comparison operator to determine time bounds // Apply the comparison operator to determine time bounds
operator := comp.Operator operator := comp.Operator
if reversed { if reversed {
// Reverse the operator if column and value are swapped // Reverse the operator if column and value are swapped
operator = e.reverseOperator(operator) operator = e.reverseOperator(operator)
} }
switch operator { switch operator {
case sqlparser.GreaterThanStr: // timestamp > value case sqlparser.GreaterThanStr: // timestamp > value
if *startTimeNs == 0 || timeValue > *startTimeNs { if *startTimeNs == 0 || timeValue > *startTimeNs {
*startTimeNs = timeValue *startTimeNs = timeValue
} }
case sqlparser.GreaterEqualStr: // timestamp >= value
case sqlparser.GreaterEqualStr: // timestamp >= value
if *startTimeNs == 0 || timeValue >= *startTimeNs { if *startTimeNs == 0 || timeValue >= *startTimeNs {
*startTimeNs = timeValue *startTimeNs = timeValue
} }
@ -471,25 +480,25 @@ func (e *SQLEngine) isTimeColumn(columnName string) bool {
if columnName == "" { if columnName == "" {
return false return false
} }
// System timestamp columns // System timestamp columns
timeColumns := []string{ timeColumns := []string{
"_timestamp_ns", // SeaweedFS MQ system timestamp (nanoseconds)
"timestamp_ns", // Alternative naming
"timestamp", // Common timestamp field
"created_at", // Common creation time field
"updated_at", // Common update time field
"event_time", // Event timestamp
"log_time", // Log timestamp
"ts", // Short form
}
"_timestamp_ns", // SeaweedFS MQ system timestamp (nanoseconds)
"timestamp_ns", // Alternative naming
"timestamp", // Common timestamp field
"created_at", // Common creation time field
"updated_at", // Common update time field
"event_time", // Event timestamp
"log_time", // Log timestamp
"ts", // Short form
}
for _, timeCol := range timeColumns { for _, timeCol := range timeColumns {
if strings.EqualFold(columnName, timeCol) { if strings.EqualFold(columnName, timeCol) {
return true return true
} }
} }
return false return false
} }
@ -515,29 +524,29 @@ func (e *SQLEngine) extractTimeValue(expr sqlparser.Expr) int64 {
} else if exprType.Type == sqlparser.StrVal { } else if exprType.Type == sqlparser.StrVal {
// Parse as ISO date or other string formats // Parse as ISO date or other string formats
timeStr := string(exprType.Val) timeStr := string(exprType.Val)
// Try parsing as RFC3339 (ISO 8601) // Try parsing as RFC3339 (ISO 8601)
if t, err := time.Parse(time.RFC3339, timeStr); err == nil { if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
return t.UnixNano() return t.UnixNano()
} }
// Try parsing as RFC3339 with nanoseconds // Try parsing as RFC3339 with nanoseconds
if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil { if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil {
return t.UnixNano() return t.UnixNano()
} }
// Try parsing as date only (YYYY-MM-DD) // Try parsing as date only (YYYY-MM-DD)
if t, err := time.Parse("2006-01-02", timeStr); err == nil { if t, err := time.Parse("2006-01-02", timeStr); err == nil {
return t.UnixNano() return t.UnixNano()
} }
// Try parsing as datetime (YYYY-MM-DD HH:MM:SS) // Try parsing as datetime (YYYY-MM-DD HH:MM:SS)
if t, err := time.Parse("2006-01-02 15:04:05", timeStr); err == nil { if t, err := time.Parse("2006-01-02 15:04:05", timeStr); err == nil {
return t.UnixNano() return t.UnixNano()
} }
} }
} }
return 0 // Couldn't parse return 0 // Couldn't parse
} }
@ -690,6 +699,9 @@ func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interf
switch v := fieldValue.Kind.(type) { switch v := fieldValue.Kind.(type) {
case *schema_pb.Value_Int32Value: case *schema_pb.Value_Int32Value:
if intVal, ok := compareValue.(int64); ok { if intVal, ok := compareValue.(int64); ok {
if intVal > math.MaxInt32 || intVal < math.MinInt32 {
return false // Value out of range for int32, cannot be equal
}
return v.Int32Value == int32(intVal) return v.Int32Value == int32(intVal)
} }
case *schema_pb.Value_Int64Value: case *schema_pb.Value_Int64Value:
@ -708,6 +720,12 @@ func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue inte
switch v := fieldValue.Kind.(type) { switch v := fieldValue.Kind.(type) {
case *schema_pb.Value_Int32Value: case *schema_pb.Value_Int32Value:
if intVal, ok := compareValue.(int64); ok { if intVal, ok := compareValue.(int64); ok {
if intVal > math.MaxInt32 {
return true // int32 value is always less than values > MaxInt32
}
if intVal < math.MinInt32 {
return false // int32 value is always greater than values < MinInt32
}
return v.Int32Value < int32(intVal) return v.Int32Value < int32(intVal)
} }
case *schema_pb.Value_Int64Value: case *schema_pb.Value_Int64Value:
@ -722,6 +740,12 @@ func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue i
switch v := fieldValue.Kind.(type) { switch v := fieldValue.Kind.(type) {
case *schema_pb.Value_Int32Value: case *schema_pb.Value_Int32Value:
if intVal, ok := compareValue.(int64); ok { if intVal, ok := compareValue.(int64); ok {
if intVal > math.MaxInt32 {
return false // int32 value is never greater than values > MaxInt32
}
if intVal < math.MinInt32 {
return true // int32 value is always greater than values < MinInt32
}
return v.Int32Value > int32(intVal) return v.Int32Value > int32(intVal)
} }
case *schema_pb.Value_Int64Value: case *schema_pb.Value_Int64Value:
@ -739,24 +763,24 @@ func (e *SQLEngine) valueLike(fieldValue *schema_pb.Value, compareValue interfac
if !ok { if !ok {
return false return false
} }
pattern, ok := compareValue.(string) pattern, ok := compareValue.(string)
if !ok { if !ok {
return false return false
} }
// Convert SQL LIKE pattern to Go regex pattern // Convert SQL LIKE pattern to Go regex pattern
// % matches any sequence of characters (.*), _ matches single character (.) // % matches any sequence of characters (.*), _ matches single character (.)
regexPattern := strings.ReplaceAll(pattern, "%", ".*") regexPattern := strings.ReplaceAll(pattern, "%", ".*")
regexPattern = strings.ReplaceAll(regexPattern, "_", ".") regexPattern = strings.ReplaceAll(regexPattern, "_", ".")
regexPattern = "^" + regexPattern + "$" // Anchor to match entire string regexPattern = "^" + regexPattern + "$" // Anchor to match entire string
// Compile and match regex // Compile and match regex
regex, err := regexp.Compile(regexPattern) regex, err := regexp.Compile(regexPattern)
if err != nil { if err != nil {
return false // Invalid pattern return false // Invalid pattern
} }
return regex.MatchString(stringVal.StringValue) return regex.MatchString(stringVal.StringValue)
} }
@ -768,14 +792,14 @@ func (e *SQLEngine) valueIn(fieldValue *schema_pb.Value, compareValue interface{
if !ok { if !ok {
return false return false
} }
// Check if fieldValue matches any value in the list // Check if fieldValue matches any value in the list
for _, value := range values { for _, value := range values {
if e.valuesEqual(fieldValue, value) { if e.valuesEqual(fieldValue, value) {
return true return true
} }
} }
return false return false
} }

Loading…
Cancel
Save