diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index be98d0b7c..d6216a04e 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -22,6 +22,7 @@ import ( ) // BrokerClient handles communication with SeaweedFS MQ broker +// Implements BrokerClientInterface for production use // Assumptions: // 1. Service discovery via master server (discovers filers and brokers) // 2. gRPC connection with default timeout of 30 seconds diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go index ded740c37..1dc2ed652 100644 --- a/weed/query/engine/catalog.go +++ b/weed/query/engine/catalog.go @@ -7,9 +7,21 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) +// BrokerClientInterface defines the interface for broker client operations +// Both real BrokerClient and MockBrokerClient implement this interface +type BrokerClientInterface interface { + ListNamespaces(ctx context.Context) ([]string, error) + ListTopics(ctx context.Context, namespace string) ([]string, error) + GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) + GetFilerClient() (filer_pb.FilerClient, error) + ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error + DeleteTopic(ctx context.Context, namespace, topicName string) error +} + // SchemaCatalog manages the mapping between MQ topics and SQL tables // Assumptions: // 1. Each MQ namespace corresponds to a SQL database @@ -28,7 +40,7 @@ type SchemaCatalog struct { currentDatabase string // brokerClient handles communication with MQ broker - brokerClient *BrokerClient + brokerClient BrokerClientInterface // Use interface for dependency injection } // DatabaseInfo represents a SQL database (MQ namespace) @@ -66,20 +78,6 @@ func NewSchemaCatalog(masterAddress string) *SchemaCatalog { } } -// NewTestSchemaCatalog creates a schema catalog for testing with sample data -// Does not attempt to connect to real services -func NewTestSchemaCatalog() *SchemaCatalog { - catalog := &SchemaCatalog{ - databases: make(map[string]*DatabaseInfo), - currentDatabase: "default", - brokerClient: nil, // No broker client to avoid connection attempts - } - - // Pre-populate with sample data to avoid service discovery warnings - catalog.initSampleData() - return catalog -} - // ListDatabases returns all available databases (MQ namespaces) // Assumption: This would be populated from MQ broker metadata func (c *SchemaCatalog) ListDatabases() []string { diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 1f2f2e4e0..b122e1781 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -83,20 +83,6 @@ func NewSQLEngineWithCatalog(catalog *SchemaCatalog) *SQLEngine { } } -// NewTestSQLEngine creates a new SQL execution engine for testing -// Does not attempt to connect to real SeaweedFS services -func NewTestSQLEngine() *SQLEngine { - // Initialize global HTTP client if not already done - // This is needed for reading partition data from the filer - if util_http.GetGlobalHttpClient() == nil { - util_http.InitGlobalHttpClient() - } - - return &SQLEngine{ - catalog: NewTestSchemaCatalog(), - } -} - // GetCatalog returns the schema catalog for external access func (e *SQLEngine) GetCatalog() *SchemaCatalog { return e.catalog @@ -670,23 +656,18 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *sqlparser. if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { // Topic not in catalog, try to discover and register it if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { - // Only show warning if we have a real broker client (not in test mode) - if e.catalog.brokerClient != nil { - fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr) - } + fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr) } } // Create HybridMessageScanner for the topic (reads both live logs + Parquet files) - // RESOLVED TODO: Get real filerClient from broker connection + // Get filerClient from broker connection (works with both real and mock brokers) var filerClient filer_pb.FilerClient - if e.catalog.brokerClient != nil { - var filerClientErr error - filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() - if filerClientErr != nil { - // Log warning but continue with sample data fallback (only when not in test mode) - fmt.Printf("Warning: Failed to get filer client: %v, using sample data\n", filerClientErr) - } + var filerClientErr error + filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() + if filerClientErr != nil { + // Log warning but continue with sample data fallback + fmt.Printf("Warning: Failed to get filer client: %v, using sample data\n", filerClientErr) } hybridScanner, err := NewHybridMessageScanner(filerClient, database, tableName) @@ -3195,11 +3176,6 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) // discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error { - // Skip discovery if no broker client (testing mode) - if e.catalog.brokerClient == nil { - return fmt.Errorf("topic %s.%s not found (no broker client available)", database, tableName) - } - // First, check if topic exists by trying to get its schema from the broker/filer recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) if err != nil { diff --git a/weed/query/engine/mock_test.go b/weed/query/engine/mock_test.go new file mode 100644 index 000000000..d00ec1761 --- /dev/null +++ b/weed/query/engine/mock_test.go @@ -0,0 +1,154 @@ +package engine + +import ( + "context" + "testing" +) + +func TestMockBrokerClient_BasicFunctionality(t *testing.T) { + mockBroker := NewMockBrokerClient() + + // Test ListNamespaces + namespaces, err := mockBroker.ListNamespaces(context.Background()) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if len(namespaces) != 2 { + t.Errorf("Expected 2 namespaces, got %d", len(namespaces)) + } + + // Test ListTopics + topics, err := mockBroker.ListTopics(context.Background(), "default") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if len(topics) != 2 { + t.Errorf("Expected 2 topics in default namespace, got %d", len(topics)) + } + + // Test GetTopicSchema + schema, err := mockBroker.GetTopicSchema(context.Background(), "default", "user_events") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if len(schema.Fields) != 3 { + t.Errorf("Expected 3 fields in user_events schema, got %d", len(schema.Fields)) + } +} + +func TestMockBrokerClient_FailureScenarios(t *testing.T) { + mockBroker := NewMockBrokerClient() + + // Configure mock to fail + mockBroker.SetFailure(true, "simulated broker failure") + + // Test that operations fail as expected + _, err := mockBroker.ListNamespaces(context.Background()) + if err == nil { + t.Error("Expected error when mock is configured to fail") + } + + _, err = mockBroker.ListTopics(context.Background(), "default") + if err == nil { + t.Error("Expected error when mock is configured to fail") + } + + _, err = mockBroker.GetTopicSchema(context.Background(), "default", "user_events") + if err == nil { + t.Error("Expected error when mock is configured to fail") + } + + // Test that filer client also fails + _, err = mockBroker.GetFilerClient() + if err == nil { + t.Error("Expected error when mock is configured to fail") + } + + // Reset mock to working state + mockBroker.SetFailure(false, "") + + // Test that operations work again + namespaces, err := mockBroker.ListNamespaces(context.Background()) + if err != nil { + t.Errorf("Expected no error after resetting mock, got %v", err) + } + if len(namespaces) == 0 { + t.Error("Expected namespaces after resetting mock") + } +} + +func TestMockBrokerClient_TopicManagement(t *testing.T) { + mockBroker := NewMockBrokerClient() + + // Test ConfigureTopic (add a new topic) + err := mockBroker.ConfigureTopic(context.Background(), "test", "new-topic", 1, nil) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Verify the topic was added + topics, err := mockBroker.ListTopics(context.Background(), "test") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + foundNewTopic := false + for _, topic := range topics { + if topic == "new-topic" { + foundNewTopic = true + break + } + } + if !foundNewTopic { + t.Error("Expected new-topic to be in the topics list") + } + + // Test DeleteTopic + err = mockBroker.DeleteTopic(context.Background(), "test", "new-topic") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Verify the topic was removed + topics, err = mockBroker.ListTopics(context.Background(), "test") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + for _, topic := range topics { + if topic == "new-topic" { + t.Error("Expected new-topic to be removed from topics list") + } + } +} + +func TestSQLEngineWithMockBrokerClient_ErrorHandling(t *testing.T) { + // Create an engine with a failing mock broker + mockBroker := NewMockBrokerClient() + mockBroker.SetFailure(true, "mock broker unavailable") + + catalog := &SchemaCatalog{ + databases: make(map[string]*DatabaseInfo), + currentDatabase: "default", + brokerClient: mockBroker, + } + + engine := &SQLEngine{catalog: catalog} + + // Test that queries fail gracefully with proper error messages + result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM nonexistent_topic") + + // ExecuteSQL itself should not return an error, but the result should contain an error + if err != nil { + // If ExecuteSQL returns an error, that's also acceptable for this test + t.Logf("ExecuteSQL returned error (acceptable): %v", err) + return + } + + // Should have an error in the result when broker is unavailable + if result.Error == nil { + t.Error("Expected error in query result when broker is unavailable") + } else { + t.Logf("Got expected error in result: %v", result.Error) + } +} diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go new file mode 100644 index 000000000..08246865e --- /dev/null +++ b/weed/query/engine/mocks_test.go @@ -0,0 +1,223 @@ +package engine + +import ( + "context" + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" +) + +// NewTestSchemaCatalog creates a schema catalog for testing with sample data +// Uses mock clients instead of real service connections +func NewTestSchemaCatalog() *SchemaCatalog { + catalog := &SchemaCatalog{ + databases: make(map[string]*DatabaseInfo), + currentDatabase: "default", + brokerClient: NewMockBrokerClient(), // Use mock instead of nil + } + + // Pre-populate with sample data to avoid service discovery requirements + catalog.initSampleData() + return catalog +} + +// NewTestSQLEngine creates a new SQL execution engine for testing +// Does not attempt to connect to real SeaweedFS services +func NewTestSQLEngine() *SQLEngine { + // Initialize global HTTP client if not already done + // This is needed for reading partition data from the filer + if util_http.GetGlobalHttpClient() == nil { + util_http.InitGlobalHttpClient() + } + + return &SQLEngine{ + catalog: NewTestSchemaCatalog(), + } +} + +// MockBrokerClient implements BrokerClient interface for testing +type MockBrokerClient struct { + namespaces []string + topics map[string][]string // namespace -> topics + schemas map[string]*schema_pb.RecordType // "namespace.topic" -> schema + shouldFail bool + failMessage string +} + +// NewMockBrokerClient creates a new mock broker client with sample data +func NewMockBrokerClient() *MockBrokerClient { + client := &MockBrokerClient{ + namespaces: []string{"default", "test"}, + topics: map[string][]string{ + "default": {"user_events", "system_logs"}, + "test": {"test-topic"}, + }, + schemas: make(map[string]*schema_pb.RecordType), + } + + // Add sample schemas + client.schemas["default.user_events"] = &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "user_id", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + {Name: "event_type", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + {Name: "data", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + }, + } + + client.schemas["default.system_logs"] = &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "level", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + {Name: "message", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + {Name: "service", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + }, + } + + client.schemas["test.test-topic"] = &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + {Name: "id", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}}, + {Name: "name", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}}, + {Name: "value", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}}, + }, + } + + return client +} + +// SetFailure configures the mock to fail with the given message +func (m *MockBrokerClient) SetFailure(shouldFail bool, message string) { + m.shouldFail = shouldFail + m.failMessage = message +} + +// ListNamespaces returns the mock namespaces +func (m *MockBrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { + if m.shouldFail { + return nil, fmt.Errorf("mock broker failure: %s", m.failMessage) + } + return m.namespaces, nil +} + +// ListTopics returns the mock topics for a namespace +func (m *MockBrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) { + if m.shouldFail { + return nil, fmt.Errorf("mock broker failure: %s", m.failMessage) + } + + if topics, exists := m.topics[namespace]; exists { + return topics, nil + } + return []string{}, nil +} + +// GetTopicSchema returns the mock schema for a topic +func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) { + if m.shouldFail { + return nil, fmt.Errorf("mock broker failure: %s", m.failMessage) + } + + key := fmt.Sprintf("%s.%s", namespace, topic) + if schema, exists := m.schemas[key]; exists { + return schema, nil + } + return nil, fmt.Errorf("topic %s not found", key) +} + +// GetFilerClient returns a mock filer client +func (m *MockBrokerClient) GetFilerClient() (filer_pb.FilerClient, error) { + if m.shouldFail { + return nil, fmt.Errorf("mock broker failure: %s", m.failMessage) + } + return NewMockFilerClient(), nil +} + +// MockFilerClient implements filer_pb.FilerClient interface for testing +type MockFilerClient struct { + shouldFail bool + failMessage string +} + +// NewMockFilerClient creates a new mock filer client +func NewMockFilerClient() *MockFilerClient { + return &MockFilerClient{} +} + +// SetFailure configures the mock to fail with the given message +func (m *MockFilerClient) SetFailure(shouldFail bool, message string) { + m.shouldFail = shouldFail + m.failMessage = message +} + +// WithFilerClient executes a function with a mock filer client +func (m *MockFilerClient) WithFilerClient(followRedirect bool, fn func(client filer_pb.SeaweedFilerClient) error) error { + if m.shouldFail { + return fmt.Errorf("mock filer failure: %s", m.failMessage) + } + + // For testing, we can just return success since the actual filer operations + // are not critical for SQL engine unit tests + return nil +} + +// AdjustedUrl implements the FilerClient interface (mock implementation) +func (m *MockFilerClient) AdjustedUrl(location *filer_pb.Location) string { + if location != nil && location.Url != "" { + return location.Url + } + return "mock://localhost:8080" +} + +// GetDataCenter implements the FilerClient interface (mock implementation) +func (m *MockFilerClient) GetDataCenter() string { + return "mock-datacenter" +} + +// ConfigureTopic creates or updates a topic configuration (mock implementation) +func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error { + if m.shouldFail { + return fmt.Errorf("mock broker failure: %s", m.failMessage) + } + + // Store the schema in our mock data + key := fmt.Sprintf("%s.%s", namespace, topicName) + m.schemas[key] = recordType + + // Add to topics list if not already present + if topics, exists := m.topics[namespace]; exists { + for _, topic := range topics { + if topic == topicName { + return nil // Already exists + } + } + m.topics[namespace] = append(topics, topicName) + } else { + m.topics[namespace] = []string{topicName} + } + + return nil +} + +// DeleteTopic removes a topic and all its data (mock implementation) +func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error { + if m.shouldFail { + return fmt.Errorf("mock broker failure: %s", m.failMessage) + } + + // Remove from schemas + key := fmt.Sprintf("%s.%s", namespace, topicName) + delete(m.schemas, key) + + // Remove from topics list + if topics, exists := m.topics[namespace]; exists { + newTopics := make([]string, 0, len(topics)) + for _, topic := range topics { + if topic != topicName { + newTopics = append(newTopics, topic) + } + } + m.topics[namespace] = newTopics + } + + return nil +}