package engine import ( "context" "fmt" "sync" "time" "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) // BrokerClientInterface defines the interface for broker client operations // Both real BrokerClient and MockBrokerClient implement this interface type BrokerClientInterface interface { ListNamespaces(ctx context.Context) ([]string, error) ListTopics(ctx context.Context, namespace string) ([]string, error) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) GetFilerClient() (filer_pb.FilerClient, error) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error DeleteTopic(ctx context.Context, namespace, topicName string) error // GetUnflushedMessages returns only messages that haven't been flushed to disk yet // This prevents double-counting when combining with disk-based data GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) } // SchemaCatalog manages the mapping between MQ topics and SQL tables // Assumptions: // 1. Each MQ namespace corresponds to a SQL database // 2. Each MQ topic corresponds to a SQL table // 3. Topic schemas are cached for performance // 4. Schema evolution is tracked via RevisionId type SchemaCatalog struct { mu sync.RWMutex // databases maps namespace names to database metadata // Assumption: Namespace names are valid SQL database identifiers databases map[string]*DatabaseInfo // currentDatabase tracks the active database context (for USE database) // Assumption: Single-threaded usage per SQL session currentDatabase string // brokerClient handles communication with MQ broker brokerClient BrokerClientInterface // Use interface for dependency injection // defaultPartitionCount is the default number of partitions for new topics // Can be overridden in CREATE TABLE statements with PARTITION COUNT option defaultPartitionCount int32 // cacheTTL is the time-to-live for cached database and table information // After this duration, cached data is considered stale and will be refreshed cacheTTL time.Duration } // DatabaseInfo represents a SQL database (MQ namespace) type DatabaseInfo struct { Name string Tables map[string]*TableInfo CachedAt time.Time // Timestamp when this database info was cached } // TableInfo represents a SQL table (MQ topic) with schema information // Assumptions: // 1. All topic messages conform to the same schema within a revision // 2. Schema evolution maintains backward compatibility // 3. Primary key is implicitly the message timestamp/offset type TableInfo struct { Name string Namespace string Schema *schema.Schema Columns []ColumnInfo RevisionId uint32 CachedAt time.Time // Timestamp when this table info was cached } // ColumnInfo represents a SQL column (MQ schema field) type ColumnInfo struct { Name string Type string // SQL type representation Nullable bool // Assumption: MQ fields are nullable by default } // NewSchemaCatalog creates a new schema catalog // Uses master address for service discovery of filers and brokers func NewSchemaCatalog(masterAddress string) *SchemaCatalog { return &SchemaCatalog{ databases: make(map[string]*DatabaseInfo), brokerClient: NewBrokerClient(masterAddress), defaultPartitionCount: 6, // Default partition count, can be made configurable via environment variable cacheTTL: 5 * time.Minute, // Default cache TTL of 5 minutes, can be made configurable } } // ListDatabases returns all available databases (MQ namespaces) // Assumption: This would be populated from MQ broker metadata func (c *SchemaCatalog) ListDatabases() []string { // Clean up expired cache entries first c.mu.Lock() c.cleanExpiredDatabases() c.mu.Unlock() c.mu.RLock() defer c.mu.RUnlock() // Try to get real namespaces from broker first ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() namespaces, err := c.brokerClient.ListNamespaces(ctx) if err != nil { // Silently handle broker connection errors // Fallback to cached databases if broker unavailable databases := make([]string, 0, len(c.databases)) for name := range c.databases { databases = append(databases, name) } // Return empty list if no cached data (no more sample data) return databases } return namespaces } // ListTables returns all tables in a database (MQ topics in namespace) func (c *SchemaCatalog) ListTables(database string) ([]string, error) { // Clean up expired cache entries first c.mu.Lock() c.cleanExpiredDatabases() c.mu.Unlock() c.mu.RLock() defer c.mu.RUnlock() // Try to get real topics from broker first ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() topics, err := c.brokerClient.ListTopics(ctx, database) if err != nil { // Fallback to cached data if broker unavailable db, exists := c.databases[database] if !exists { // Return empty list if database not found (no more sample data) return []string{}, nil } tables := make([]string, 0, len(db.Tables)) for name := range db.Tables { tables = append(tables, name) } return tables, nil } return topics, nil } // GetTableInfo returns detailed schema information for a table // Assumption: Table exists and schema is accessible func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) { // Clean up expired cache entries first c.mu.Lock() c.cleanExpiredDatabases() c.mu.Unlock() c.mu.RLock() db, exists := c.databases[database] if !exists { c.mu.RUnlock() return nil, TableNotFoundError{ Database: database, Table: "", } } tableInfo, exists := db.Tables[table] if !exists || c.isTableCacheExpired(tableInfo) { c.mu.RUnlock() // Try to refresh table info from broker if not found or expired ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table) if err != nil { // If broker unavailable and we have expired cached data, return it if exists { return tableInfo, nil } // Otherwise return not found error return nil, TableNotFoundError{ Database: database, Table: table, } } // Convert the broker response to schema and register it mqSchema := &schema.Schema{ RecordType: recordType, RevisionId: 1, // Default revision for schema fetched from broker } // Register the refreshed schema err = c.RegisterTopic(database, table, mqSchema) if err != nil { // If registration fails but we have cached data, return it if exists { return tableInfo, nil } return nil, fmt.Errorf("failed to register topic schema: %v", err) } // Get the newly registered table info c.mu.RLock() defer c.mu.RUnlock() db, exists := c.databases[database] if !exists { return nil, TableNotFoundError{ Database: database, Table: table, } } tableInfo, exists := db.Tables[table] if !exists { return nil, TableNotFoundError{ Database: database, Table: table, } } return tableInfo, nil } c.mu.RUnlock() return tableInfo, nil } // RegisterTopic adds or updates a topic's schema information in the catalog // Assumption: This is called when topics are created or schemas are modified func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error { c.mu.Lock() defer c.mu.Unlock() now := time.Now() // Ensure database exists db, exists := c.databases[namespace] if !exists { db = &DatabaseInfo{ Name: namespace, Tables: make(map[string]*TableInfo), CachedAt: now, } c.databases[namespace] = db } // Convert MQ schema to SQL table info tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema) if err != nil { return fmt.Errorf("failed to convert MQ schema: %v", err) } // Set the cached timestamp for the table tableInfo.CachedAt = now db.Tables[topicName] = tableInfo return nil } // convertMQSchemaToTableInfo converts MQ schema to SQL table information // Assumptions: // 1. MQ scalar types map directly to SQL types // 2. Complex types (arrays, maps) are serialized as JSON strings // 3. All fields are nullable unless specifically marked otherwise func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) for i, field := range mqSchema.RecordType.Fields { sqlType, err := c.convertMQFieldTypeToSQL(field.Type) if err != nil { return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err) } columns[i] = ColumnInfo{ Name: field.Name, Type: sqlType, Nullable: true, // Assumption: MQ fields are nullable by default } } return &TableInfo{ Name: topicName, Namespace: namespace, Schema: mqSchema, Columns: columns, RevisionId: mqSchema.RevisionId, }, nil } // convertMQFieldTypeToSQL maps MQ field types to SQL types // Uses standard SQL type mappings with PostgreSQL compatibility func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) { switch t := fieldType.Kind.(type) { case *schema_pb.Type_ScalarType: switch t.ScalarType { case schema_pb.ScalarType_BOOL: return "BOOLEAN", nil case schema_pb.ScalarType_INT32: return "INT", nil case schema_pb.ScalarType_INT64: return "BIGINT", nil case schema_pb.ScalarType_FLOAT: return "FLOAT", nil case schema_pb.ScalarType_DOUBLE: return "DOUBLE", nil case schema_pb.ScalarType_BYTES: return "VARBINARY", nil case schema_pb.ScalarType_STRING: return "VARCHAR(255)", nil // Assumption: Default string length default: return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType) } case *schema_pb.Type_ListType: // Assumption: Lists are serialized as JSON strings in SQL return "TEXT", nil case *schema_pb.Type_RecordType: // Assumption: Nested records are serialized as JSON strings return "TEXT", nil default: return "", fmt.Errorf("unsupported field type: %T", t) } } // SetCurrentDatabase sets the active database context // Assumption: Used for implementing "USE database" functionality func (c *SchemaCatalog) SetCurrentDatabase(database string) error { c.mu.Lock() defer c.mu.Unlock() // TODO: Validate database exists in MQ broker c.currentDatabase = database return nil } // GetCurrentDatabase returns the currently active database func (c *SchemaCatalog) GetCurrentDatabase() string { c.mu.RLock() defer c.mu.RUnlock() return c.currentDatabase } // SetDefaultPartitionCount sets the default number of partitions for new topics func (c *SchemaCatalog) SetDefaultPartitionCount(count int32) { c.mu.Lock() defer c.mu.Unlock() c.defaultPartitionCount = count } // GetDefaultPartitionCount returns the default number of partitions for new topics func (c *SchemaCatalog) GetDefaultPartitionCount() int32 { c.mu.RLock() defer c.mu.RUnlock() return c.defaultPartitionCount } // SetCacheTTL sets the time-to-live for cached database and table information func (c *SchemaCatalog) SetCacheTTL(ttl time.Duration) { c.mu.Lock() defer c.mu.Unlock() c.cacheTTL = ttl } // GetCacheTTL returns the current cache TTL setting func (c *SchemaCatalog) GetCacheTTL() time.Duration { c.mu.RLock() defer c.mu.RUnlock() return c.cacheTTL } // isDatabaseCacheExpired checks if a database's cached information has expired func (c *SchemaCatalog) isDatabaseCacheExpired(db *DatabaseInfo) bool { return time.Since(db.CachedAt) > c.cacheTTL } // isTableCacheExpired checks if a table's cached information has expired func (c *SchemaCatalog) isTableCacheExpired(table *TableInfo) bool { return time.Since(table.CachedAt) > c.cacheTTL } // cleanExpiredDatabases removes expired database entries from cache // Note: This method assumes the caller already holds the write lock func (c *SchemaCatalog) cleanExpiredDatabases() { for name, db := range c.databases { if c.isDatabaseCacheExpired(db) { delete(c.databases, name) } else { // Clean expired tables within non-expired databases for tableName, table := range db.Tables { if c.isTableCacheExpired(table) { delete(db.Tables, tableName) } } } } } // CleanExpiredCache removes all expired entries from the cache // This method can be called externally to perform periodic cache cleanup func (c *SchemaCatalog) CleanExpiredCache() { c.mu.Lock() defer c.mu.Unlock() c.cleanExpiredDatabases() }