From dfd0897e498b0caf30f98bfc6cc72d04dfd398b1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 18:52:22 -0700 Subject: [PATCH] improve tests --- weed/query/engine/catalog.go | 66 ++++++++++++++++++++++++ weed/query/engine/engine.go | 38 +++++++++++++- weed/query/engine/hybrid_test.go | 12 ++--- weed/query/engine/schema_parsing_test.go | 2 +- weed/query/engine/select_test.go | 10 ++-- weed/query/engine/time_filter_test.go | 8 +-- 6 files changed, 119 insertions(+), 17 deletions(-) diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go index 9b64ac49e..ded740c37 100644 --- a/weed/query/engine/catalog.go +++ b/weed/query/engine/catalog.go @@ -66,6 +66,20 @@ func NewSchemaCatalog(masterAddress string) *SchemaCatalog { } } +// NewTestSchemaCatalog creates a schema catalog for testing with sample data +// Does not attempt to connect to real services +func NewTestSchemaCatalog() *SchemaCatalog { + catalog := &SchemaCatalog{ + databases: make(map[string]*DatabaseInfo), + currentDatabase: "default", + brokerClient: nil, // No broker client to avoid connection attempts + } + + // Pre-populate with sample data to avoid service discovery warnings + catalog.initSampleData() + return catalog +} + // ListDatabases returns all available databases (MQ namespaces) // Assumption: This would be populated from MQ broker metadata func (c *SchemaCatalog) ListDatabases() []string { @@ -247,3 +261,55 @@ func (c *SchemaCatalog) GetCurrentDatabase() string { defer c.mu.RUnlock() return c.currentDatabase } + +// initSampleData populates the catalog with sample schema data for testing +func (c *SchemaCatalog) initSampleData() { + // Create sample databases and tables + c.databases["default"] = &DatabaseInfo{ + Name: "default", + Tables: map[string]*TableInfo{ + "user_events": { + Name: "user_events", + Columns: []ColumnInfo{ + {Name: "user_id", Type: "VARCHAR(100)", Nullable: true}, + {Name: "event_type", Type: "VARCHAR(50)", Nullable: true}, + {Name: "data", Type: "TEXT", Nullable: true}, + // System columns - hidden by default in SELECT * + {Name: "_timestamp_ns", Type: "BIGINT", Nullable: false}, + {Name: "_key", Type: "VARCHAR(255)", Nullable: true}, + {Name: "_source", Type: "VARCHAR(50)", Nullable: false}, + }, + }, + "system_logs": { + Name: "system_logs", + Columns: []ColumnInfo{ + {Name: "level", Type: "VARCHAR(10)", Nullable: true}, + {Name: "message", Type: "TEXT", Nullable: true}, + {Name: "service", Type: "VARCHAR(50)", Nullable: true}, + // System columns + {Name: "_timestamp_ns", Type: "BIGINT", Nullable: false}, + {Name: "_key", Type: "VARCHAR(255)", Nullable: true}, + {Name: "_source", Type: "VARCHAR(50)", Nullable: false}, + }, + }, + }, + } + + c.databases["test"] = &DatabaseInfo{ + Name: "test", + Tables: map[string]*TableInfo{ + "test-topic": { + Name: "test-topic", + Columns: []ColumnInfo{ + {Name: "id", Type: "INT", Nullable: true}, + {Name: "name", Type: "VARCHAR(100)", Nullable: true}, + {Name: "value", Type: "DOUBLE", Nullable: true}, + // System columns + {Name: "_timestamp_ns", Type: "BIGINT", Nullable: false}, + {Name: "_key", Type: "VARCHAR(255)", Nullable: true}, + {Name: "_source", Type: "VARCHAR(50)", Nullable: false}, + }, + }, + }, + } +} diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index ae9afc211..c1d64dd1b 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -69,6 +69,34 @@ func NewSQLEngine(masterAddress string) *SQLEngine { } } +// NewSQLEngineWithCatalog creates a new SQL execution engine with a custom catalog +// Used for testing or when you want to provide a pre-configured catalog +func NewSQLEngineWithCatalog(catalog *SchemaCatalog) *SQLEngine { + // Initialize global HTTP client if not already done + // This is needed for reading partition data from the filer + if util_http.GetGlobalHttpClient() == nil { + util_http.InitGlobalHttpClient() + } + + return &SQLEngine{ + catalog: catalog, + } +} + +// NewTestSQLEngine creates a new SQL execution engine for testing +// Does not attempt to connect to real SeaweedFS services +func NewTestSQLEngine() *SQLEngine { + // Initialize global HTTP client if not already done + // This is needed for reading partition data from the filer + if util_http.GetGlobalHttpClient() == nil { + util_http.InitGlobalHttpClient() + } + + return &SQLEngine{ + catalog: NewTestSchemaCatalog(), + } +} + // GetCatalog returns the schema catalog for external access func (e *SQLEngine) GetCatalog() *SchemaCatalog { return e.catalog @@ -642,7 +670,10 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { // Topic not in catalog, try to discover and register it if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { - fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr) + // Only show warning if we have a real broker client (not in test mode) + if e.catalog.brokerClient != nil { + fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr) + } } } @@ -3164,6 +3195,11 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) // discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error { + // Skip discovery if no broker client (testing mode) + if e.catalog.brokerClient == nil { + return fmt.Errorf("topic %s.%s not found (no broker client available)", database, tableName) + } + // First, check if topic exists by trying to get its schema from the broker/filer recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) if err != nil { diff --git a/weed/query/engine/hybrid_test.go b/weed/query/engine/hybrid_test.go index bb73db73e..2090cd04a 100644 --- a/weed/query/engine/hybrid_test.go +++ b/weed/query/engine/hybrid_test.go @@ -8,7 +8,7 @@ import ( ) func TestSQLEngine_HybridSelectBasic(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test SELECT with _source column to show both live and archived data result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events") @@ -77,7 +77,7 @@ func TestSQLEngine_HybridSelectBasic(t *testing.T) { } func TestSQLEngine_HybridSelectWithLimit(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test SELECT with LIMIT on hybrid data result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events LIMIT 2") @@ -96,7 +96,7 @@ func TestSQLEngine_HybridSelectWithLimit(t *testing.T) { } func TestSQLEngine_HybridSelectDifferentTables(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test both user_events and system_logs tables tables := []string{"user_events", "system_logs"} @@ -139,7 +139,7 @@ func TestSQLEngine_HybridSelectDifferentTables(t *testing.T) { } func TestSQLEngine_HybridDataSource(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test that we can distinguish between live and archived data result, err := engine.ExecuteSQL(context.Background(), "SELECT user_id, event_type, _source FROM user_events") @@ -203,7 +203,7 @@ func TestSQLEngine_HybridDataSource(t *testing.T) { } func TestSQLEngine_HybridSystemLogs(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test system_logs with hybrid data result, err := engine.ExecuteSQL(context.Background(), "SELECT level, message, service, _source FROM system_logs") @@ -269,7 +269,7 @@ func TestSQLEngine_HybridSystemLogs(t *testing.T) { } func TestSQLEngine_HybridSelectWithTimeImplications(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test that demonstrates the time-based nature of hybrid data // Live data should be more recent than archived data diff --git a/weed/query/engine/schema_parsing_test.go b/weed/query/engine/schema_parsing_test.go index 42cb3256d..03db28a9a 100644 --- a/weed/query/engine/schema_parsing_test.go +++ b/weed/query/engine/schema_parsing_test.go @@ -132,7 +132,7 @@ func TestSchemaAwareParsing(t *testing.T) { // TestSchemaAwareParsingIntegration tests the full integration with SQL engine func TestSchemaAwareParsingIntegration(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test that the enhanced schema-aware parsing doesn't break existing functionality result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events LIMIT 2") diff --git a/weed/query/engine/select_test.go b/weed/query/engine/select_test.go index 1623a4609..60f612fdb 100644 --- a/weed/query/engine/select_test.go +++ b/weed/query/engine/select_test.go @@ -8,7 +8,7 @@ import ( ) func TestSQLEngine_SelectBasic(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test SELECT * FROM table result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events") @@ -41,7 +41,7 @@ func TestSQLEngine_SelectBasic(t *testing.T) { } func TestSQLEngine_SelectWithLimit(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test SELECT with LIMIT result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events LIMIT 2") @@ -60,7 +60,7 @@ func TestSQLEngine_SelectWithLimit(t *testing.T) { } func TestSQLEngine_SelectSpecificColumns(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test SELECT specific columns (this will fall back to sample data) result, err := engine.ExecuteSQL(context.Background(), "SELECT user_id, event_type FROM user_events") @@ -79,7 +79,7 @@ func TestSQLEngine_SelectSpecificColumns(t *testing.T) { } func TestSQLEngine_SelectFromNonExistentTable(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test SELECT from non-existent table result, _ := engine.ExecuteSQL(context.Background(), "SELECT * FROM nonexistent_table") @@ -93,7 +93,7 @@ func TestSQLEngine_SelectFromNonExistentTable(t *testing.T) { } func TestSQLEngine_SelectDifferentTables(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test different sample tables tables := []string{"user_events", "system_logs"} diff --git a/weed/query/engine/time_filter_test.go b/weed/query/engine/time_filter_test.go index a1f012699..7497105c4 100644 --- a/weed/query/engine/time_filter_test.go +++ b/weed/query/engine/time_filter_test.go @@ -9,7 +9,7 @@ import ( // TestTimeFilterExtraction tests the extraction of time filters from WHERE clauses func TestTimeFilterExtraction(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test data: use fixed timestamps for consistent testing @@ -101,7 +101,7 @@ func TestTimeFilterExtraction(t *testing.T) { // TestTimeColumnRecognition tests the recognition of time-related columns func TestTimeColumnRecognition(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() timeColumns := []string{ "_timestamp_ns", @@ -145,7 +145,7 @@ func TestTimeColumnRecognition(t *testing.T) { // TestTimeValueParsing tests parsing of different time value formats func TestTimeValueParsing(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() testCases := []struct { name string @@ -221,7 +221,7 @@ func TestTimeValueParsing(t *testing.T) { // TestTimeFilterIntegration tests the full integration of time filters with SELECT queries func TestTimeFilterIntegration(t *testing.T) { - engine := NewSQLEngine("localhost:8888") + engine := NewTestSQLEngine() // Test that time filters are properly extracted and used in SELECT queries testQueries := []string{