From 471ba271dcefdce976e474a5e6609bc6b42a1432 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 13:28:38 -0700 Subject: [PATCH] fix reading system fields --- weed/query/engine/engine.go | 91 +++++++++++++++++---- weed/query/engine/hybrid_message_scanner.go | 13 +++ 2 files changed, 88 insertions(+), 16 deletions(-) diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index db93f6c0d..ed3b99df9 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -1008,6 +1008,7 @@ type AggregationSpec struct { Function string // COUNT, SUM, AVG, MIN, MAX Column string // Column name, or "*" for COUNT(*) Alias string // Optional alias for the result column + Distinct bool // Support for DISTINCT keyword } // AggregationResult holds the computed result of an aggregation @@ -1137,11 +1138,26 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations if spec.Column == "*" { // COUNT(*) counts all rows aggResults[i].Count = int64(len(results)) + } else if spec.Distinct { + // COUNT(DISTINCT column) counts unique non-null values + uniqueValues := make(map[string]bool) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if !e.isNullValue(value) { + // Use string representation for uniqueness check + rawValue := e.extractRawValue(value) + if rawValue != nil { + uniqueValues[fmt.Sprintf("%v", rawValue)] = true + } + } + } + } + aggResults[i].Count = int64(len(uniqueValues)) } else { // COUNT(column) counts non-null values count := int64(0) for _, result := range results { - if value := e.findColumnValue(result.Values, spec.Column); value != nil { + if value := e.findColumnValue(result, spec.Column); value != nil { if !e.isNullValue(value) { count++ } @@ -1153,7 +1169,7 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations case "SUM": sum := float64(0) for _, result := range results { - if value := e.findColumnValue(result.Values, spec.Column); value != nil { + if value := e.findColumnValue(result, spec.Column); value != nil { if numValue := e.convertToNumber(value); numValue != nil { sum += *numValue } @@ -1165,7 +1181,7 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations sum := float64(0) count := int64(0) for _, result := range results { - if value := e.findColumnValue(result.Values, spec.Column); value != nil { + if value := e.findColumnValue(result, spec.Column); value != nil { if numValue := e.convertToNumber(value); numValue != nil { sum += *numValue count++ @@ -1179,9 +1195,11 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations case "MIN": var min interface{} + var minValue *schema_pb.Value for _, result := range results { - if value := e.findColumnValue(result.Values, spec.Column); value != nil { - if min == nil || e.compareValues(value, min) < 0 { + if value := e.findColumnValue(result, spec.Column); value != nil { + if minValue == nil || e.compareValues(value, minValue) < 0 { + minValue = value min = e.extractRawValue(value) } } @@ -1190,9 +1208,11 @@ func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations case "MAX": var max interface{} + var maxValue *schema_pb.Value for _, result := range results { - if value := e.findColumnValue(result.Values, spec.Column); value != nil { - if max == nil || e.compareValues(value, max) > 0 { + if value := e.findColumnValue(result, spec.Column); value != nil { + if maxValue == nil || e.compareValues(value, maxValue) > 0 { + maxValue = value max = e.extractRawValue(value) } } @@ -1241,20 +1261,29 @@ func (e *SQLEngine) extractRawValue(value *schema_pb.Value) interface{} { return v.StringValue case *schema_pb.Value_BoolValue: return v.BoolValue + case *schema_pb.Value_BytesValue: + return string(v.BytesValue) // Convert bytes to string for comparison } return nil } -func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) int { +func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 *schema_pb.Value) int { + if value2 == nil { + return 1 // value1 > nil + } raw1 := e.extractRawValue(value1) + raw2 := e.extractRawValue(value2) if raw1 == nil { return -1 } + if raw2 == nil { + return 1 + } // Simple comparison - in a full implementation this would handle type coercion switch v1 := raw1.(type) { case int32: - if v2, ok := value2.(int32); ok { + if v2, ok := raw2.(int32); ok { if v1 < v2 { return -1 } else if v1 > v2 { @@ -1263,7 +1292,16 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i return 0 } case int64: - if v2, ok := value2.(int64); ok { + if v2, ok := raw2.(int64); ok { + if v1 < v2 { + return -1 + } else if v1 > v2 { + return 1 + } + return 0 + } + case float32: + if v2, ok := raw2.(float32); ok { if v1 < v2 { return -1 } else if v1 > v2 { @@ -1272,7 +1310,7 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i return 0 } case float64: - if v2, ok := value2.(float64); ok { + if v2, ok := raw2.(float64); ok { if v1 < v2 { return -1 } else if v1 > v2 { @@ -1281,7 +1319,7 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i return 0 } case string: - if v2, ok := value2.(string); ok { + if v2, ok := raw2.(string); ok { if v1 < v2 { return -1 } else if v1 > v2 { @@ -1289,6 +1327,15 @@ func (e *SQLEngine) compareValues(value1 *schema_pb.Value, value2 interface{}) i } return 0 } + case bool: + if v2, ok := raw2.(bool); ok { + if v1 == v2 { + return 0 + } else if v1 && !v2 { + return 1 + } + return -1 + } } return 0 } @@ -1337,15 +1384,27 @@ func (e *SQLEngine) convertRawValueToSQL(value interface{}) sqltypes.Value { } // findColumnValue performs case-insensitive lookup of column values -func (e *SQLEngine) findColumnValue(values map[string]*schema_pb.Value, columnName string) *schema_pb.Value { +// Now includes support for system columns stored in HybridScanResult +func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value { + // Check system columns first (stored separately in HybridScanResult) + lowerColumnName := strings.ToLower(columnName) + switch lowerColumnName { + case "_timestamp_ns", "timestamp_ns": + return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}} + case "_key", "key": + return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}} + case "_source", "source": + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: result.Source}} + } + + // Then check regular columns in Values map // First try exact match - if value, exists := values[columnName]; exists { + if value, exists := result.Values[columnName]; exists { return value } // Then try case-insensitive match - lowerColumnName := strings.ToLower(columnName) - for key, value := range values { + for key, value := range result.Values { if strings.ToLower(key) == lowerColumnName { return value } diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 3907f333f..be3949a33 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -296,6 +296,19 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil { // This is an archived message from Parquet files + // ✅ FIX: Add system columns from LogEntry to RecordValue + if recordValue.Fields == nil { + recordValue.Fields = make(map[string]*schema_pb.Value) + } + + // Add system columns from LogEntry + recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + return recordValue, "parquet_archive", nil }