|
@ -5,7 +5,7 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"sync" |
|
|
"sync" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema" |
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" |
|
|
) |
|
|
) |
|
@ -18,15 +18,15 @@ import ( |
|
|
// 4. Schema evolution is tracked via RevisionId
|
|
|
// 4. Schema evolution is tracked via RevisionId
|
|
|
type SchemaCatalog struct { |
|
|
type SchemaCatalog struct { |
|
|
mu sync.RWMutex |
|
|
mu sync.RWMutex |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// databases maps namespace names to database metadata
|
|
|
// databases maps namespace names to database metadata
|
|
|
// Assumption: Namespace names are valid SQL database identifiers
|
|
|
// Assumption: Namespace names are valid SQL database identifiers
|
|
|
databases map[string]*DatabaseInfo |
|
|
databases map[string]*DatabaseInfo |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// currentDatabase tracks the active database context (for USE database)
|
|
|
// currentDatabase tracks the active database context (for USE database)
|
|
|
// Assumption: Single-threaded usage per SQL session
|
|
|
// Assumption: Single-threaded usage per SQL session
|
|
|
currentDatabase string |
|
|
currentDatabase string |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// brokerClient handles communication with MQ broker
|
|
|
// brokerClient handles communication with MQ broker
|
|
|
brokerClient *BrokerClient |
|
|
brokerClient *BrokerClient |
|
|
} |
|
|
} |
|
@ -43,26 +43,26 @@ type DatabaseInfo struct { |
|
|
// 2. Schema evolution maintains backward compatibility
|
|
|
// 2. Schema evolution maintains backward compatibility
|
|
|
// 3. Primary key is implicitly the message timestamp/offset
|
|
|
// 3. Primary key is implicitly the message timestamp/offset
|
|
|
type TableInfo struct { |
|
|
type TableInfo struct { |
|
|
Name string |
|
|
|
|
|
Namespace string |
|
|
|
|
|
Schema *schema.Schema |
|
|
|
|
|
Columns []ColumnInfo |
|
|
|
|
|
RevisionId uint32 |
|
|
|
|
|
|
|
|
Name string |
|
|
|
|
|
Namespace string |
|
|
|
|
|
Schema *schema.Schema |
|
|
|
|
|
Columns []ColumnInfo |
|
|
|
|
|
RevisionId uint32 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// ColumnInfo represents a SQL column (MQ schema field)
|
|
|
// ColumnInfo represents a SQL column (MQ schema field)
|
|
|
type ColumnInfo struct { |
|
|
type ColumnInfo struct { |
|
|
Name string |
|
|
Name string |
|
|
Type string // SQL type representation
|
|
|
|
|
|
Nullable bool // Assumption: MQ fields are nullable by default
|
|
|
|
|
|
|
|
|
Type string // SQL type representation
|
|
|
|
|
|
Nullable bool // Assumption: MQ fields are nullable by default
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// NewSchemaCatalog creates a new schema catalog
|
|
|
// NewSchemaCatalog creates a new schema catalog
|
|
|
// Assumption: Catalog starts empty and is populated on-demand
|
|
|
|
|
|
func NewSchemaCatalog(filerAddress string) *SchemaCatalog { |
|
|
|
|
|
|
|
|
// Uses master address for service discovery of filers and brokers
|
|
|
|
|
|
func NewSchemaCatalog(masterAddress string) *SchemaCatalog { |
|
|
return &SchemaCatalog{ |
|
|
return &SchemaCatalog{ |
|
|
databases: make(map[string]*DatabaseInfo), |
|
|
databases: make(map[string]*DatabaseInfo), |
|
|
brokerClient: NewBrokerClient(filerAddress), |
|
|
|
|
|
|
|
|
brokerClient: NewBrokerClient(masterAddress), |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -71,26 +71,25 @@ func NewSchemaCatalog(filerAddress string) *SchemaCatalog { |
|
|
func (c *SchemaCatalog) ListDatabases() []string { |
|
|
func (c *SchemaCatalog) ListDatabases() []string { |
|
|
c.mu.RLock() |
|
|
c.mu.RLock() |
|
|
defer c.mu.RUnlock() |
|
|
defer c.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Try to get real namespaces from broker first
|
|
|
// Try to get real namespaces from broker first
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
defer cancel() |
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespaces, err := c.brokerClient.ListNamespaces(ctx) |
|
|
namespaces, err := c.brokerClient.ListNamespaces(ctx) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
|
|
|
// Silently handle broker connection errors
|
|
|
|
|
|
|
|
|
// Fallback to cached databases if broker unavailable
|
|
|
// Fallback to cached databases if broker unavailable
|
|
|
databases := make([]string, 0, len(c.databases)) |
|
|
databases := make([]string, 0, len(c.databases)) |
|
|
for name := range c.databases { |
|
|
for name := range c.databases { |
|
|
databases = append(databases, name) |
|
|
databases = append(databases, name) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// If no cached data, return sample data for testing
|
|
|
|
|
|
if len(databases) == 0 { |
|
|
|
|
|
return []string{"default", "analytics", "logs"} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Return empty list if no cached data (no more sample data)
|
|
|
return databases |
|
|
return databases |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return namespaces |
|
|
return namespaces |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -98,36 +97,27 @@ func (c *SchemaCatalog) ListDatabases() []string { |
|
|
func (c *SchemaCatalog) ListTables(database string) ([]string, error) { |
|
|
func (c *SchemaCatalog) ListTables(database string) ([]string, error) { |
|
|
c.mu.RLock() |
|
|
c.mu.RLock() |
|
|
defer c.mu.RUnlock() |
|
|
defer c.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Try to get real topics from broker first
|
|
|
// Try to get real topics from broker first
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
defer cancel() |
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
topics, err := c.brokerClient.ListTopics(ctx, database) |
|
|
topics, err := c.brokerClient.ListTopics(ctx, database) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
// Fallback to cached data if broker unavailable
|
|
|
// Fallback to cached data if broker unavailable
|
|
|
db, exists := c.databases[database] |
|
|
db, exists := c.databases[database] |
|
|
if !exists { |
|
|
if !exists { |
|
|
// Return sample data if no cache
|
|
|
|
|
|
switch database { |
|
|
|
|
|
case "default": |
|
|
|
|
|
return []string{"user_events", "system_logs"}, nil |
|
|
|
|
|
case "analytics": |
|
|
|
|
|
return []string{"page_views", "click_events"}, nil |
|
|
|
|
|
case "logs": |
|
|
|
|
|
return []string{"error_logs", "access_logs"}, nil |
|
|
|
|
|
default: |
|
|
|
|
|
return nil, fmt.Errorf("database '%s' not found", database) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Return empty list if database not found (no more sample data)
|
|
|
|
|
|
return []string{}, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tables := make([]string, 0, len(db.Tables)) |
|
|
tables := make([]string, 0, len(db.Tables)) |
|
|
for name := range db.Tables { |
|
|
for name := range db.Tables { |
|
|
tables = append(tables, name) |
|
|
tables = append(tables, name) |
|
|
} |
|
|
} |
|
|
return tables, nil |
|
|
return tables, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return topics, nil |
|
|
return topics, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -136,17 +126,17 @@ func (c *SchemaCatalog) ListTables(database string) ([]string, error) { |
|
|
func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) { |
|
|
func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) { |
|
|
c.mu.RLock() |
|
|
c.mu.RLock() |
|
|
defer c.mu.RUnlock() |
|
|
defer c.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
db, exists := c.databases[database] |
|
|
db, exists := c.databases[database] |
|
|
if !exists { |
|
|
if !exists { |
|
|
return nil, fmt.Errorf("database '%s' not found", database) |
|
|
return nil, fmt.Errorf("database '%s' not found", database) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tableInfo, exists := db.Tables[table] |
|
|
tableInfo, exists := db.Tables[table] |
|
|
if !exists { |
|
|
if !exists { |
|
|
return nil, fmt.Errorf("table '%s' not found in database '%s'", table, database) |
|
|
return nil, fmt.Errorf("table '%s' not found in database '%s'", table, database) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return tableInfo, nil |
|
|
return tableInfo, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -155,7 +145,7 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) |
|
|
func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error { |
|
|
func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error { |
|
|
c.mu.Lock() |
|
|
c.mu.Lock() |
|
|
defer c.mu.Unlock() |
|
|
defer c.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Ensure database exists
|
|
|
// Ensure database exists
|
|
|
db, exists := c.databases[namespace] |
|
|
db, exists := c.databases[namespace] |
|
|
if !exists { |
|
|
if !exists { |
|
@ -165,13 +155,13 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch |
|
|
} |
|
|
} |
|
|
c.databases[namespace] = db |
|
|
c.databases[namespace] = db |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Convert MQ schema to SQL table info
|
|
|
// Convert MQ schema to SQL table info
|
|
|
tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema) |
|
|
tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return fmt.Errorf("failed to convert MQ schema: %v", err) |
|
|
return fmt.Errorf("failed to convert MQ schema: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
db.Tables[topicName] = tableInfo |
|
|
db.Tables[topicName] = tableInfo |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
@ -183,20 +173,20 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch |
|
|
// 3. All fields are nullable unless specifically marked otherwise
|
|
|
// 3. All fields are nullable unless specifically marked otherwise
|
|
|
func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { |
|
|
func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { |
|
|
columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) |
|
|
columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for i, field := range mqSchema.RecordType.Fields { |
|
|
for i, field := range mqSchema.RecordType.Fields { |
|
|
sqlType, err := c.convertMQFieldTypeToSQL(field.Type) |
|
|
sqlType, err := c.convertMQFieldTypeToSQL(field.Type) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err) |
|
|
return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
columns[i] = ColumnInfo{ |
|
|
columns[i] = ColumnInfo{ |
|
|
Name: field.Name, |
|
|
Name: field.Name, |
|
|
Type: sqlType, |
|
|
Type: sqlType, |
|
|
Nullable: true, // Assumption: MQ fields are nullable by default
|
|
|
Nullable: true, // Assumption: MQ fields are nullable by default
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return &TableInfo{ |
|
|
return &TableInfo{ |
|
|
Name: topicName, |
|
|
Name: topicName, |
|
|
Namespace: namespace, |
|
|
Namespace: namespace, |
|
@ -245,7 +235,7 @@ func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (stri |
|
|
func (c *SchemaCatalog) SetCurrentDatabase(database string) error { |
|
|
func (c *SchemaCatalog) SetCurrentDatabase(database string) error { |
|
|
c.mu.Lock() |
|
|
c.mu.Lock() |
|
|
defer c.mu.Unlock() |
|
|
defer c.mu.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: Validate database exists in MQ broker
|
|
|
// TODO: Validate database exists in MQ broker
|
|
|
c.currentDatabase = database |
|
|
c.currentDatabase = database |
|
|
return nil |
|
|
return nil |
|
|