diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
index faa139c04..7db7cd649 100644
--- a/weed/query/engine/aggregations.go
+++ b/weed/query/engine/aggregations.go
@@ -105,7 +105,7 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan
 
 		// Count live log files (excluding those converted to parquet)
 		parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
-		liveLogCount, _ := opt.engine.countLiveLogRowsExcludingParquetSources(partitionPath, parquetSources)
+		liveLogCount, _ := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
 		dataSources.LiveLogRowCount += liveLogCount
 	}
 
@@ -352,13 +352,17 @@ func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *
 	if stmt.Where == nil { // Only optimize when no complex WHERE clause
 		fastResult, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
 		if canOptimize {
-			fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n")
+			if isDebugMode(ctx) {
+				fmt.Printf("Using fast hybrid statistics for aggregation (parquet stats + live log counts)\n")
+			}
 			return fastResult, nil
 		}
 	}
 
 	// SLOW PATH: Fall back to full table scan
-	fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
+	if isDebugMode(ctx) {
+		fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
+	}
 
 	// Build scan options for full table scan (aggregations need all data)
 	hybridScanOptions := HybridScanOptions{
@@ -426,8 +430,8 @@ func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner
 		partitions[i] = fmt.Sprintf("%s/%s", topicBasePath, relPartition)
 	}
 
-	// Debug: Show the hybrid optimization results
-	if dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 {
+	// Debug: Show the hybrid optimization results (only in explain mode)
+	if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0) {
 		partitionsWithLiveLogs := 0
 		if dataSources.LiveLogRowCount > 0 {
 			partitionsWithLiveLogs = 1 // Simplified for now
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 8be8e2b2d..852cffc46 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -21,6 +21,20 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
+// debugModeKey is used to store the debug mode flag in context
+type debugModeKey struct{}
+
+// isDebugMode checks if we're in debug/explain mode
+func isDebugMode(ctx context.Context) bool {
+	debug, ok := ctx.Value(debugModeKey{}).(bool)
+	return ok && debug
+}
+
+// withDebugMode returns a context with debug mode enabled
+func withDebugMode(ctx context.Context) context.Context {
+	return context.WithValue(ctx, debugModeKey{}, true)
+}
+
 // SQLEngine provides SQL query execution capabilities for SeaweedFS
 // Assumptions:
 // 1. MQ namespaces map directly to SQL databases
@@ -111,6 +125,9 @@ func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, e
 
 // executeExplain handles EXPLAIN statements by executing the query with plan tracking
 func (e *SQLEngine) executeExplain(ctx context.Context, actualSQL string, startTime time.Time) (*QueryResult, error) {
+	// Enable debug mode for EXPLAIN queries
+	ctx = withDebugMode(ctx)
+
 	// Parse the actual SQL statement
 	stmt, err := sqlparser.Parse(actualSQL)
 	if err != nil {
@@ -470,14 +487,14 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq
 
 			if canUseFastPath {
 				// Fast path: minimal scanning (only live logs that weren't converted)
-				if actualScanCount, countErr := e.getActualRowsScannedForFastPath("test", tableName); countErr == nil {
+				if actualScanCount, countErr := e.getActualRowsScannedForFastPath(ctx, "test", tableName); countErr == nil {
					plan.TotalRowsProcessed = actualScanCount
 				} else {
 					plan.TotalRowsProcessed = 0 // Parquet stats only, no scanning
 				}
 			} else {
 				// Full scan: count all rows
-				if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil {
+				if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, "test", tableName); countErr == nil {
 					plan.TotalRowsProcessed = actualRowCount
 				} else {
 					plan.TotalRowsProcessed = int64(len(result.Rows))
@@ -486,7 +503,7 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *sq
 			}
 		} else {
 			// With WHERE clause: full scan required
-			if actualRowCount, countErr := e.getTopicTotalRowCount("test", tableName); countErr == nil {
+			if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, "test", tableName); countErr == nil {
 				plan.TotalRowsProcessed = actualRowCount
 			} else {
 				plan.TotalRowsProcessed = int64(len(result.Rows))
@@ -1881,18 +1898,8 @@ func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*
 		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
 	}
 
-	// Add user data fields
-	for fieldName, jsonValue := range jsonData {
-		if fieldName == SW_COLUMN_NAME_TS || fieldName == SW_COLUMN_NAME_KEY {
-			continue // Skip system fields in user data
-		}
-
-		// Convert JSON value to schema value (basic conversion)
-		schemaValue := e.convertJSONValueToSchemaValue(jsonValue)
-		if schemaValue != nil {
-			recordValue.Fields[fieldName] = schemaValue
-		}
-	}
+	// User data fields are already present in the protobuf-deserialized recordValue
+	// No additional processing needed since proto.Unmarshal already populated the Fields map
 
 	return recordValue, "live_log", nil
 }
@@ -1986,7 +1993,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map
 }
 
 // countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet
-func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
+func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
 	filerClient, err := e.catalog.brokerClient.GetFilerClient()
 	if err != nil {
 		return 0, err
 	}
@@ -2000,8 +2007,8 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(partitionPath string
 		actualSourceFiles = parquetSourceFiles
 	}
 
-	// Debug: Show deduplication status
-	if len(actualSourceFiles) > 0 {
+	// Debug: Show deduplication status (only in explain mode)
+	if isDebugMode(ctx) && len(actualSourceFiles) > 0 {
 		fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
 	}
@@ -2013,7 +2020,9 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(partitionPath string
 
 		// Skip files that have been converted to parquet
 		if actualSourceFiles[entry.Name] {
-			fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
+			if isDebugMode(ctx) {
+				fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
+			}
 			return nil
 		}
 
@@ -2164,7 +2173,7 @@ func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]stri
 }
 
 // getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs)
-func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, error) {
+func (e *SQLEngine) getTopicTotalRowCount(ctx context.Context, namespace, topicName string) (int64, error) {
 	// Create a hybrid scanner to access parquet statistics
 	var filerClient filer_pb.FilerClient
 	if e.catalog.brokerClient != nil {
@@ -2211,7 +2220,7 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e
 			parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
 		}
 
-		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles)
+		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles)
 		if liveLogErr == nil {
 			totalRowCount += liveLogCount
 		}
@@ -2222,7 +2231,7 @@ func (e *SQLEngine) getTopicTotalRowCount(namespace, topicName string) (int64, e
 
 // getActualRowsScannedForFastPath returns only the rows that need to be scanned for fast path aggregations
 // (i.e., live log rows that haven't been converted to parquet - parquet uses metadata only)
-func (e *SQLEngine) getActualRowsScannedForFastPath(namespace, topicName string) (int64, error) {
+func (e *SQLEngine) getActualRowsScannedForFastPath(ctx context.Context, namespace, topicName string) (int64, error) {
 	// Create a hybrid scanner to access parquet statistics
 	var filerClient filer_pb.FilerClient
 	if e.catalog.brokerClient != nil {
@@ -2264,7 +2273,7 @@ func (e *SQLEngine) getActualRowsScannedForFastPath(namespace, topicName string)
 		}
 
 		// Count only live log rows that haven't been converted to parquet
-		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(partition, parquetSourceFiles)
+		liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles)
 		if liveLogErr == nil {
 			totalScannedRows += liveLogCount
 		}
@@ -2327,6 +2336,6 @@ func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tabl
 		return fmt.Errorf("failed to register discovered topic %s.%s: %v", database, tableName, err)
 	}
 
-	fmt.Printf("Auto-discovered and registered topic: %s.%s\n", database, tableName)
+	// Note: This is a discovery operation, not query execution; skip stdout logging here
 	return nil
 }
diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go
index c2f40ca5a..00084b6f6 100644
--- a/weed/query/engine/engine_test.go
+++ b/weed/query/engine/engine_test.go
@@ -6,10 +6,12 @@ import (
 	"testing"
 
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/xwb1989/sqlparser"
+	"google.golang.org/protobuf/proto"
 )
 
 // Mock implementations for testing
@@ -61,7 +63,7 @@ func (m *MockSQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats)
 	return map[string]bool{"converted-log-1": true}
 }
 
-func (m *MockSQLEngine) countLiveLogRowsExcludingParquetSources(partition string, parquetSources map[string]bool) (int64, error) {
+func (m *MockSQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partition string, parquetSources map[string]bool) (int64, error) {
 	if count, exists := m.mockLiveLogRowCounts[partition]; exists {
 		return count, nil
 	}
@@ -904,3 +906,233 @@ func BenchmarkAggregationComputer_ComputeFastPathAggregations(b *testing.B) {
 		_ = results
 	}
 }
+
+// Tests for convertLogEntryToRecordValue - Protocol Buffer parsing bug fix
+func TestSQLEngine_ConvertLogEntryToRecordValue_ValidProtobuf(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create a valid RecordValue protobuf with user data
+	originalRecord := &schema_pb.RecordValue{
+		Fields: map[string]*schema_pb.Value{
+			"id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 42}},
+			"name":  {Kind: &schema_pb.Value_StringValue{StringValue: "test-user"}},
+			"score": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 95.5}},
+		},
+	}
+
+	// Serialize the protobuf (this is what MQ actually stores)
+	protobufData, err := proto.Marshal(originalRecord)
+	assert.NoError(t, err)
+
+	// Create a LogEntry with the serialized data
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             1609459200000000000, // 2021-01-01 00:00:00 UTC
+		PartitionKeyHash: 123,
+		Data:             protobufData, // Protocol buffer data (not JSON!)
+		Key:              []byte("test-key-001"),
+	}
+
+	// Test the conversion
+	result, source, err := engine.convertLogEntryToRecordValue(logEntry)
+
+	// Verify no error
+	assert.NoError(t, err)
+	assert.Equal(t, "live_log", source)
+	assert.NotNil(t, result)
+	assert.NotNil(t, result.Fields)
+
+	// Verify system columns are added correctly
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS)
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
+	assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value())
+	assert.Equal(t, []byte("test-key-001"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
+
+	// Verify user data is preserved
+	assert.Contains(t, result.Fields, "id")
+	assert.Contains(t, result.Fields, "name")
+	assert.Contains(t, result.Fields, "score")
+	assert.Equal(t, int32(42), result.Fields["id"].GetInt32Value())
+	assert.Equal(t, "test-user", result.Fields["name"].GetStringValue())
+	assert.Equal(t, 95.5, result.Fields["score"].GetDoubleValue())
+}
+
+func TestSQLEngine_ConvertLogEntryToRecordValue_InvalidProtobuf(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create LogEntry with invalid protobuf data (this would cause the original JSON parsing bug)
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             1609459200000000000,
+		PartitionKeyHash: 123,
+		Data:             []byte{0x17, 0x00, 0xFF, 0xFE}, // Invalid protobuf data (starts with \x17 like in the original error)
+		Key:              []byte("test-key"),
+	}
+
+	// Test the conversion
+	result, source, err := engine.convertLogEntryToRecordValue(logEntry)
+
+	// Should return error for invalid protobuf
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to unmarshal log entry protobuf")
+	assert.Nil(t, result)
+	assert.Empty(t, source)
+}
+
+func TestSQLEngine_ConvertLogEntryToRecordValue_EmptyProtobuf(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create a minimal valid RecordValue (empty fields)
+	emptyRecord := &schema_pb.RecordValue{
+		Fields: map[string]*schema_pb.Value{},
+	}
+	protobufData, err := proto.Marshal(emptyRecord)
+	assert.NoError(t, err)
+
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             1609459200000000000,
+		PartitionKeyHash: 456,
+		Data:             protobufData,
+		Key:              []byte("empty-key"),
+	}
+
+	// Test the conversion
+	result, source, err := engine.convertLogEntryToRecordValue(logEntry)
+
+	// Should succeed and add system columns
+	assert.NoError(t, err)
+	assert.Equal(t, "live_log", source)
+	assert.NotNil(t, result)
+	assert.NotNil(t, result.Fields)
+
+	// Should have system columns
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS)
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
+	assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value())
+	assert.Equal(t, []byte("empty-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
+
+	// Should have no user fields
+	userFieldCount := 0
+	for fieldName := range result.Fields {
+		if fieldName != SW_COLUMN_NAME_TS && fieldName != SW_COLUMN_NAME_KEY {
+			userFieldCount++
+		}
+	}
+	assert.Equal(t, 0, userFieldCount)
+}
+
+func TestSQLEngine_ConvertLogEntryToRecordValue_NilFieldsMap(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create RecordValue with nil Fields map (edge case)
+	recordWithNilFields := &schema_pb.RecordValue{
+		Fields: nil, // This should be handled gracefully
+	}
+	protobufData, err := proto.Marshal(recordWithNilFields)
+	assert.NoError(t, err)
+
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             1609459200000000000,
+		PartitionKeyHash: 789,
+		Data:             protobufData,
+		Key:              []byte("nil-fields-key"),
+	}
+
+	// Test the conversion
+	result, source, err := engine.convertLogEntryToRecordValue(logEntry)
+
+	// Should succeed and create Fields map
+	assert.NoError(t, err)
+	assert.Equal(t, "live_log", source)
+	assert.NotNil(t, result)
+	assert.NotNil(t, result.Fields) // Should be created by the function
+
+	// Should have system columns
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS)
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
+	assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value())
+	assert.Equal(t, []byte("nil-fields-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
+}
+
+func TestSQLEngine_ConvertLogEntryToRecordValue_SystemColumnOverride(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Create RecordValue that already has system column names (should be overridden)
+	recordWithSystemCols := &schema_pb.RecordValue{
+		Fields: map[string]*schema_pb.Value{
+			"user_field":       {Kind: &schema_pb.Value_StringValue{StringValue: "user-data"}},
+			SW_COLUMN_NAME_TS:  {Kind: &schema_pb.Value_Int64Value{Int64Value: 999999999}},   // Should be overridden
+			SW_COLUMN_NAME_KEY: {Kind: &schema_pb.Value_StringValue{StringValue: "old-key"}}, // Should be overridden
+		},
+	}
+	protobufData, err := proto.Marshal(recordWithSystemCols)
+	assert.NoError(t, err)
+
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             1609459200000000000,
+		PartitionKeyHash: 100,
+		Data:             protobufData,
+		Key:              []byte("actual-key"),
+	}
+
+	// Test the conversion
+	result, source, err := engine.convertLogEntryToRecordValue(logEntry)
+
+	// Should succeed
+	assert.NoError(t, err)
+	assert.Equal(t, "live_log", source)
+	assert.NotNil(t, result)
+
+	// System columns should use LogEntry values, not protobuf values
+	assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value())
+	assert.Equal(t, []byte("actual-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
+
+	// User field should be preserved
+	assert.Contains(t, result.Fields, "user_field")
+	assert.Equal(t, "user-data", result.Fields["user_field"].GetStringValue())
+}
+
+func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
+	engine := NewTestSQLEngine()
+
+	// Test with various data types
+	complexRecord := &schema_pb.RecordValue{
+		Fields: map[string]*schema_pb.Value{
+			"int32_field":  {Kind: &schema_pb.Value_Int32Value{Int32Value: -42}},
+			"int64_field":  {Kind: &schema_pb.Value_Int64Value{Int64Value: 9223372036854775807}},
+			"float_field":  {Kind: &schema_pb.Value_FloatValue{FloatValue: 3.14159}},
+			"double_field": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 2.718281828}},
+			"bool_field":   {Kind: &schema_pb.Value_BoolValue{BoolValue: true}},
+			"string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode 🎉"}},
+			"bytes_field":  {Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{0x01, 0x02, 0x03}}},
+		},
+	}
+	protobufData, err := proto.Marshal(complexRecord)
+	assert.NoError(t, err)
+
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             1609459200000000000,
+		PartitionKeyHash: 200,
+		Data:             protobufData,
+		Key:              []byte("complex-key"),
+	}
+
+	// Test the conversion
+	result, source, err := engine.convertLogEntryToRecordValue(logEntry)
+
+	// Should succeed
+	assert.NoError(t, err)
+	assert.Equal(t, "live_log", source)
+	assert.NotNil(t, result)
+
+	// Verify all data types are preserved
+	assert.Equal(t, int32(-42), result.Fields["int32_field"].GetInt32Value())
+	assert.Equal(t, int64(9223372036854775807), result.Fields["int64_field"].GetInt64Value())
+	assert.Equal(t, float32(3.14159), result.Fields["float_field"].GetFloatValue())
+	assert.Equal(t, 2.718281828, result.Fields["double_field"].GetDoubleValue())
+	assert.Equal(t, true, result.Fields["bool_field"].GetBoolValue())
+	assert.Equal(t, "test string with unicode 🎉", result.Fields["string_field"].GetStringValue())
+	assert.Equal(t, []byte{0x01, 0x02, 0x03}, result.Fields["bytes_field"].GetBytesValue())
+
+	// System columns should still be present
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS)
+	assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
+}