diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 42cd2510a..0779cedf3 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -3,6 +3,7 @@ package engine import ( "context" "fmt" + "math" "regexp" "strconv" "strings" @@ -140,7 +141,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. fmt.Printf("Warning: Failed to get filer client: %v, using sample data\n", filerClientErr) } } - + hybridScanner, err := NewHybridMessageScanner(filerClient, database, tableName) if err != nil { // Fallback to sample data if topic doesn't exist or filer unavailable @@ -189,6 +190,9 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. if parseErr != nil { return &QueryResult{Error: parseErr}, parseErr } + if limit64 > math.MaxInt32 || limit64 < 0 { + return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64) + } limit = int(limit64) } } @@ -200,7 +204,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. if stmt.Where != nil { startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) } - + hybridScanOptions := HybridScanOptions{ StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons @@ -230,7 +234,7 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlparser.Select, database, tableName string) (*QueryResult, error) { // Create a sample HybridMessageScanner to simulate both data sources now := time.Now().UnixNano() - + var sampleResults []HybridScanResult switch tableName { @@ -239,9 +243,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa // Live log data (recent) { Values: map[string]*schema_pb.Value{ - "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}}, + "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}}, - "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}}, }, Timestamp: now - 300000000000, // 5 minutes ago Key: []byte("live-1003"), @@ -249,9 +253,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa }, { Values: map[string]*schema_pb.Value{ - "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}}, + "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_click"}}, - "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"button": "submit", "live": true}`}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"button": "submit", "live": true}`}}, }, Timestamp: now - 120000000000, // 2 minutes ago Key: []byte("live-1004"), @@ -260,9 +264,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa // Archived Parquet data (older) { Values: map[string]*schema_pb.Value{ - "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}}, + "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}}, - "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}}, }, Timestamp: now - 3600000000000, // 1 hour ago Key: []byte("archived-1001"), @@ -270,9 +274,9 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa }, { Values: map[string]*schema_pb.Value{ - "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}}, + "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}}, "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}}, - "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}}, }, Timestamp: now - 1800000000000, // 30 minutes ago Key: []byte("archived-1002"), @@ -284,7 +288,7 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa // Live system logs { Values: map[string]*schema_pb.Value{ - "level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}}, + "level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}}, "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Live service heartbeat"}}, "service": {Kind: &schema_pb.Value_StringValue{StringValue: "api-gateway"}}, }, @@ -295,7 +299,7 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa // Archived system logs { Values: map[string]*schema_pb.Value{ - "level": {Kind: &schema_pb.Value_StringValue{StringValue: "ERROR"}}, + "level": {Kind: &schema_pb.Value_StringValue{StringValue: "ERROR"}}, "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Database connection timeout"}}, "service": {Kind: &schema_pb.Value_StringValue{StringValue: "user-service"}}, }, @@ -314,6 +318,11 @@ func (e *SQLEngine) executeSelectWithSampleData(ctx context.Context, stmt *sqlpa if stmt.Limit != nil && stmt.Limit.Rowcount != nil { if limitExpr, ok := stmt.Limit.Rowcount.(*sqlparser.SQLVal); ok && limitExpr.Type == sqlparser.IntVal { if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil { + if limit64 > math.MaxInt32 || limit64 < 0 { + return &QueryResult{ + Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64), + }, fmt.Errorf("LIMIT value %d is out of valid range", limit64) + } limit := int(limit64) if limit > 0 && limit < len(sampleResults) { sampleResults = sampleResults[:limit] @@ -334,7 +343,7 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu Rows: [][]sqltypes.Value{}, } } - + // Determine columns if not specified if len(columns) == 0 { columnSet := make(map[string]bool) @@ -343,16 +352,16 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu columnSet[columnName] = true } } - + columns = make([]string, 0, len(columnSet)) for columnName := range columnSet { columns = append(columns, columnName) } - + // Add metadata columns showing data source columns = append(columns, "_source") } - + // Convert to SQL rows rows := make([][]sqltypes.Value, len(results)) for i, result := range results { @@ -368,7 +377,7 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu } rows[i] = row } - + return &QueryResult{ Columns: columns, Rows: rows, @@ -380,10 +389,10 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu // Returns (startTimeNs, stopTimeNs) where 0 means unbounded func (e *SQLEngine) extractTimeFilters(expr sqlparser.Expr) (int64, int64) { startTimeNs, stopTimeNs := int64(0), int64(0) - + // Recursively extract time filters from expression tree e.extractTimeFiltersRecursive(expr, &startTimeNs, &stopTimeNs) - + return startTimeNs, stopTimeNs } @@ -412,10 +421,10 @@ func (e *SQLEngine) extractTimeFromComparison(comp *sqlparser.ComparisonExpr, st // Check if this is a time-related column comparison leftCol := e.getColumnName(comp.Left) rightCol := e.getColumnName(comp.Right) - + var valueExpr sqlparser.Expr var reversed bool - + // Determine which side is the time column if e.isTimeColumn(leftCol) { valueExpr = comp.Right @@ -427,27 +436,27 @@ func (e *SQLEngine) extractTimeFromComparison(comp *sqlparser.ComparisonExpr, st // Not a time comparison return } - + // Extract the time value timeValue := e.extractTimeValue(valueExpr) if timeValue == 0 { // Couldn't parse time value return } - + // Apply the comparison operator to determine time bounds operator := comp.Operator if reversed { // Reverse the operator if column and value are swapped operator = e.reverseOperator(operator) } - + switch operator { case sqlparser.GreaterThanStr: // timestamp > value if *startTimeNs == 0 || timeValue > *startTimeNs { *startTimeNs = timeValue } - case sqlparser.GreaterEqualStr: // timestamp >= value + case sqlparser.GreaterEqualStr: // timestamp >= value if *startTimeNs == 0 || timeValue >= *startTimeNs { *startTimeNs = timeValue } @@ -471,25 +480,25 @@ func (e *SQLEngine) isTimeColumn(columnName string) bool { if columnName == "" { return false } - + // System timestamp columns timeColumns := []string{ - "_timestamp_ns", // SeaweedFS MQ system timestamp (nanoseconds) - "timestamp_ns", // Alternative naming - "timestamp", // Common timestamp field - "created_at", // Common creation time field - "updated_at", // Common update time field - "event_time", // Event timestamp - "log_time", // Log timestamp - "ts", // Short form - } - + "_timestamp_ns", // SeaweedFS MQ system timestamp (nanoseconds) + "timestamp_ns", // Alternative naming + "timestamp", // Common timestamp field + "created_at", // Common creation time field + "updated_at", // Common update time field + "event_time", // Event timestamp + "log_time", // Log timestamp + "ts", // Short form + } + for _, timeCol := range timeColumns { if strings.EqualFold(columnName, timeCol) { return true } } - + return false } @@ -515,29 +524,29 @@ func (e *SQLEngine) extractTimeValue(expr sqlparser.Expr) int64 { } else if exprType.Type == sqlparser.StrVal { // Parse as ISO date or other string formats timeStr := string(exprType.Val) - + // Try parsing as RFC3339 (ISO 8601) if t, err := time.Parse(time.RFC3339, timeStr); err == nil { return t.UnixNano() } - + // Try parsing as RFC3339 with nanoseconds if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil { return t.UnixNano() } - + // Try parsing as date only (YYYY-MM-DD) if t, err := time.Parse("2006-01-02", timeStr); err == nil { return t.UnixNano() } - + // Try parsing as datetime (YYYY-MM-DD HH:MM:SS) if t, err := time.Parse("2006-01-02 15:04:05", timeStr); err == nil { return t.UnixNano() } } } - + return 0 // Couldn't parse } @@ -690,6 +699,9 @@ func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interf switch v := fieldValue.Kind.(type) { case *schema_pb.Value_Int32Value: if intVal, ok := compareValue.(int64); ok { + if intVal > math.MaxInt32 || intVal < math.MinInt32 { + return false // Value out of range for int32, cannot be equal + } return v.Int32Value == int32(intVal) } case *schema_pb.Value_Int64Value: @@ -708,6 +720,12 @@ func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue inte switch v := fieldValue.Kind.(type) { case *schema_pb.Value_Int32Value: if intVal, ok := compareValue.(int64); ok { + if intVal > math.MaxInt32 { + return true // int32 value is always less than values > MaxInt32 + } + if intVal < math.MinInt32 { + return false // int32 value is always greater than values < MinInt32 + } return v.Int32Value < int32(intVal) } case *schema_pb.Value_Int64Value: @@ -722,6 +740,12 @@ func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue i switch v := fieldValue.Kind.(type) { case *schema_pb.Value_Int32Value: if intVal, ok := compareValue.(int64); ok { + if intVal > math.MaxInt32 { + return false // int32 value is never greater than values > MaxInt32 + } + if intVal < math.MinInt32 { + return true // int32 value is always greater than values < MinInt32 + } return v.Int32Value > int32(intVal) } case *schema_pb.Value_Int64Value: @@ -739,24 +763,24 @@ func (e *SQLEngine) valueLike(fieldValue *schema_pb.Value, compareValue interfac if !ok { return false } - + pattern, ok := compareValue.(string) if !ok { return false } - + // Convert SQL LIKE pattern to Go regex pattern // % matches any sequence of characters (.*), _ matches single character (.) regexPattern := strings.ReplaceAll(pattern, "%", ".*") regexPattern = strings.ReplaceAll(regexPattern, "_", ".") regexPattern = "^" + regexPattern + "$" // Anchor to match entire string - + // Compile and match regex regex, err := regexp.Compile(regexPattern) if err != nil { return false // Invalid pattern } - + return regex.MatchString(stringVal.StringValue) } @@ -768,14 +792,14 @@ func (e *SQLEngine) valueIn(fieldValue *schema_pb.Value, compareValue interface{ if !ok { return false } - + // Check if fieldValue matches any value in the list for _, value := range values { if e.valuesEqual(fieldValue, value) { return true } } - + return false }