From 699e2f441308e06da7a97052fa44e00a261b213c Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 3 Sep 2025 07:29:03 -0700 Subject: [PATCH] feat: Add logical type support to SQL query engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extended SQL engine to handle new Parquet logical types: - Added TimestampValue comparison support (microsecond precision) - Added DateValue comparison support (days since epoch) - Added DecimalValue comparison support with string conversion - Added TimeValue comparison support (microseconds since midnight) - Enhanced valuesEqual(), valueLessThan(), valueGreaterThan() functions - Added decimalToString() helper for precise decimal-to-string conversion - Imported math/big for arbitrary precision decimal handling The SQL engine can now: - ✅ Compare TIMESTAMP values for filtering (e.g., WHERE timestamp > 1672531200000000000) - ✅ Compare DATE values for date-based queries (e.g., WHERE birth_date >= 12345) - ✅ Compare DECIMAL values for precise financial calculations - ✅ Compare TIME values for time-of-day filtering Next: Add YEAR(), MONTH(), DAY() extraction functions for date analytics. --- test/postgres/producer.go | 24 ++++----- weed/query/engine/engine.go | 100 ++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 12 deletions(-) diff --git a/test/postgres/producer.go b/test/postgres/producer.go index c8e210d89..20a72993f 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -31,7 +31,7 @@ type UserEvent struct { Status string `json:"status"` Amount float64 `json:"amount,omitempty"` PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL - BirthDate time.Time `json:"birth_date"` // Will be converted to DATE + BirthDate time.Time `json:"birth_date"` // Will be converted to DATE Timestamp time.Time `json:"timestamp"` Metadata string `json:"metadata,omitempty"` } @@ -199,16 +199,16 @@ func convertToDecimal(value string) ([]byte, int32, int32) { if _, success := rat.SetString(value); !success { return nil, 0, 0 } - + // Convert to a fixed scale (e.g., 4 decimal places) scale := int32(4) precision := int32(18) // Total digits - + // Scale the rational number to integer representation multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) scaled := new(big.Int).Mul(rat.Num(), multiplier) scaled.Div(scaled, rat.Denom()) - + return scaled.Bytes(), precision, scale } @@ -224,7 +224,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}} fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}} fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}} - + // Convert precise amount to DECIMAL logical type if v.PreciseAmount != "" { if decimal, precision, scale := convertToDecimal(v.PreciseAmount); decimal != nil { @@ -235,15 +235,15 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { }}} } } - + // Convert birth date to DATE logical type fields["birth_date"] = &schema_pb.Value{Kind: &schema_pb.Value_DateValue{DateValue: &schema_pb.DateValue{ DaysSinceEpoch: int32(v.BirthDate.Unix() / 86400), // Convert to days since epoch }}} - + fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), - IsUtc: true, + IsUtc: true, }}} fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}} @@ -255,7 +255,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), - IsUtc: true, + IsUtc: true, }}} case MetricEntry: @@ -265,7 +265,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), - IsUtc: true, + IsUtc: true, }}} case ProductView: @@ -277,7 +277,7 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}} fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{ TimestampMicros: v.Timestamp.UnixMicro(), - IsUtc: true, + IsUtc: true, }}} default: @@ -441,7 +441,7 @@ func generateUserEvent() interface{} { birthMonth := 1 + rand.Intn(12) birthDay := 1 + rand.Intn(28) // Keep it simple, avoid month-specific day issues birthDate := time.Date(birthYear, time.Month(birthMonth), birthDay, 0, 0, 0, 0, time.UTC) - + // Generate a precise amount as a string with 4 decimal places preciseAmount := fmt.Sprintf("%.4f", rand.Float64()*10000) diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 1609dac87..6c1ac0d4c 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "math" + "math/big" "regexp" "strconv" "strings" @@ -1929,6 +1930,38 @@ func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interf return false } + // Handle logical type comparisons + if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok { + if timestampVal, ok := compareValue.(int64); ok { + return timestampField.TimestampValue.TimestampMicros == timestampVal + } + return false + } + + if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok { + if dateVal, ok := compareValue.(int32); ok { + return dateField.DateValue.DaysSinceEpoch == dateVal + } + return false + } + + // Handle DecimalValue comparison (convert to string for comparison) + if decimalField, ok := fieldValue.Kind.(*schema_pb.Value_DecimalValue); ok { + if decimalStr, ok := compareValue.(string); ok { + // Convert decimal bytes back to string for comparison + decimalValue := e.decimalToString(decimalField.DecimalValue) + return decimalValue == decimalStr + } + return false + } + + if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok { + if timeVal, ok := compareValue.(int64); ok { + return timeField.TimeValue.TimeMicros == timeVal + } + return false + } + // Handle numeric comparisons with type coercion fieldNum := e.convertToNumber(fieldValue) compareNum := e.convertCompareValueToNumber(compareValue) @@ -1966,6 +1999,29 @@ func (e *SQLEngine) convertCompareValueToNumber(compareValue interface{}) *float return nil } +// decimalToString converts a DecimalValue back to string representation +func (e *SQLEngine) decimalToString(decimalValue *schema_pb.DecimalValue) string { + if decimalValue == nil || decimalValue.Value == nil { + return "0" + } + + // Convert bytes back to big.Int + intValue := new(big.Int).SetBytes(decimalValue.Value) + + // Convert to string with proper decimal placement + str := intValue.String() + + // Handle decimal placement based on scale + scale := int(decimalValue.Scale) + if scale > 0 && len(str) > scale { + // Insert decimal point + decimalPos := len(str) - scale + return str[:decimalPos] + "." + str[decimalPos:] + } + + return str +} + func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue interface{}) bool { // Handle string comparisons lexicographically if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok { @@ -1975,6 +2031,28 @@ func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue inte return false } + // Handle logical type comparisons + if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok { + if timestampVal, ok := compareValue.(int64); ok { + return timestampField.TimestampValue.TimestampMicros < timestampVal + } + return false + } + + if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok { + if dateVal, ok := compareValue.(int32); ok { + return dateField.DateValue.DaysSinceEpoch < dateVal + } + return false + } + + if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok { + if timeVal, ok := compareValue.(int64); ok { + return timeField.TimeValue.TimeMicros < timeVal + } + return false + } + // Handle numeric comparisons with type coercion fieldNum := e.convertToNumber(fieldValue) compareNum := e.convertCompareValueToNumber(compareValue) @@ -1995,6 +2073,28 @@ func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue i return false } + // Handle logical type comparisons + if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok { + if timestampVal, ok := compareValue.(int64); ok { + return timestampField.TimestampValue.TimestampMicros > timestampVal + } + return false + } + + if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok { + if dateVal, ok := compareValue.(int32); ok { + return dateField.DateValue.DaysSinceEpoch > dateVal + } + return false + } + + if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok { + if timeVal, ok := compareValue.(int64); ok { + return timeField.TimeValue.TimeMicros > timeVal + } + return false + } + // Handle numeric comparisons with type coercion fieldNum := e.convertToNumber(fieldValue) compareNum := e.convertCompareValueToNumber(compareValue)