diff --git a/weed/command/sql.go b/weed/command/sql.go index 61fed9633..0193d7ea4 100644 --- a/weed/command/sql.go +++ b/weed/command/sql.go @@ -19,15 +19,15 @@ func init() { } var cmdSql = &Command{ - UsageLine: "sql [-server=localhost:8888] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]", + UsageLine: "sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]", Short: "advanced SQL query interface for SeaweedFS MQ topics with multiple execution modes", Long: `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes. Execution Modes: -- Interactive shell (default): weed sql --interactive -- Single query: weed sql --query "SELECT * FROM user_events" -- Batch from file: weed sql --file queries.sql -- Context switching: weed sql --database analytics --interactive +- Interactive shell (default): weed sql -interactive +- Single query: weed sql -query "SELECT * FROM user_events" +- Batch from file: weed sql -file queries.sql +- Context switching: weed sql -database analytics -interactive Output Formats: - table: ASCII table format (default for interactive) @@ -42,16 +42,16 @@ Features: - Database context switching Examples: - weed sql --interactive - weed sql --query "SHOW DATABASES" --output json - weed sql --file batch_queries.sql --output csv - weed sql --database analytics --query "SELECT COUNT(*) FROM metrics" - weed sql --server broker1:8888 --interactive + weed sql -interactive + weed sql -query "SHOW DATABASES" -output json + weed sql -file batch_queries.sql -output csv + weed sql -database analytics -query "SELECT COUNT(*) FROM metrics" + weed sql -master broker1:9333 -interactive `, } var ( - sqlServer = cmdSql.Flag.String("server", "localhost:8888", "SeaweedFS server address") + sqlMaster = cmdSql.Flag.String("master", "localhost:9333", "SeaweedFS master server HTTP address") sqlInteractive = cmdSql.Flag.Bool("interactive", false, "start interactive shell mode") sqlFile = cmdSql.Flag.String("file", "", "execute SQL queries from file") sqlOutput = cmdSql.Flag.String("output", "", "output format: table, json, csv (auto-detected if not specified)") @@ -77,8 +77,8 @@ type SQLContext struct { } func runSql(command *Command, args []string) bool { - // Initialize SQL engine - sqlEngine := engine.NewSQLEngine(*sqlServer) + // Initialize SQL engine with master address for service discovery + sqlEngine := engine.NewSQLEngine(*sqlMaster) // Determine execution mode and output format interactive := *sqlInteractive || (*sqlQuery == "" && *sqlFile == "") @@ -131,7 +131,7 @@ func executeSingleQuery(ctx *SQLContext, query string) bool { return executeAndDisplay(ctx, query, false) } - fmt.Printf("Executing query against %s...\n", *sqlServer) + fmt.Printf("Executing query against %s...\n", *sqlMaster) return executeAndDisplay(ctx, query, true) } @@ -144,7 +144,7 @@ func executeFileQueries(ctx *SQLContext, filename string) bool { } if ctx.outputFormat == OutputTable && ctx.interactive { - fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlServer) + fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlMaster) } // Split file content into individual queries (simple approach) @@ -172,7 +172,7 @@ func executeFileQueries(ctx *SQLContext, filename string) bool { func runInteractiveShell(ctx *SQLContext) bool { fmt.Println("🚀 SeaweedFS Enhanced SQL Interface") fmt.Println("Type 'help;' for help, 'exit;' to quit") - fmt.Printf("Connected to: %s\n", *sqlServer) + fmt.Printf("Connected to master: %s\n", *sqlMaster) if ctx.currentDatabase != "" { fmt.Printf("Current database: %s\n", ctx.currentDatabase) } diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 0e7a1291d..9c678df4d 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -4,11 +4,15 @@ import ( "context" "fmt" "io" + "strconv" + "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" @@ -17,32 +21,90 @@ import ( // BrokerClient handles communication with SeaweedFS MQ broker // Assumptions: -// 1. Broker discovery via filer lock mechanism (same as shell commands) +// 1. Service discovery via master server (discovers filers and brokers) // 2. gRPC connection with default timeout of 30 seconds // 3. Topics and namespaces are managed via SeaweedMessaging service type BrokerClient struct { + masterAddress string filerAddress string brokerAddress string grpcDialOption grpc.DialOption } // NewBrokerClient creates a new MQ broker client -// Assumption: Filer address is used to discover broker balancer -func NewBrokerClient(filerAddress string) *BrokerClient { +// Uses master HTTP address and converts it to gRPC address for service discovery +func NewBrokerClient(masterHTTPAddress string) *BrokerClient { + // Convert HTTP address to gRPC address (typically HTTP port + 10000) + masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress) + return &BrokerClient{ - filerAddress: filerAddress, + masterAddress: masterGRPCAddress, grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), } } +// convertHTTPToGRPC converts HTTP address to gRPC address +// Follows SeaweedFS convention: gRPC port = HTTP port + 10000 +func convertHTTPToGRPC(httpAddress string) string { + if strings.Contains(httpAddress, ":") { + parts := strings.Split(httpAddress, ":") + if len(parts) == 2 { + if port, err := strconv.Atoi(parts[1]); err == nil { + return fmt.Sprintf("%s:%d", parts[0], port+10000) + } + } + } + // Fallback: return original address if conversion fails + return httpAddress +} + +// discoverFiler finds a filer from the master server +func (c *BrokerClient) discoverFiler() error { + if c.filerAddress != "" { + return nil // already discovered + } + + conn, err := grpc.Dial(c.masterAddress, c.grpcDialOption) + if err != nil { + return fmt.Errorf("failed to connect to master at %s: %v", c.masterAddress, err) + } + defer conn.Close() + + client := master_pb.NewSeaweedClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + if err != nil { + return fmt.Errorf("failed to list filers from master: %v", err) + } + + if len(resp.ClusterNodes) == 0 { + return fmt.Errorf("no filers found in cluster") + } + + // Use the first available filer and convert HTTP address to gRPC + filerHTTPAddress := resp.ClusterNodes[0].Address + c.filerAddress = convertHTTPToGRPC(filerHTTPAddress) + + return nil +} + // findBrokerBalancer discovers the broker balancer using filer lock mechanism -// Assumption: Uses same pattern as existing shell commands +// First discovers filer from master, then uses filer to find broker balancer func (c *BrokerClient) findBrokerBalancer() error { if c.brokerAddress != "" { return nil // already found } - conn, err := grpc.Dial(c.filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + // First discover filer from master + if err := c.discoverFiler(); err != nil { + return fmt.Errorf("failed to discover filer: %v", err) + } + + conn, err := grpc.Dial(c.filerAddress, c.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to filer at %s: %v", c.filerAddress, err) } @@ -65,10 +127,11 @@ func (c *BrokerClient) findBrokerBalancer() error { } // GetFilerClient creates a filer client for accessing MQ data files -// This resolves TODO: Get real filerClient from broker connection +// Discovers filer from master if not already known func (c *BrokerClient) GetFilerClient() (filer_pb.FilerClient, error) { - if c.filerAddress == "" { - return nil, fmt.Errorf("filer address not specified") + // Ensure filer is discovered + if err := c.discoverFiler(); err != nil { + return nil, fmt.Errorf("failed to discover filer: %v", err) } return &filerClientImpl{ @@ -113,8 +176,7 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { // Get filer client to list directories under /topics filerClient, err := c.GetFilerClient() if err != nil { - // Return empty list if filer unavailable - no fallback sample data - return []string{}, nil + return []string{}, fmt.Errorf("failed to get filer client: %v", err) } var namespaces []string @@ -148,8 +210,7 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { }) if err != nil { - // Return empty list if directory listing fails - no fallback sample data - return []string{}, nil + return []string{}, fmt.Errorf("failed to list namespaces from /topics: %v", err) } // Return actual namespaces found (may be empty if no topics exist) diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go index 8ce164de4..9b64ac49e 100644 --- a/weed/query/engine/catalog.go +++ b/weed/query/engine/catalog.go @@ -5,7 +5,7 @@ import ( "fmt" "sync" "time" - + "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -18,15 +18,15 @@ import ( // 4. Schema evolution is tracked via RevisionId type SchemaCatalog struct { mu sync.RWMutex - + // databases maps namespace names to database metadata // Assumption: Namespace names are valid SQL database identifiers databases map[string]*DatabaseInfo - + // currentDatabase tracks the active database context (for USE database) // Assumption: Single-threaded usage per SQL session currentDatabase string - + // brokerClient handles communication with MQ broker brokerClient *BrokerClient } @@ -43,26 +43,26 @@ type DatabaseInfo struct { // 2. Schema evolution maintains backward compatibility // 3. Primary key is implicitly the message timestamp/offset type TableInfo struct { - Name string - Namespace string - Schema *schema.Schema - Columns []ColumnInfo - RevisionId uint32 + Name string + Namespace string + Schema *schema.Schema + Columns []ColumnInfo + RevisionId uint32 } // ColumnInfo represents a SQL column (MQ schema field) type ColumnInfo struct { Name string - Type string // SQL type representation - Nullable bool // Assumption: MQ fields are nullable by default + Type string // SQL type representation + Nullable bool // Assumption: MQ fields are nullable by default } // NewSchemaCatalog creates a new schema catalog -// Assumption: Catalog starts empty and is populated on-demand -func NewSchemaCatalog(filerAddress string) *SchemaCatalog { +// Uses master address for service discovery of filers and brokers +func NewSchemaCatalog(masterAddress string) *SchemaCatalog { return &SchemaCatalog{ databases: make(map[string]*DatabaseInfo), - brokerClient: NewBrokerClient(filerAddress), + brokerClient: NewBrokerClient(masterAddress), } } @@ -71,26 +71,25 @@ func NewSchemaCatalog(filerAddress string) *SchemaCatalog { func (c *SchemaCatalog) ListDatabases() []string { c.mu.RLock() defer c.mu.RUnlock() - + // Try to get real namespaces from broker first ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - + namespaces, err := c.brokerClient.ListNamespaces(ctx) if err != nil { + // Silently handle broker connection errors + // Fallback to cached databases if broker unavailable databases := make([]string, 0, len(c.databases)) for name := range c.databases { databases = append(databases, name) } - - // If no cached data, return sample data for testing - if len(databases) == 0 { - return []string{"default", "analytics", "logs"} - } + + // Return empty list if no cached data (no more sample data) return databases } - + return namespaces } @@ -98,36 +97,27 @@ func (c *SchemaCatalog) ListDatabases() []string { func (c *SchemaCatalog) ListTables(database string) ([]string, error) { c.mu.RLock() defer c.mu.RUnlock() - + // Try to get real topics from broker first ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - + topics, err := c.brokerClient.ListTopics(ctx, database) if err != nil { // Fallback to cached data if broker unavailable db, exists := c.databases[database] if !exists { - // Return sample data if no cache - switch database { - case "default": - return []string{"user_events", "system_logs"}, nil - case "analytics": - return []string{"page_views", "click_events"}, nil - case "logs": - return []string{"error_logs", "access_logs"}, nil - default: - return nil, fmt.Errorf("database '%s' not found", database) - } + // Return empty list if database not found (no more sample data) + return []string{}, nil } - + tables := make([]string, 0, len(db.Tables)) for name := range db.Tables { tables = append(tables, name) } return tables, nil } - + return topics, nil } @@ -136,17 +126,17 @@ func (c *SchemaCatalog) ListTables(database string) ([]string, error) { func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) { c.mu.RLock() defer c.mu.RUnlock() - + db, exists := c.databases[database] if !exists { return nil, fmt.Errorf("database '%s' not found", database) } - + tableInfo, exists := db.Tables[table] if !exists { return nil, fmt.Errorf("table '%s' not found in database '%s'", table, database) } - + return tableInfo, nil } @@ -155,7 +145,7 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error { c.mu.Lock() defer c.mu.Unlock() - + // Ensure database exists db, exists := c.databases[namespace] if !exists { @@ -165,13 +155,13 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch } c.databases[namespace] = db } - + // Convert MQ schema to SQL table info tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema) if err != nil { return fmt.Errorf("failed to convert MQ schema: %v", err) } - + db.Tables[topicName] = tableInfo return nil } @@ -183,20 +173,20 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch // 3. All fields are nullable unless specifically marked otherwise func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) - + for i, field := range mqSchema.RecordType.Fields { sqlType, err := c.convertMQFieldTypeToSQL(field.Type) if err != nil { return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err) } - + columns[i] = ColumnInfo{ Name: field.Name, Type: sqlType, Nullable: true, // Assumption: MQ fields are nullable by default } } - + return &TableInfo{ Name: topicName, Namespace: namespace, @@ -245,7 +235,7 @@ func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (stri func (c *SchemaCatalog) SetCurrentDatabase(database string) error { c.mu.Lock() defer c.mu.Unlock() - + // TODO: Validate database exists in MQ broker c.currentDatabase = database return nil diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 0779cedf3..f984159a1 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -34,10 +34,10 @@ type QueryResult struct { } // NewSQLEngine creates a new SQL execution engine -// Assumption: Schema catalog is initialized with current MQ state -func NewSQLEngine(filerAddress string) *SQLEngine { +// Uses master address for service discovery and initialization +func NewSQLEngine(masterAddress string) *SQLEngine { return &SQLEngine{ - catalog: NewSchemaCatalog(filerAddress), + catalog: NewSchemaCatalog(masterAddress), } }