You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							451 lines
						
					
					
						
							14 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							451 lines
						
					
					
						
							14 KiB
						
					
					
				
								package engine
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/schema"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/topic"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// BrokerClientInterface defines the interface for broker client operations
							 | 
						|
								// Both real BrokerClient and MockBrokerClient implement this interface
							 | 
						|
								type BrokerClientInterface interface {
							 | 
						|
									ListNamespaces(ctx context.Context) ([]string, error)
							 | 
						|
									ListTopics(ctx context.Context, namespace string) ([]string, error)
							 | 
						|
									GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) // Returns (flatSchema, keyColumns, schemaFormat, error)
							 | 
						|
									ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error
							 | 
						|
									GetFilerClient() (filer_pb.FilerClient, error)
							 | 
						|
									DeleteTopic(ctx context.Context, namespace, topicName string) error
							 | 
						|
									// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
							 | 
						|
									// This prevents double-counting when combining with disk-based data
							 | 
						|
									GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SchemaCatalog manages the mapping between MQ topics and SQL tables
							 | 
						|
								// Assumptions:
							 | 
						|
								// 1. Each MQ namespace corresponds to a SQL database
							 | 
						|
								// 2. Each MQ topic corresponds to a SQL table
							 | 
						|
								// 3. Topic schemas are cached for performance
							 | 
						|
								// 4. Schema evolution is tracked via RevisionId
							 | 
						|
								type SchemaCatalog struct {
							 | 
						|
									mu sync.RWMutex
							 | 
						|
								
							 | 
						|
									// databases maps namespace names to database metadata
							 | 
						|
									// Assumption: Namespace names are valid SQL database identifiers
							 | 
						|
									databases map[string]*DatabaseInfo
							 | 
						|
								
							 | 
						|
									// currentDatabase tracks the active database context (for USE database)
							 | 
						|
									// Assumption: Single-threaded usage per SQL session
							 | 
						|
									currentDatabase string
							 | 
						|
								
							 | 
						|
									// brokerClient handles communication with MQ broker
							 | 
						|
									brokerClient BrokerClientInterface // Use interface for dependency injection
							 | 
						|
								
							 | 
						|
									// defaultPartitionCount is the default number of partitions for new topics
							 | 
						|
									// Can be overridden in CREATE TABLE statements with PARTITION COUNT option
							 | 
						|
									defaultPartitionCount int32
							 | 
						|
								
							 | 
						|
									// cacheTTL is the time-to-live for cached database and table information
							 | 
						|
									// After this duration, cached data is considered stale and will be refreshed
							 | 
						|
									cacheTTL time.Duration
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// DatabaseInfo represents a SQL database (MQ namespace)
							 | 
						|
								type DatabaseInfo struct {
							 | 
						|
									Name     string
							 | 
						|
									Tables   map[string]*TableInfo
							 | 
						|
									CachedAt time.Time // Timestamp when this database info was cached
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// TableInfo represents a SQL table (MQ topic) with schema information
							 | 
						|
								// Assumptions:
							 | 
						|
								// 1. All topic messages conform to the same schema within a revision
							 | 
						|
								// 2. Schema evolution maintains backward compatibility
							 | 
						|
								// 3. Primary key is implicitly the message timestamp/offset
							 | 
						|
								type TableInfo struct {
							 | 
						|
									Name       string
							 | 
						|
									Namespace  string
							 | 
						|
									Schema     *schema.Schema
							 | 
						|
									Columns    []ColumnInfo
							 | 
						|
									RevisionId uint32
							 | 
						|
									CachedAt   time.Time // Timestamp when this table info was cached
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ColumnInfo represents a SQL column (MQ schema field)
							 | 
						|
								type ColumnInfo struct {
							 | 
						|
									Name     string
							 | 
						|
									Type     string // SQL type representation
							 | 
						|
									Nullable bool   // Assumption: MQ fields are nullable by default
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewSchemaCatalog creates a new schema catalog
							 | 
						|
								// Uses master address for service discovery of filers and brokers
							 | 
						|
								func NewSchemaCatalog(masterAddress string) *SchemaCatalog {
							 | 
						|
									return &SchemaCatalog{
							 | 
						|
										databases:             make(map[string]*DatabaseInfo),
							 | 
						|
										brokerClient:          NewBrokerClient(masterAddress),
							 | 
						|
										defaultPartitionCount: 6,               // Default partition count, can be made configurable via environment variable
							 | 
						|
										cacheTTL:              5 * time.Minute, // Default cache TTL of 5 minutes, can be made configurable
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ListDatabases returns all available databases (MQ namespaces)
							 | 
						|
								// Assumption: This would be populated from MQ broker metadata
							 | 
						|
								func (c *SchemaCatalog) ListDatabases() []string {
							 | 
						|
									// Clean up expired cache entries first
							 | 
						|
									c.mu.Lock()
							 | 
						|
									c.cleanExpiredDatabases()
							 | 
						|
									c.mu.Unlock()
							 | 
						|
								
							 | 
						|
									c.mu.RLock()
							 | 
						|
									defer c.mu.RUnlock()
							 | 
						|
								
							 | 
						|
									// Try to get real namespaces from broker first
							 | 
						|
									ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
							 | 
						|
									defer cancel()
							 | 
						|
								
							 | 
						|
									namespaces, err := c.brokerClient.ListNamespaces(ctx)
							 | 
						|
									if err != nil {
							 | 
						|
										// Silently handle broker connection errors
							 | 
						|
								
							 | 
						|
										// Fallback to cached databases if broker unavailable
							 | 
						|
										databases := make([]string, 0, len(c.databases))
							 | 
						|
										for name := range c.databases {
							 | 
						|
											databases = append(databases, name)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Return empty list if no cached data (no more sample data)
							 | 
						|
										return databases
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return namespaces
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ListTables returns all tables in a database (MQ topics in namespace)
							 | 
						|
								func (c *SchemaCatalog) ListTables(database string) ([]string, error) {
							 | 
						|
									// Clean up expired cache entries first
							 | 
						|
									c.mu.Lock()
							 | 
						|
									c.cleanExpiredDatabases()
							 | 
						|
									c.mu.Unlock()
							 | 
						|
								
							 | 
						|
									c.mu.RLock()
							 | 
						|
									defer c.mu.RUnlock()
							 | 
						|
								
							 | 
						|
									// Try to get real topics from broker first
							 | 
						|
									ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
							 | 
						|
									defer cancel()
							 | 
						|
								
							 | 
						|
									topics, err := c.brokerClient.ListTopics(ctx, database)
							 | 
						|
									if err != nil {
							 | 
						|
										// Fallback to cached data if broker unavailable
							 | 
						|
										db, exists := c.databases[database]
							 | 
						|
										if !exists {
							 | 
						|
											// Return empty list if database not found (no more sample data)
							 | 
						|
											return []string{}, nil
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										tables := make([]string, 0, len(db.Tables))
							 | 
						|
										for name := range db.Tables {
							 | 
						|
											// Skip .meta table
							 | 
						|
											if name == ".meta" {
							 | 
						|
												continue
							 | 
						|
											}
							 | 
						|
											tables = append(tables, name)
							 | 
						|
										}
							 | 
						|
										return tables, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Filter out .meta table from topics
							 | 
						|
									filtered := make([]string, 0, len(topics))
							 | 
						|
									for _, topic := range topics {
							 | 
						|
										if topic != ".meta" {
							 | 
						|
											filtered = append(filtered, topic)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return filtered, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetTableInfo returns detailed schema information for a table
							 | 
						|
								// Assumption: Table exists and schema is accessible
							 | 
						|
								func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) {
							 | 
						|
									// Clean up expired cache entries first
							 | 
						|
									c.mu.Lock()
							 | 
						|
									c.cleanExpiredDatabases()
							 | 
						|
									c.mu.Unlock()
							 | 
						|
								
							 | 
						|
									c.mu.RLock()
							 | 
						|
									db, exists := c.databases[database]
							 | 
						|
									if !exists {
							 | 
						|
										c.mu.RUnlock()
							 | 
						|
										return nil, TableNotFoundError{
							 | 
						|
											Database: database,
							 | 
						|
											Table:    "",
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									tableInfo, exists := db.Tables[table]
							 | 
						|
									if !exists || c.isTableCacheExpired(tableInfo) {
							 | 
						|
										c.mu.RUnlock()
							 | 
						|
								
							 | 
						|
										// Try to refresh table info from broker if not found or expired
							 | 
						|
										ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
							 | 
						|
										defer cancel()
							 | 
						|
								
							 | 
						|
										recordType, _, _, err := c.brokerClient.GetTopicSchema(ctx, database, table)
							 | 
						|
										if err != nil {
							 | 
						|
											// If broker unavailable and we have expired cached data, return it
							 | 
						|
											if exists {
							 | 
						|
												return tableInfo, nil
							 | 
						|
											}
							 | 
						|
											// Otherwise return not found error
							 | 
						|
											return nil, TableNotFoundError{
							 | 
						|
												Database: database,
							 | 
						|
												Table:    table,
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Convert the broker response to schema and register it
							 | 
						|
										mqSchema := &schema.Schema{
							 | 
						|
											RecordType: recordType,
							 | 
						|
											RevisionId: 1, // Default revision for schema fetched from broker
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Register the refreshed schema
							 | 
						|
										err = c.RegisterTopic(database, table, mqSchema)
							 | 
						|
										if err != nil {
							 | 
						|
											// If registration fails but we have cached data, return it
							 | 
						|
											if exists {
							 | 
						|
												return tableInfo, nil
							 | 
						|
											}
							 | 
						|
											return nil, fmt.Errorf("failed to register topic schema: %v", err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Get the newly registered table info
							 | 
						|
										c.mu.RLock()
							 | 
						|
										defer c.mu.RUnlock()
							 | 
						|
								
							 | 
						|
										db, exists := c.databases[database]
							 | 
						|
										if !exists {
							 | 
						|
											return nil, TableNotFoundError{
							 | 
						|
												Database: database,
							 | 
						|
												Table:    table,
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										tableInfo, exists := db.Tables[table]
							 | 
						|
										if !exists {
							 | 
						|
											return nil, TableNotFoundError{
							 | 
						|
												Database: database,
							 | 
						|
												Table:    table,
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										return tableInfo, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									c.mu.RUnlock()
							 | 
						|
									return tableInfo, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// RegisterTopic adds or updates a topic's schema information in the catalog
							 | 
						|
								// Assumption: This is called when topics are created or schemas are modified
							 | 
						|
								func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error {
							 | 
						|
									c.mu.Lock()
							 | 
						|
									defer c.mu.Unlock()
							 | 
						|
								
							 | 
						|
									now := time.Now()
							 | 
						|
								
							 | 
						|
									// Ensure database exists
							 | 
						|
									db, exists := c.databases[namespace]
							 | 
						|
									if !exists {
							 | 
						|
										db = &DatabaseInfo{
							 | 
						|
											Name:     namespace,
							 | 
						|
											Tables:   make(map[string]*TableInfo),
							 | 
						|
											CachedAt: now,
							 | 
						|
										}
							 | 
						|
										c.databases[namespace] = db
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Convert MQ schema to SQL table info
							 | 
						|
									tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to convert MQ schema: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Set the cached timestamp for the table
							 | 
						|
									tableInfo.CachedAt = now
							 | 
						|
								
							 | 
						|
									db.Tables[topicName] = tableInfo
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// convertMQSchemaToTableInfo converts MQ schema to SQL table information
							 | 
						|
								// Assumptions:
							 | 
						|
								// 1. MQ scalar types map directly to SQL types
							 | 
						|
								// 2. Complex types (arrays, maps) are serialized as JSON strings
							 | 
						|
								// 3. All fields are nullable unless specifically marked otherwise
							 | 
						|
								// 4. If no schema is defined, create a default schema with system fields and _value
							 | 
						|
								func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) {
							 | 
						|
									// Check if the schema has a valid RecordType
							 | 
						|
									if mqSchema == nil || mqSchema.RecordType == nil {
							 | 
						|
										// For topics without schema, create a default schema with system fields and _value
							 | 
						|
										columns := []ColumnInfo{
							 | 
						|
											{Name: SW_DISPLAY_NAME_TIMESTAMP, Type: "TIMESTAMP", Nullable: true},
							 | 
						|
											{Name: SW_COLUMN_NAME_KEY, Type: "VARBINARY", Nullable: true},
							 | 
						|
											{Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(255)", Nullable: true},
							 | 
						|
											{Name: SW_COLUMN_NAME_VALUE, Type: "VARBINARY", Nullable: true},
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										return &TableInfo{
							 | 
						|
											Name:       topicName,
							 | 
						|
											Namespace:  namespace,
							 | 
						|
											Schema:     nil, // No schema defined
							 | 
						|
											Columns:    columns,
							 | 
						|
											RevisionId: 0,
							 | 
						|
										}, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields))
							 | 
						|
								
							 | 
						|
									for i, field := range mqSchema.RecordType.Fields {
							 | 
						|
										sqlType, err := c.convertMQFieldTypeToSQL(field.Type)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										columns[i] = ColumnInfo{
							 | 
						|
											Name:     field.Name,
							 | 
						|
											Type:     sqlType,
							 | 
						|
											Nullable: true, // Assumption: MQ fields are nullable by default
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return &TableInfo{
							 | 
						|
										Name:       topicName,
							 | 
						|
										Namespace:  namespace,
							 | 
						|
										Schema:     mqSchema,
							 | 
						|
										Columns:    columns,
							 | 
						|
										RevisionId: mqSchema.RevisionId,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// convertMQFieldTypeToSQL maps MQ field types to SQL types
							 | 
						|
								// Uses standard SQL type mappings with PostgreSQL compatibility
							 | 
						|
								func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) {
							 | 
						|
									switch t := fieldType.Kind.(type) {
							 | 
						|
									case *schema_pb.Type_ScalarType:
							 | 
						|
										switch t.ScalarType {
							 | 
						|
										case schema_pb.ScalarType_BOOL:
							 | 
						|
											return "BOOLEAN", nil
							 | 
						|
										case schema_pb.ScalarType_INT32:
							 | 
						|
											return "INT", nil
							 | 
						|
										case schema_pb.ScalarType_INT64:
							 | 
						|
											return "BIGINT", nil
							 | 
						|
										case schema_pb.ScalarType_FLOAT:
							 | 
						|
											return "FLOAT", nil
							 | 
						|
										case schema_pb.ScalarType_DOUBLE:
							 | 
						|
											return "DOUBLE", nil
							 | 
						|
										case schema_pb.ScalarType_BYTES:
							 | 
						|
											return "VARBINARY", nil
							 | 
						|
										case schema_pb.ScalarType_STRING:
							 | 
						|
											return "VARCHAR(255)", nil // Assumption: Default string length
							 | 
						|
										default:
							 | 
						|
											return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType)
							 | 
						|
										}
							 | 
						|
									case *schema_pb.Type_ListType:
							 | 
						|
										// Assumption: Lists are serialized as JSON strings in SQL
							 | 
						|
										return "TEXT", nil
							 | 
						|
									case *schema_pb.Type_RecordType:
							 | 
						|
										// Assumption: Nested records are serialized as JSON strings
							 | 
						|
										return "TEXT", nil
							 | 
						|
									default:
							 | 
						|
										return "", fmt.Errorf("unsupported field type: %T", t)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetCurrentDatabase sets the active database context
							 | 
						|
								// Assumption: Used for implementing "USE database" functionality
							 | 
						|
								func (c *SchemaCatalog) SetCurrentDatabase(database string) error {
							 | 
						|
									c.mu.Lock()
							 | 
						|
									defer c.mu.Unlock()
							 | 
						|
								
							 | 
						|
									// TODO: Validate database exists in MQ broker
							 | 
						|
									c.currentDatabase = database
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetCurrentDatabase returns the currently active database
							 | 
						|
								func (c *SchemaCatalog) GetCurrentDatabase() string {
							 | 
						|
									c.mu.RLock()
							 | 
						|
									defer c.mu.RUnlock()
							 | 
						|
									return c.currentDatabase
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetDefaultPartitionCount sets the default number of partitions for new topics
							 | 
						|
								func (c *SchemaCatalog) SetDefaultPartitionCount(count int32) {
							 | 
						|
									c.mu.Lock()
							 | 
						|
									defer c.mu.Unlock()
							 | 
						|
									c.defaultPartitionCount = count
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetDefaultPartitionCount returns the default number of partitions for new topics
							 | 
						|
								func (c *SchemaCatalog) GetDefaultPartitionCount() int32 {
							 | 
						|
									c.mu.RLock()
							 | 
						|
									defer c.mu.RUnlock()
							 | 
						|
									return c.defaultPartitionCount
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SetCacheTTL sets the time-to-live for cached database and table information
							 | 
						|
								func (c *SchemaCatalog) SetCacheTTL(ttl time.Duration) {
							 | 
						|
									c.mu.Lock()
							 | 
						|
									defer c.mu.Unlock()
							 | 
						|
									c.cacheTTL = ttl
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// GetCacheTTL returns the current cache TTL setting
							 | 
						|
								func (c *SchemaCatalog) GetCacheTTL() time.Duration {
							 | 
						|
									c.mu.RLock()
							 | 
						|
									defer c.mu.RUnlock()
							 | 
						|
									return c.cacheTTL
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// isDatabaseCacheExpired checks if a database's cached information has expired
							 | 
						|
								func (c *SchemaCatalog) isDatabaseCacheExpired(db *DatabaseInfo) bool {
							 | 
						|
									return time.Since(db.CachedAt) > c.cacheTTL
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// isTableCacheExpired checks if a table's cached information has expired
							 | 
						|
								func (c *SchemaCatalog) isTableCacheExpired(table *TableInfo) bool {
							 | 
						|
									return time.Since(table.CachedAt) > c.cacheTTL
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// cleanExpiredDatabases removes expired database entries from cache
							 | 
						|
								// Note: This method assumes the caller already holds the write lock
							 | 
						|
								func (c *SchemaCatalog) cleanExpiredDatabases() {
							 | 
						|
									for name, db := range c.databases {
							 | 
						|
										if c.isDatabaseCacheExpired(db) {
							 | 
						|
											delete(c.databases, name)
							 | 
						|
										} else {
							 | 
						|
											// Clean expired tables within non-expired databases
							 | 
						|
											for tableName, table := range db.Tables {
							 | 
						|
												if c.isTableCacheExpired(table) {
							 | 
						|
													delete(db.Tables, tableName)
							 | 
						|
												}
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CleanExpiredCache removes all expired entries from the cache
							 | 
						|
								// This method can be called externally to perform periodic cache cleanup
							 | 
						|
								func (c *SchemaCatalog) CleanExpiredCache() {
							 | 
						|
									c.mu.Lock()
							 | 
						|
									defer c.mu.Unlock()
							 | 
						|
									c.cleanExpiredDatabases()
							 | 
						|
								}
							 |