diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go
index 513975087..580833c00 100644
--- a/weed/query/engine/catalog.go
+++ b/weed/query/engine/catalog.go
@@ -288,55 +288,3 @@ func (c *SchemaCatalog) GetDefaultPartitionCount() int32 {
 	defer c.mu.RUnlock()
 	return c.defaultPartitionCount
 }
-
-// initSampleData populates the catalog with sample schema data for testing
-func (c *SchemaCatalog) initSampleData() {
-	// Create sample databases and tables
-	c.databases["default"] = &DatabaseInfo{
-		Name: "default",
-		Tables: map[string]*TableInfo{
-			"user_events": {
-				Name: "user_events",
-				Columns: []ColumnInfo{
-					{Name: "user_id", Type: "VARCHAR(100)", Nullable: true},
-					{Name: "event_type", Type: "VARCHAR(50)", Nullable: true},
-					{Name: "data", Type: "TEXT", Nullable: true},
-					// System columns - hidden by default in SELECT *
-					{Name: "_timestamp_ns", Type: "BIGINT", Nullable: false},
-					{Name: "_key", Type: "VARCHAR(255)", Nullable: true},
-					{Name: "_source", Type: "VARCHAR(50)", Nullable: false},
-				},
-			},
-			"system_logs": {
-				Name: "system_logs",
-				Columns: []ColumnInfo{
-					{Name: "level", Type: "VARCHAR(10)", Nullable: true},
-					{Name: "message", Type: "TEXT", Nullable: true},
-					{Name: "service", Type: "VARCHAR(50)", Nullable: true},
-					// System columns
-					{Name: "_timestamp_ns", Type: "BIGINT", Nullable: false},
-					{Name: "_key", Type: "VARCHAR(255)", Nullable: true},
-					{Name: "_source", Type: "VARCHAR(50)", Nullable: false},
-				},
-			},
-		},
-	}
-
-	c.databases["test"] = &DatabaseInfo{
-		Name: "test",
-		Tables: map[string]*TableInfo{
-			"test-topic": {
-				Name: "test-topic",
-				Columns: []ColumnInfo{
-					{Name: "id", Type: "INT", Nullable: true},
-					{Name: "name", Type: "VARCHAR(100)", Nullable: true},
-					{Name: "value", Type: "DOUBLE", Nullable: true},
-					// System columns
-					{Name: "_timestamp_ns", Type: "BIGINT", Nullable: false},
-					{Name: "_key", Type: "VARCHAR(255)", Nullable: true},
-					{Name: "_source", Type: "VARCHAR(50)", Nullable: false},
-				},
-			},
-		},
-	}
-}
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 472306652..a1a266966 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -1554,8 +1554,14 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStat
 	// Convert to SQL result format
 	if selectAll {
-		columns = nil // Let converter determine all columns
-		return hybridScanner.ConvertToSQLResult(results, columns), nil
+		if len(columns) > 0 {
+			// SELECT *, specific_columns - include both auto-discovered and explicit columns
+			return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
+		} else {
+			// SELECT * only - let converter determine all columns (excludes system columns)
+			columns = nil
+			return hybridScanner.ConvertToSQLResult(results, columns), nil
+		}
 	}
 
 	// Handle custom column expressions (including arithmetic)
@@ -1782,8 +1788,14 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 	// Convert to SQL result format
 	if selectAll {
-		columns = nil // Let converter determine all columns
-		return hybridScanner.ConvertToSQLResult(results, columns), nil
+		if len(columns) > 0 {
+			// SELECT *, specific_columns - include both auto-discovered and explicit columns
+			return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
+		} else {
+			// SELECT * only - let converter determine all columns (excludes system columns)
+			columns = nil
+			return hybridScanner.ConvertToSQLResult(results, columns), nil
+		}
 	}
 
 	// Handle custom column expressions (including arithmetic)
@@ -1881,33 +1893,75 @@ func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs,
 	}
 }
 
-// isTimeColumn checks if a column name refers to a timestamp field
+// isTimeColumn checks if a column refers to a timestamp field based on actual type information
+// This function uses schema metadata, not naming conventions
 func (e *SQLEngine) isTimeColumn(columnName string) bool {
 	if columnName == "" {
 		return false
 	}
 
-	// System timestamp columns
-	timeColumns := []string{
-		"_timestamp_ns", // SeaweedFS MQ system timestamp (nanoseconds)
-		"timestamp_ns",  // Alternative naming
-		"timestamp",     // Common timestamp field
-		"created_at",    // Common creation time field
-		"updated_at",    // Common update time field
-		"event_time",    // Event timestamp
-		"log_time",      // Log timestamp
-		"ts",            // Short form
+	// System timestamp columns are always time columns
+	if columnName == SW_COLUMN_NAME_TIMESTAMP {
+		return true
 	}
 
-	for _, timeCol := range timeColumns {
-		if strings.EqualFold(columnName, timeCol) {
-			return true
+	// For user-defined columns, check actual schema type information
+	if e.catalog != nil {
+		currentDB := e.catalog.GetCurrentDatabase()
+		if currentDB == "" {
+			currentDB = "default"
+		}
+
+		// Get current table context from query execution
+		// Note: This is a limitation - we need table context here
+		// In a full implementation, this would be passed from the query context
+		tableInfo, err := e.getCurrentTableInfo(currentDB)
+		if err == nil && tableInfo != nil {
+			for _, col := range tableInfo.Columns {
+				if strings.EqualFold(col.Name, columnName) {
+					// Use actual SQL type to determine if this is a timestamp
+					return e.isSQLTypeTimestamp(col.Type)
+				}
+			}
 		}
 	}
 
+	// Only return true if we have explicit type information
+	// No guessing based on column names
 	return false
 }
 
+// isSQLTypeTimestamp checks if a SQL type string represents a timestamp type
+func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool {
+	upperType := strings.ToUpper(strings.TrimSpace(sqlType))
+
+	// Handle types with precision/length specifications
+	if idx := strings.Index(upperType, "("); idx != -1 {
+		upperType = upperType[:idx]
+	}
+
+	switch upperType {
+	case "TIMESTAMP", "DATETIME":
+		return true
+	case "BIGINT":
+		// BIGINT could be a timestamp if it follows the pattern for timestamp storage
+		// This is a heuristic - in a better system, we'd have semantic type information
+		return false // Conservative approach - require explicit TIMESTAMP type
+	default:
+		return false
+	}
+}
+
+// getCurrentTableInfo attempts to get table info for the current query context
+// This is a simplified implementation - ideally table context would be passed explicitly
+func (e *SQLEngine) getCurrentTableInfo(database string) (*TableInfo, error) {
+	// This is a limitation of the current architecture
+	// In practice, we'd need the table context from the current query
+	// For now, return an error so isTimeColumn falls back to its conservative
+	// default of matching no user-defined columns (no name-based guessing)
+	// TODO: Enhance architecture to pass table context through query execution
+	return nil, fmt.Errorf("table context not available in current architecture")
+}
+
 // getColumnName extracts column name from expression (handles ColName types)
 func (e *SQLEngine) getColumnName(expr ExprNode) string {
 	switch exprType := expr.(type) {
@@ -2757,11 +2811,11 @@ func (e *SQLEngine) computeFileMinMax(filerClient filer_pb.FilerClient, filePath
 	if e.isSystemColumn(columnName) {
 		// Handle system columns
 		switch strings.ToLower(columnName) {
-		case "_timestamp_ns", "timestamp_ns":
+		case SW_COLUMN_NAME_TIMESTAMP:
 			columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}}
-		case "_key", "key":
+		case SW_COLUMN_NAME_KEY:
 			columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}}
-		case "_source", "source":
+		case SW_COLUMN_NAME_SOURCE:
 			columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}}
 		}
 	} else {
"timestamp_ns": + case SW_COLUMN_NAME_TIMESTAMP: columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}} - case "_key", "key": + case SW_COLUMN_NAME_KEY: columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}} - case "_source", "source": + case SW_COLUMN_NAME_SOURCE: columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}} } } else { @@ -2894,7 +2948,7 @@ func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (* } // Add system columns - recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{ + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, } recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ @@ -3378,11 +3432,11 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) // Check system columns first (stored separately in HybridScanResult) lowerColumnName := strings.ToLower(columnName) switch lowerColumnName { - case "_timestamp_ns", "timestamp_ns": + case SW_COLUMN_NAME_TIMESTAMP: return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}} - case "_key", "key": + case SW_COLUMN_NAME_KEY: return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}} - case "_source", "source": + case SW_COLUMN_NAME_SOURCE: return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: result.Source}} } diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go index 19effbbd5..0d2bccc6b 100644 --- a/weed/query/engine/engine_test.go +++ b/weed/query/engine/engine_test.go @@ -942,9 +942,9 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ValidProtobuf(t *testing.T) { assert.NotNil(t, result.Fields) // Verify system columns are added correctly - assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS) + assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP) assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY) - assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value()) + assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()) assert.Equal(t, []byte("test-key-001"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()) // Verify user data is preserved @@ -1004,15 +1004,15 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_EmptyProtobuf(t *testing.T) { assert.NotNil(t, result.Fields) // Should have system columns - assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS) + assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP) assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY) - assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value()) + assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()) assert.Equal(t, []byte("empty-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()) // Should have no user fields userFieldCount := 0 for fieldName := range result.Fields { - if fieldName != SW_COLUMN_NAME_TS && fieldName != SW_COLUMN_NAME_KEY { + if fieldName != SW_COLUMN_NAME_TIMESTAMP && fieldName != SW_COLUMN_NAME_KEY { userFieldCount++ } } @@ -1046,9 +1046,9 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_NilFieldsMap(t *testing.T) { assert.NotNil(t, result.Fields) // Should be created by the function // Should have system columns - assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS) + assert.Contains(t, result.Fields, 
SW_COLUMN_NAME_TIMESTAMP) assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY) - assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value()) + assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()) assert.Equal(t, []byte("nil-fields-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()) } @@ -1058,9 +1058,9 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_SystemColumnOverride(t *testing. // Create RecordValue that already has system column names (should be overridden) recordWithSystemCols := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "user_field": {Kind: &schema_pb.Value_StringValue{StringValue: "user-data"}}, - SW_COLUMN_NAME_TS: {Kind: &schema_pb.Value_Int64Value{Int64Value: 999999999}}, // Should be overridden - SW_COLUMN_NAME_KEY: {Kind: &schema_pb.Value_StringValue{StringValue: "old-key"}}, // Should be overridden + "user_field": {Kind: &schema_pb.Value_StringValue{StringValue: "user-data"}}, + SW_COLUMN_NAME_TIMESTAMP: {Kind: &schema_pb.Value_Int64Value{Int64Value: 999999999}}, // Should be overridden + SW_COLUMN_NAME_KEY: {Kind: &schema_pb.Value_StringValue{StringValue: "old-key"}}, // Should be overridden }, } protobufData, err := proto.Marshal(recordWithSystemCols) @@ -1082,7 +1082,7 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_SystemColumnOverride(t *testing. assert.NotNil(t, result) // System columns should use LogEntry values, not protobuf values - assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TS].GetInt64Value()) + assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()) assert.Equal(t, []byte("actual-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()) // User field should be preserved @@ -1133,7 +1133,7 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) { assert.Equal(t, []byte{0x01, 0x02, 0x03}, result.Fields["bytes_field"].GetBytesValue()) // System columns should still be present - assert.Contains(t, result.Fields, SW_COLUMN_NAME_TS) + assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP) assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY) } diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index a0c46cf3b..84dcf0730 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -74,7 +74,7 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok // Add system columns that MQ adds to all records recordType = schema.NewRecordTypeBuilder(recordTypeCopy). - WithField(SW_COLUMN_NAME_TS, schema.TypeInt64). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). 
@@ -328,7 +328,7 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 		}
 
 		// Extract system columns for result
-		timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
+		timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
 		key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
 
 		// Apply column projection
@@ -336,7 +336,7 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 		if len(options.Columns) == 0 {
 			// Select all columns (excluding system columns from user view)
 			for name, value := range recordValue.Fields {
-				if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY {
+				if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
 					values[name] = value
 				}
 			}
@@ -354,7 +354,7 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 			Values:    values,
 			Timestamp: timestamp,
 			Key:       key,
-			Source:    "in_memory_broker", // Tag for debugging/analysis
+			Source:    "live_log", // Data from broker's unflushed messages
 		}
 
 		results = append(results, result)
@@ -386,7 +386,7 @@ func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessa
 	}
 
 	// Add timestamp
-	recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
+	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
 		Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
 	}
 
@@ -521,14 +521,6 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex
 		results = mergedResults
 	}
 
-	// STEP 4: Fallback to sample data if no results found
-	// STEP 4: Fallback to sample data if no results found
-	// if len(results) == 0 {
-	//	sampleResults := hms.generateSampleHybridData(options)
-	//	results = append(results, sampleResults...)
-	//	// Note: OFFSET and LIMIT will be applied at the end of the main scan function
-	// }
-
 	return results, stats, nil
 }
@@ -595,7 +587,7 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
 	}
 
 	// Add system columns from LogEntry
-	recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
+	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
 		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
 	}
 	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
@@ -617,7 +609,7 @@ func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.Lo
 	}
 
 	// Add system columns (always present)
-	recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
+	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
 		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
 	}
 	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
@@ -862,11 +854,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult,
 		row := make([]sqltypes.Value, len(columns))
 		for j, columnName := range columns {
 			switch columnName {
-			case "_source":
+			case SW_COLUMN_NAME_SOURCE:
 				row[j] = sqltypes.NewVarChar(result.Source)
-			case "_timestamp_ns":
+			case SW_COLUMN_NAME_TIMESTAMP:
 				row[j] = sqltypes.NewInt64(result.Timestamp)
-			case "_key":
+			case SW_COLUMN_NAME_KEY:
 				row[j] = sqltypes.NewVarBinary(string(result.Key))
 			default:
 				if value, exists := result.Values[columnName]; exists {
@@ -887,78 +879,89 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult,
 	}
 }
 
-// generateSampleHybridData creates sample data that simulates both live and archived messages
-func (hms *HybridMessageScanner) generateSampleHybridData(options HybridScanOptions) []HybridScanResult {
-	now := time.Now().UnixNano()
-
-	sampleData := []HybridScanResult{
-		// Simulated live log data (recent)
-		{
-			Values: map[string]*schema_pb.Value{
-				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}},
-				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}},
-				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}},
-			},
-			Timestamp: now - 300000000000, // 5 minutes ago
-			Key:       []byte("live-user-1003"),
-			Source:    "live_log",
-		},
-		{
-			Values: map[string]*schema_pb.Value{
-				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1004}},
-				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_action"}},
-				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"action": "click", "live": true}`}},
-			},
-			Timestamp: now - 120000000000, // 2 minutes ago
-			Key:       []byte("live-user-1004"),
-			Source:    "live_log",
-		},
-
-		// Simulated archived Parquet data (older)
-		{
-			Values: map[string]*schema_pb.Value{
-				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
-				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}},
-				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}},
-			},
-			Timestamp: now - 3600000000000, // 1 hour ago
-			Key:       []byte("archived-user-1001"),
-			Source:    "parquet_archive",
-		},
-		{
-			Values: map[string]*schema_pb.Value{
-				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
-				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}},
-				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}},
-			},
-			Timestamp: now - 1800000000000, // 30 minutes ago
-			Key:       []byte("archived-user-1002"),
-			Source:    "parquet_archive",
-		},
-	}
-
-	// Apply predicate filtering if specified
-	if options.Predicate != nil {
-		var filtered []HybridScanResult
-		for _, result := range sampleData {
-			// Convert to RecordValue for predicate testing
-			recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
-			for k, v := range result.Values {
-				recordValue.Fields[k] = v
-			}
-			recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
-			recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
+// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
+// Combines auto-discovered columns (from *) with explicitly requested columns
+func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
+	if len(results) == 0 {
+		// For empty results there is no data to auto-discover from,
+		// so only the explicitly requested columns are known
+		columnSet := make(map[string]bool)
 
-			if options.Predicate(recordValue) {
-				filtered = append(filtered, result)
-			}
+		// Add explicit columns
+		for _, col := range explicitColumns {
+			columnSet[col] = true
+		}
+
+		// Build final column list
+		columns := make([]string, 0, len(columnSet))
+		for col := range columnSet {
+			columns = append(columns, col)
 		}
-		sampleData = filtered
+
+		return &QueryResult{
+			Columns:  columns,
+			Rows:     [][]sqltypes.Value{},
+			Database: hms.topic.Namespace,
+			Table:    hms.topic.Name,
+		}
+	}
+
+	// Auto-discover columns from data (like SELECT *)
+	autoColumns := make(map[string]bool)
+	for _, result := range results {
+		for columnName := range result.Values {
+			autoColumns[columnName] = true
+		}
+	}
+
+	// Combine auto-discovered and explicit columns
+	columnSet := make(map[string]bool)
+
+	// Add auto-discovered columns first (regular data columns)
+	for col := range autoColumns {
+		columnSet[col] = true
+	}
+
+	// Add explicit columns (may include system columns like _source)
+	for _, col := range explicitColumns {
+		columnSet[col] = true
 	}
 
-	// Note: OFFSET and LIMIT will be applied at the end of the main scan function
+	// Build final column list
+	columns := make([]string, 0, len(columnSet))
+	for col := range columnSet {
+		columns = append(columns, col)
+	}
 
-	return sampleData
+	// Convert to SQL rows
+	rows := make([][]sqltypes.Value, len(results))
+	for i, result := range results {
+		row := make([]sqltypes.Value, len(columns))
+		for j, columnName := range columns {
+			switch columnName {
+			case SW_COLUMN_NAME_TIMESTAMP:
+				row[j] = sqltypes.NewInt64(result.Timestamp)
+			case SW_COLUMN_NAME_KEY:
+				row[j] = sqltypes.NewVarBinary(string(result.Key))
+			case SW_COLUMN_NAME_SOURCE:
+				row[j] = sqltypes.NewVarChar(result.Source)
+			default:
+				// Regular data column
+				if value, exists := result.Values[columnName]; exists {
+					row[j] = convertSchemaValueToSQL(value)
+				} else {
+					row[j] = sqltypes.NULL
+				}
+			}
+		}
+		rows[i] = row
+	}
+
+	return &QueryResult{
+		Columns:  columns,
+		Rows:     rows,
+		Database: hms.topic.Namespace,
+		Table:    hms.topic.Name,
+	}
 }
 
 // ReadParquetStatistics efficiently reads column statistics from parquet files
@@ -1428,7 +1431,7 @@ func (s *StreamingFlushedDataSource) startStreaming() {
 			}
 
 			// Extract system columns
-			timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
+			timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
 			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
 
 			// Apply column projection
@@ -1436,7 +1439,7 @@ func (s *StreamingFlushedDataSource) startStreaming() {
 			if len(s.options.Columns) == 0 {
 				// Select all columns (excluding system columns from user view)
 				for name, value := range recordValue.Fields {
-					if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY {
+					if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
 						values[name] = value
 					}
 				}
diff --git a/weed/query/engine/hybrid_test.go b/weed/query/engine/hybrid_test.go
index 2090cd04a..74ef256c7 100644
--- a/weed/query/engine/hybrid_test.go
+++ b/weed/query/engine/hybrid_test.go
@@ -24,20 +24,17 @@ func TestSQLEngine_HybridSelectBasic(t *testing.T) {
 		t.Error("Expected columns in result")
 	}
 
+	// In mock environment, we only get live_log data from unflushed messages
+	// parquet_archive data would come from parquet files in a real system
 	if len(result.Rows) == 0 {
 		t.Error("Expected rows in result")
 	}
 
-	// Should have both live and archived data (4 sample records)
-	if len(result.Rows) != 4 {
-		t.Errorf("Expected 4 rows (2 live + 2 archived), got %d", len(result.Rows))
-	}
-
 	// Check that we have the _source column showing data source
 	hasSourceColumn := false
 	sourceColumnIndex := -1
 	for i, column := range result.Columns {
-		if column == "_source" {
+		if column == SW_COLUMN_NAME_SOURCE {
 			hasSourceColumn = true
 			sourceColumnIndex = i
 			break
@@ -48,19 +45,18 @@ func TestSQLEngine_HybridSelectBasic(t *testing.T) {
 		t.Skip("_source column not available in fallback mode - test requires real SeaweedFS cluster")
 	}
 
-	// Verify we have both data sources
+	// Verify we have the expected data sources (in mock environment, only live_log)
 	if hasSourceColumn && sourceColumnIndex >= 0 {
 		foundLiveLog := false
-		foundParquetArchive := false
 
 		for _, row := range result.Rows {
 			if sourceColumnIndex < len(row) {
 				source := row[sourceColumnIndex].ToString()
 				if source == "live_log" {
 					foundLiveLog = true
-				} else if source == "parquet_archive" {
-					foundParquetArchive = true
 				}
+				// In mock environment, all data comes from unflushed messages (live_log)
+				// In a real system, we would also see parquet_archive from parquet files
 			}
 		}
@@ -68,11 +64,7 @@ func TestSQLEngine_HybridSelectBasic(t *testing.T) {
 			t.Error("Expected to find live_log data source in results")
 		}
 
-		if !foundParquetArchive {
-			t.Error("Expected to find parquet_archive data source in results")
-		}
-
-		t.Logf("Found both live_log and parquet_archive data sources")
+		t.Logf("Found live_log data source from unflushed messages")
 	}
 }
diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go
index af27d295f..9bd2319f6 100644
--- a/weed/query/engine/mocks_test.go
+++ b/weed/query/engine/mocks_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+	"google.golang.org/protobuf/proto"
 )
 
 // NewTestSchemaCatalog creates a schema catalog for testing with sample data
@@ -21,10 +22,63 @@ func NewTestSchemaCatalog() *SchemaCatalog {
 	}
 
 	// Pre-populate with sample data to avoid service discovery requirements
-	catalog.initSampleData()
+	initTestSampleData(catalog)
 	return catalog
}
 
+// initTestSampleData populates the catalog with sample schema data for testing
+// This function is only available in test builds and not in production
+func initTestSampleData(c *SchemaCatalog) {
+	// Create sample databases and tables
+	c.databases["default"] = &DatabaseInfo{
+		Name: "default",
+		Tables: map[string]*TableInfo{
+			"user_events": {
+				Name: "user_events",
+				Columns: []ColumnInfo{
+					{Name: "user_id", Type: "VARCHAR(100)", Nullable: true},
+					{Name: "event_type", Type: "VARCHAR(50)", Nullable: true},
+					{Name: "data", Type: "TEXT", Nullable: true},
+					// System columns - hidden by default in SELECT *
+					{Name: SW_COLUMN_NAME_TIMESTAMP, Type: "BIGINT", Nullable: false},
+					{Name: SW_COLUMN_NAME_KEY, Type: "VARCHAR(255)", Nullable: true},
+					{Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(50)", Nullable: false},
+				},
+			},
+			"system_logs": {
+				Name: "system_logs",
+				Columns: []ColumnInfo{
+					{Name: "level", Type: "VARCHAR(10)", Nullable: true},
+					{Name: "message", Type: "TEXT", Nullable: true},
+					{Name: "service", Type: "VARCHAR(50)", Nullable: true},
+					// System columns
+					{Name: SW_COLUMN_NAME_TIMESTAMP, Type: "BIGINT", Nullable: false},
+					{Name: SW_COLUMN_NAME_KEY, Type: "VARCHAR(255)", Nullable: true},
+					{Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(50)", Nullable: false},
+				},
+			},
+		},
+	}
+
+	c.databases["test"] = &DatabaseInfo{
+		Name: "test",
+		Tables: map[string]*TableInfo{
+			"test-topic": {
+				Name: "test-topic",
+				Columns: []ColumnInfo{
+					{Name: "id", Type: "INT", Nullable: true},
+					{Name: "name", Type: "VARCHAR(100)", Nullable: true},
+					{Name: "value", Type: "DOUBLE", Nullable: true},
+					// System columns
+					{Name: SW_COLUMN_NAME_TIMESTAMP, Type: "BIGINT", Nullable: false},
+					{Name: SW_COLUMN_NAME_KEY, Type: "VARCHAR(255)", Nullable: true},
+					{Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(50)", Nullable: false},
+				},
+			},
+		},
+	}
+}
+
 // NewTestSQLEngine creates a new SQL execution engine for testing
 // Does not attempt to connect to real SeaweedFS services
 func NewTestSQLEngine() *SQLEngine {
@@ -225,22 +279,44 @@ func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName
 }
 
 // GetUnflushedMessages returns mock unflushed data for testing
-// Always returns empty slice to simulate safe deduplication behavior
+// Returns sample data as LogEntries to provide test data for the SQL engine
 func (m *MockBrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
 	if m.shouldFail {
 		return nil, fmt.Errorf("mock broker failed to get unflushed messages: %s", m.failMessage)
 	}
 
-	// For testing, return empty slice to simulate:
-	// 1. No unflushed data available
-	// 2. Safe deduplication behavior (prevents double-counting)
-	// 3. Successful broker communication
-	//
-	// In a real implementation, this would:
-	// - Connect to actual broker
-	// - Access LocalPartition's LogBuffer
-	// - Use buffer_start metadata for deduplication
-	// - Return only truly unflushed messages
+	// Generate sample data as LogEntries for testing
+	// This provides data that looks like it came from the broker's memory buffer
+	allSampleData := generateSampleHybridData(topicName, HybridScanOptions{})
+
+	var logEntries []*filer_pb.LogEntry
+	for _, result := range allSampleData {
+		// Only return live_log entries as unflushed messages
+		// This matches real system behavior where unflushed messages come from broker memory
+		// parquet_archive data would come from parquet files, not unflushed messages
+		if result.Source != "live_log" {
+			continue
+		}
+
+		// Convert sample data to protobuf LogEntry format
+		recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
+		for k, v := range result.Values {
+			recordValue.Fields[k] = v
+		}
+
+		// Serialize the RecordValue
+		data, err := proto.Marshal(recordValue)
+		if err != nil {
+			continue // Skip invalid entries
+		}
+
+		logEntry := &filer_pb.LogEntry{
+			TsNs: result.Timestamp,
+			Key:  result.Key,
+			Data: data,
+		}
+		logEntries = append(logEntries, logEntry)
+	}
 
-	return []*filer_pb.LogEntry{}, nil
+	return logEntries, nil
 }
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index 6d16110f9..faec1fbee 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -16,17 +16,11 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 )
 
-// System columns added to all MQ records
-const (
-	SW_COLUMN_NAME_TS  = "_ts_ns" // Timestamp in nanoseconds
-	SW_COLUMN_NAME_KEY = "_key"   // Message key
-)
-
 // ParquetScanner scans MQ topic Parquet files for SELECT queries
 // Assumptions:
 // 1. All MQ messages are stored in Parquet format in topic partitions
 // 2. Each partition directory contains dated Parquet files
-// 3. System columns (_ts_ns, _key) are added to user schema
+// 3. System columns (_timestamp_ns, _key) are added to user schema
 // 4. Predicate pushdown is used for efficient scanning
 type ParquetScanner struct {
 	filerClient filer_pb.FilerClient
@@ -68,7 +62,7 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st
 
 	// Add system columns that MQ adds to all records
 	recordType = schema.NewRecordTypeBuilder(recordType).
-		WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
+		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
 		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
 		RecordTypeEnd()
@@ -196,7 +190,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
 	}
 
 	// Extract system columns
-	timestamp := recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
+	timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
 	key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
 
 	// Apply time filtering
@@ -217,7 +211,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
 	if len(options.Columns) == 0 {
 		// Select all columns (excluding system columns from user view)
 		for name, value := range recordValue.Fields {
-			if name != SW_COLUMN_NAME_TS && name != SW_COLUMN_NAME_KEY {
+			if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
 				values[name] = value
 			}
 		}
@@ -293,7 +287,7 @@ func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
 		for k, v := range result.Values {
 			recordValue.Fields[k] = v
 		}
-		recordValue.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
+		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
 		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
 
 		if options.Predicate(recordValue) {
diff --git a/weed/query/engine/select_test.go b/weed/query/engine/select_test.go
index 3175572eb..f9beb786a 100644
--- a/weed/query/engine/select_test.go
+++ b/weed/query/engine/select_test.go
@@ -34,9 +34,10 @@ func TestSQLEngine_SelectBasic(t *testing.T) {
 		t.Errorf("Expected %d columns, got %d", len(expectedColumns), len(result.Columns))
 	}
 
-	// Should have 4 sample rows (hybrid data includes both live_log and parquet_archive)
-	if len(result.Rows) != 4 {
-		t.Errorf("Expected 4 rows, got %d", len(result.Rows))
+	// In mock environment, only live_log data from unflushed messages
+	// parquet_archive data would come from parquet files in a real system
+	if len(result.Rows) == 0 {
+		t.Error("Expected rows in result")
 	}
 }
diff --git a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go
index e1374ab62..1cb83118a 100644
--- a/weed/query/engine/system_columns.go
+++ b/weed/query/engine/system_columns.go
@@ -4,12 +4,19 @@ import (
 	"strings"
 )
 
+// System column constants used throughout the SQL engine
+const (
+	SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns" // Message timestamp in nanoseconds
+	SW_COLUMN_NAME_KEY       = "_key"          // Message key
+	SW_COLUMN_NAME_SOURCE    = "_source"       // Data source (live_log, parquet_archive, etc.)
+)
+
 // isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source)
 func (e *SQLEngine) isSystemColumn(columnName string) bool {
 	lowerName := strings.ToLower(columnName)
-	return lowerName == "_timestamp_ns" || lowerName == "timestamp_ns" ||
-		lowerName == "_key" || lowerName == "key" ||
-		lowerName == "_source" || lowerName == "source"
+	return lowerName == SW_COLUMN_NAME_TIMESTAMP ||
+		lowerName == SW_COLUMN_NAME_KEY ||
+		lowerName == SW_COLUMN_NAME_SOURCE
 }
 
 // isRegularColumn checks if a column might be a regular data column (placeholder)
@@ -23,7 +30,7 @@ func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map
 	lowerName := strings.ToLower(columnName)
 
 	switch lowerName {
-	case "_timestamp_ns", "timestamp_ns":
+	case SW_COLUMN_NAME_TIMESTAMP:
 		// For timestamps, find the earliest timestamp across all files
 		// This should match what's in the Extended["min"] metadata
 		var minTimestamp *int64
@@ -42,12 +49,12 @@ func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map
 			return *minTimestamp
 		}
 
-	case "_key", "key":
+	case SW_COLUMN_NAME_KEY:
 		// For keys, we'd need to read the actual parquet column stats
 		// Fall back to scanning if not available in our current stats
 		return nil
 
-	case "_source", "source":
+	case SW_COLUMN_NAME_SOURCE:
 		// Source is always "parquet_archive" for parquet files
 		return "parquet_archive"
 	}
@@ -60,7 +67,7 @@ func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map
 	lowerName := strings.ToLower(columnName)
 
 	switch lowerName {
-	case "_timestamp_ns", "timestamp_ns":
+	case SW_COLUMN_NAME_TIMESTAMP:
 		// For timestamps, find the latest timestamp across all files
 		// This should match what's in the Extended["max"] metadata
 		var maxTimestamp *int64
@@ -79,12 +86,12 @@ func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map
 			return *maxTimestamp
 		}
 
-	case "_key", "key":
+	case SW_COLUMN_NAME_KEY:
 		// For keys, we'd need to read the actual parquet column stats
 		// Fall back to scanning if not available in our current stats
 		return nil
 
-	case "_source", "source":
+	case SW_COLUMN_NAME_SOURCE:
 		// Source is always "parquet_archive" for parquet files
 		return "parquet_archive"
 	}
diff --git a/weed/query/engine/test_sample_data_test.go b/weed/query/engine/test_sample_data_test.go
new file mode 100644
index 000000000..23285a3b9
--- /dev/null
+++ b/weed/query/engine/test_sample_data_test.go
@@ -0,0 +1,137 @@
+package engine
+
+import (
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// generateSampleHybridData creates sample data that simulates both live and archived messages
+// This function is only used for testing and is not included in production builds
+func generateSampleHybridData(topicName string, options HybridScanOptions) []HybridScanResult {
+	now := time.Now().UnixNano()
+
+	// Generate different sample data based on topic name
+	var sampleData []HybridScanResult
+
+	switch topicName {
+	case "user_events":
+		sampleData = []HybridScanResult{
+			// Simulated live log data (recent)
+			{
+				Values: map[string]*schema_pb.Value{
+					"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1003}},
+					"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_login"}},
+					"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "10.0.0.1", "live": true}`}},
+				},
+				Timestamp: now - 300000000000, // 5 minutes ago
+				Key:       []byte("live-user-1003"),
+				Source:    "live_log",
+			},
+			{
+				Values: map[string]*schema_pb.Value{
&schema_pb.Value_Int32Value{Int32Value: 1004}}, + "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "live_action"}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"action": "click", "live": true}`}}, + }, + Timestamp: now - 120000000000, // 2 minutes ago + Key: []byte("live-user-1004"), + Source: "live_log", + }, + + // Simulated archived Parquet data (older) + { + Values: map[string]*schema_pb.Value{ + "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}}, + "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_login"}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1", "archived": true}`}}, + }, + Timestamp: now - 3600000000000, // 1 hour ago + Key: []byte("archived-user-1001"), + Source: "parquet_archive", + }, + { + Values: map[string]*schema_pb.Value{ + "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}}, + "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "archived_logout"}}, + "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"duration": 1800, "archived": true}`}}, + }, + Timestamp: now - 1800000000000, // 30 minutes ago + Key: []byte("archived-user-1002"), + Source: "parquet_archive", + }, + } + + case "system_logs": + sampleData = []HybridScanResult{ + // Simulated live system logs (recent) + { + Values: map[string]*schema_pb.Value{ + "level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}}, + "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Live system startup completed"}}, + "service": {Kind: &schema_pb.Value_StringValue{StringValue: "auth-service"}}, + }, + Timestamp: now - 240000000000, // 4 minutes ago + Key: []byte("live-sys-001"), + Source: "live_log", + }, + { + Values: map[string]*schema_pb.Value{ + "level": {Kind: &schema_pb.Value_StringValue{StringValue: "WARN"}}, + "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Live high memory usage detected"}}, + "service": {Kind: &schema_pb.Value_StringValue{StringValue: "monitor-service"}}, + }, + Timestamp: now - 180000000000, // 3 minutes ago + Key: []byte("live-sys-002"), + Source: "live_log", + }, + + // Simulated archived system logs (older) + { + Values: map[string]*schema_pb.Value{ + "level": {Kind: &schema_pb.Value_StringValue{StringValue: "ERROR"}}, + "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Archived database connection failed"}}, + "service": {Kind: &schema_pb.Value_StringValue{StringValue: "db-service"}}, + }, + Timestamp: now - 7200000000000, // 2 hours ago + Key: []byte("archived-sys-001"), + Source: "parquet_archive", + }, + { + Values: map[string]*schema_pb.Value{ + "level": {Kind: &schema_pb.Value_StringValue{StringValue: "INFO"}}, + "message": {Kind: &schema_pb.Value_StringValue{StringValue: "Archived batch job completed"}}, + "service": {Kind: &schema_pb.Value_StringValue{StringValue: "batch-service"}}, + }, + Timestamp: now - 3600000000000, // 1 hour ago + Key: []byte("archived-sys-002"), + Source: "parquet_archive", + }, + } + + default: + // For unknown topics, return empty data + sampleData = []HybridScanResult{} + } + + // Apply predicate filtering if specified + if options.Predicate != nil { + var filtered []HybridScanResult + for _, result := range sampleData { + // Convert to RecordValue for predicate testing + recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)} + for k, v := range result.Values { + recordValue.Fields[k] = v + } + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{Kind: 
&schema_pb.Value_Int64Value{Int64Value: result.Timestamp}} + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}} + + if options.Predicate(recordValue) { + filtered = append(filtered, result) + } + } + sampleData = filtered + } + + return sampleData +}