You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
419 lines
13 KiB
419 lines
13 KiB
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
// BrokerClientInterface defines the interface for broker client operations
|
|
// Both real BrokerClient and MockBrokerClient implement this interface
|
|
type BrokerClientInterface interface {
|
|
ListNamespaces(ctx context.Context) ([]string, error)
|
|
ListTopics(ctx context.Context, namespace string) ([]string, error)
|
|
GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error)
|
|
GetFilerClient() (filer_pb.FilerClient, error)
|
|
ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error
|
|
DeleteTopic(ctx context.Context, namespace, topicName string) error
|
|
// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
|
|
// This prevents double-counting when combining with disk-based data
|
|
GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error)
|
|
}
|
|
|
|
// SchemaCatalog manages the mapping between MQ topics and SQL tables
|
|
// Assumptions:
|
|
// 1. Each MQ namespace corresponds to a SQL database
|
|
// 2. Each MQ topic corresponds to a SQL table
|
|
// 3. Topic schemas are cached for performance
|
|
// 4. Schema evolution is tracked via RevisionId
|
|
type SchemaCatalog struct {
|
|
mu sync.RWMutex
|
|
|
|
// databases maps namespace names to database metadata
|
|
// Assumption: Namespace names are valid SQL database identifiers
|
|
databases map[string]*DatabaseInfo
|
|
|
|
// currentDatabase tracks the active database context (for USE database)
|
|
// Assumption: Single-threaded usage per SQL session
|
|
currentDatabase string
|
|
|
|
// brokerClient handles communication with MQ broker
|
|
brokerClient BrokerClientInterface // Use interface for dependency injection
|
|
|
|
// defaultPartitionCount is the default number of partitions for new topics
|
|
// Can be overridden in CREATE TABLE statements with PARTITION COUNT option
|
|
defaultPartitionCount int32
|
|
|
|
// cacheTTL is the time-to-live for cached database and table information
|
|
// After this duration, cached data is considered stale and will be refreshed
|
|
cacheTTL time.Duration
|
|
}
|
|
|
|
// DatabaseInfo represents a SQL database (MQ namespace)
|
|
type DatabaseInfo struct {
|
|
Name string
|
|
Tables map[string]*TableInfo
|
|
CachedAt time.Time // Timestamp when this database info was cached
|
|
}
|
|
|
|
// TableInfo represents a SQL table (MQ topic) with schema information
|
|
// Assumptions:
|
|
// 1. All topic messages conform to the same schema within a revision
|
|
// 2. Schema evolution maintains backward compatibility
|
|
// 3. Primary key is implicitly the message timestamp/offset
|
|
type TableInfo struct {
|
|
Name string
|
|
Namespace string
|
|
Schema *schema.Schema
|
|
Columns []ColumnInfo
|
|
RevisionId uint32
|
|
CachedAt time.Time // Timestamp when this table info was cached
|
|
}
|
|
|
|
// ColumnInfo describes one SQL column, derived from one MQ schema field.
type ColumnInfo struct {
	Name     string // field name
	Type     string // SQL type name, e.g. "BIGINT" or "VARCHAR(255)"
	Nullable bool   // MQ fields are treated as nullable by default
}
|
|
|
|
// NewSchemaCatalog creates a new schema catalog
|
|
// Uses master address for service discovery of filers and brokers
|
|
func NewSchemaCatalog(masterAddress string) *SchemaCatalog {
|
|
return &SchemaCatalog{
|
|
databases: make(map[string]*DatabaseInfo),
|
|
brokerClient: NewBrokerClient(masterAddress),
|
|
defaultPartitionCount: 6, // Default partition count, can be made configurable via environment variable
|
|
cacheTTL: 5 * time.Minute, // Default cache TTL of 5 minutes, can be made configurable
|
|
}
|
|
}
|
|
|
|
// ListDatabases returns all available databases (MQ namespaces)
|
|
// Assumption: This would be populated from MQ broker metadata
|
|
func (c *SchemaCatalog) ListDatabases() []string {
|
|
// Clean up expired cache entries first
|
|
c.mu.Lock()
|
|
c.cleanExpiredDatabases()
|
|
c.mu.Unlock()
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
// Try to get real namespaces from broker first
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
namespaces, err := c.brokerClient.ListNamespaces(ctx)
|
|
if err != nil {
|
|
// Silently handle broker connection errors
|
|
|
|
// Fallback to cached databases if broker unavailable
|
|
databases := make([]string, 0, len(c.databases))
|
|
for name := range c.databases {
|
|
databases = append(databases, name)
|
|
}
|
|
|
|
// Return empty list if no cached data (no more sample data)
|
|
return databases
|
|
}
|
|
|
|
return namespaces
|
|
}
|
|
|
|
// ListTables returns all tables in a database (MQ topics in namespace)
|
|
func (c *SchemaCatalog) ListTables(database string) ([]string, error) {
|
|
// Clean up expired cache entries first
|
|
c.mu.Lock()
|
|
c.cleanExpiredDatabases()
|
|
c.mu.Unlock()
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
// Try to get real topics from broker first
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
topics, err := c.brokerClient.ListTopics(ctx, database)
|
|
if err != nil {
|
|
// Fallback to cached data if broker unavailable
|
|
db, exists := c.databases[database]
|
|
if !exists {
|
|
// Return empty list if database not found (no more sample data)
|
|
return []string{}, nil
|
|
}
|
|
|
|
tables := make([]string, 0, len(db.Tables))
|
|
for name := range db.Tables {
|
|
tables = append(tables, name)
|
|
}
|
|
return tables, nil
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
// GetTableInfo returns detailed schema information for a table
|
|
// Assumption: Table exists and schema is accessible
|
|
func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) {
|
|
// Clean up expired cache entries first
|
|
c.mu.Lock()
|
|
c.cleanExpiredDatabases()
|
|
c.mu.Unlock()
|
|
|
|
c.mu.RLock()
|
|
db, exists := c.databases[database]
|
|
if !exists {
|
|
c.mu.RUnlock()
|
|
return nil, TableNotFoundError{
|
|
Database: database,
|
|
Table: "",
|
|
}
|
|
}
|
|
|
|
tableInfo, exists := db.Tables[table]
|
|
if !exists || c.isTableCacheExpired(tableInfo) {
|
|
c.mu.RUnlock()
|
|
|
|
// Try to refresh table info from broker if not found or expired
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table)
|
|
if err != nil {
|
|
// If broker unavailable and we have expired cached data, return it
|
|
if exists {
|
|
return tableInfo, nil
|
|
}
|
|
// Otherwise return not found error
|
|
return nil, TableNotFoundError{
|
|
Database: database,
|
|
Table: table,
|
|
}
|
|
}
|
|
|
|
// Convert the broker response to schema and register it
|
|
mqSchema := &schema.Schema{
|
|
RecordType: recordType,
|
|
RevisionId: 1, // Default revision for schema fetched from broker
|
|
}
|
|
|
|
// Register the refreshed schema
|
|
err = c.RegisterTopic(database, table, mqSchema)
|
|
if err != nil {
|
|
// If registration fails but we have cached data, return it
|
|
if exists {
|
|
return tableInfo, nil
|
|
}
|
|
return nil, fmt.Errorf("failed to register topic schema: %v", err)
|
|
}
|
|
|
|
// Get the newly registered table info
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
db, exists := c.databases[database]
|
|
if !exists {
|
|
return nil, TableNotFoundError{
|
|
Database: database,
|
|
Table: table,
|
|
}
|
|
}
|
|
|
|
tableInfo, exists := db.Tables[table]
|
|
if !exists {
|
|
return nil, TableNotFoundError{
|
|
Database: database,
|
|
Table: table,
|
|
}
|
|
}
|
|
|
|
return tableInfo, nil
|
|
}
|
|
|
|
c.mu.RUnlock()
|
|
return tableInfo, nil
|
|
}
|
|
|
|
// RegisterTopic adds or updates a topic's schema information in the catalog
|
|
// Assumption: This is called when topics are created or schemas are modified
|
|
func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
|
|
// Ensure database exists
|
|
db, exists := c.databases[namespace]
|
|
if !exists {
|
|
db = &DatabaseInfo{
|
|
Name: namespace,
|
|
Tables: make(map[string]*TableInfo),
|
|
CachedAt: now,
|
|
}
|
|
c.databases[namespace] = db
|
|
}
|
|
|
|
// Convert MQ schema to SQL table info
|
|
tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to convert MQ schema: %v", err)
|
|
}
|
|
|
|
// Set the cached timestamp for the table
|
|
tableInfo.CachedAt = now
|
|
|
|
db.Tables[topicName] = tableInfo
|
|
return nil
|
|
}
|
|
|
|
// convertMQSchemaToTableInfo converts MQ schema to SQL table information
|
|
// Assumptions:
|
|
// 1. MQ scalar types map directly to SQL types
|
|
// 2. Complex types (arrays, maps) are serialized as JSON strings
|
|
// 3. All fields are nullable unless specifically marked otherwise
|
|
func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) {
|
|
columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields))
|
|
|
|
for i, field := range mqSchema.RecordType.Fields {
|
|
sqlType, err := c.convertMQFieldTypeToSQL(field.Type)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err)
|
|
}
|
|
|
|
columns[i] = ColumnInfo{
|
|
Name: field.Name,
|
|
Type: sqlType,
|
|
Nullable: true, // Assumption: MQ fields are nullable by default
|
|
}
|
|
}
|
|
|
|
return &TableInfo{
|
|
Name: topicName,
|
|
Namespace: namespace,
|
|
Schema: mqSchema,
|
|
Columns: columns,
|
|
RevisionId: mqSchema.RevisionId,
|
|
}, nil
|
|
}
|
|
|
|
// convertMQFieldTypeToSQL maps MQ field types to SQL types
|
|
// Uses standard SQL type mappings with PostgreSQL compatibility
|
|
func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) {
|
|
switch t := fieldType.Kind.(type) {
|
|
case *schema_pb.Type_ScalarType:
|
|
switch t.ScalarType {
|
|
case schema_pb.ScalarType_BOOL:
|
|
return "BOOLEAN", nil
|
|
case schema_pb.ScalarType_INT32:
|
|
return "INT", nil
|
|
case schema_pb.ScalarType_INT64:
|
|
return "BIGINT", nil
|
|
case schema_pb.ScalarType_FLOAT:
|
|
return "FLOAT", nil
|
|
case schema_pb.ScalarType_DOUBLE:
|
|
return "DOUBLE", nil
|
|
case schema_pb.ScalarType_BYTES:
|
|
return "VARBINARY", nil
|
|
case schema_pb.ScalarType_STRING:
|
|
return "VARCHAR(255)", nil // Assumption: Default string length
|
|
default:
|
|
return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType)
|
|
}
|
|
case *schema_pb.Type_ListType:
|
|
// Assumption: Lists are serialized as JSON strings in SQL
|
|
return "TEXT", nil
|
|
case *schema_pb.Type_RecordType:
|
|
// Assumption: Nested records are serialized as JSON strings
|
|
return "TEXT", nil
|
|
default:
|
|
return "", fmt.Errorf("unsupported field type: %T", t)
|
|
}
|
|
}
|
|
|
|
// SetCurrentDatabase sets the active database context
|
|
// Assumption: Used for implementing "USE database" functionality
|
|
func (c *SchemaCatalog) SetCurrentDatabase(database string) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// TODO: Validate database exists in MQ broker
|
|
c.currentDatabase = database
|
|
return nil
|
|
}
|
|
|
|
// GetCurrentDatabase returns the currently active database
|
|
func (c *SchemaCatalog) GetCurrentDatabase() string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.currentDatabase
|
|
}
|
|
|
|
// SetDefaultPartitionCount sets the default number of partitions for new topics
|
|
func (c *SchemaCatalog) SetDefaultPartitionCount(count int32) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.defaultPartitionCount = count
|
|
}
|
|
|
|
// GetDefaultPartitionCount returns the default number of partitions for new topics
|
|
func (c *SchemaCatalog) GetDefaultPartitionCount() int32 {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.defaultPartitionCount
|
|
}
|
|
|
|
// SetCacheTTL sets the time-to-live for cached database and table information
|
|
func (c *SchemaCatalog) SetCacheTTL(ttl time.Duration) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.cacheTTL = ttl
|
|
}
|
|
|
|
// GetCacheTTL returns the current cache TTL setting
|
|
func (c *SchemaCatalog) GetCacheTTL() time.Duration {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.cacheTTL
|
|
}
|
|
|
|
// isDatabaseCacheExpired checks if a database's cached information has expired
|
|
func (c *SchemaCatalog) isDatabaseCacheExpired(db *DatabaseInfo) bool {
|
|
return time.Since(db.CachedAt) > c.cacheTTL
|
|
}
|
|
|
|
// isTableCacheExpired checks if a table's cached information has expired
|
|
func (c *SchemaCatalog) isTableCacheExpired(table *TableInfo) bool {
|
|
return time.Since(table.CachedAt) > c.cacheTTL
|
|
}
|
|
|
|
// cleanExpiredDatabases removes expired database entries from cache
|
|
// Note: This method assumes the caller already holds the write lock
|
|
func (c *SchemaCatalog) cleanExpiredDatabases() {
|
|
for name, db := range c.databases {
|
|
if c.isDatabaseCacheExpired(db) {
|
|
delete(c.databases, name)
|
|
} else {
|
|
// Clean expired tables within non-expired databases
|
|
for tableName, table := range db.Tables {
|
|
if c.isTableCacheExpired(table) {
|
|
delete(db.Tables, tableName)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// CleanExpiredCache removes all expired entries from the cache
|
|
// This method can be called externally to perform periodic cache cleanup
|
|
func (c *SchemaCatalog) CleanExpiredCache() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.cleanExpiredDatabases()
|
|
}
|