From e59654229546c94abbe9c7e1f6507ef30524de13 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Feb 2026 16:27:08 -0800 Subject: [PATCH] Move SQL engine and PostgreSQL server to their own binaries (#8417) * Drop SQL engine and PostgreSQL server * Split SQL tooling into weed-db and weed-sql * move * fix building --- Makefile | 5 +- weed/command/db.go => cmd/weed-db/dbcmd.go | 217 ++++++++-------- cmd/weed-db/main.go | 7 + cmd/weed-sql/main.go | 7 + weed/command/sql.go => cmd/weed-sql/sqlcmd.go | 241 ++++++++++-------- test/kafka/Dockerfile.kafka-gateway | 6 +- test/kafka/test_json_timestamp.sh | 3 +- test/postgres/Dockerfile.seaweedfs | 8 +- test/postgres/SETUP_OVERVIEW.md | 2 +- test/postgres/docker-compose.yml | 2 +- test/postgres/run-tests.sh | 0 test/postgres/validate-setup.sh | 0 weed/Makefile | 8 + weed/command/command.go | 2 - weed/server/postgres/DESIGN.md | 6 +- weed/server/postgres/README.md | 4 +- 16 files changed, 293 insertions(+), 225 deletions(-) rename weed/command/db.go => cmd/weed-db/dbcmd.go (66%) create mode 100644 cmd/weed-db/main.go create mode 100644 cmd/weed-sql/main.go rename weed/command/sql.go => cmd/weed-sql/sqlcmd.go (73%) mode change 100755 => 100644 test/kafka/test_json_timestamp.sh mode change 100755 => 100644 test/postgres/run-tests.sh mode change 100755 => 100644 test/postgres/validate-setup.sh diff --git a/Makefile b/Makefile index e23c7e11a..2370e65ef 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: test admin-generate admin-build admin-clean admin-dev admin-run admin-test admin-fmt admin-help +.PHONY: test admin-generate admin-build admin-clean admin-dev admin-run admin-test admin-fmt admin-help weed-commands BINARY = weed ADMIN_DIR = weed/admin @@ -11,6 +11,9 @@ all: install install: admin-generate cd weed; go install +weed-commands: + cd weed && $(MAKE) weed-db weed-sql + warp_install: go install github.com/minio/warp@v0.7.6 diff --git a/weed/command/db.go b/cmd/weed-db/dbcmd.go similarity index 66% rename from weed/command/db.go rename to cmd/weed-db/dbcmd.go index a521da093..05fa18c5b 100644 --- a/weed/command/db.go +++ b/cmd/weed-db/dbcmd.go @@ -1,10 +1,11 @@ -package command +package main import ( "context" "crypto/tls" "encoding/json" "fmt" + "io" "os" "os/signal" "strings" @@ -13,43 +14,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/server/postgres" "github.com/seaweedfs/seaweedfs/weed/util" + flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" ) -var ( - dbOptions DBOptions -) - -type DBOptions struct { - host *string - port *int - masterAddr *string - authMethod *string - users *string - database *string - maxConns *int - idleTimeout *string - tlsCert *string - tlsKey *string -} +const usageLine = "weed-db -port=5432 -master=" -func init() { - cmdDB.Run = runDB // break init cycle - dbOptions.host = cmdDB.Flag.String("host", "localhost", "Database server host") - dbOptions.port = cmdDB.Flag.Int("port", 5432, "Database server port") - dbOptions.masterAddr = cmdDB.Flag.String("master", "localhost:9333", "SeaweedFS master server address") - dbOptions.authMethod = cmdDB.Flag.String("auth", "trust", "Authentication method: trust, password, md5") - dbOptions.users = cmdDB.Flag.String("users", "", "User credentials for auth (JSON format '{\"user1\":\"pass1\",\"user2\":\"pass2\"}' or file '@/path/to/users.json')") - dbOptions.database = cmdDB.Flag.String("database", "default", "Default database name") - dbOptions.maxConns = cmdDB.Flag.Int("max-connections", 100, "Maximum concurrent connections per server") - dbOptions.idleTimeout = cmdDB.Flag.String("idle-timeout", "1h", "Connection idle timeout") - dbOptions.tlsCert = cmdDB.Flag.String("tls-cert", "", "TLS certificate file path") - dbOptions.tlsKey = cmdDB.Flag.String("tls-key", "", "TLS private key file path") -} - -var cmdDB = &Command{ - UsageLine: "db -port=5432 -master=", - Short: "start a PostgreSQL-compatible database server for SQL queries", - Long: `Start a PostgreSQL wire protocol compatible database server that provides SQL query access to SeaweedFS. +const longHelp = `Start a PostgreSQL wire protocol compatible database server that provides SQL query access to SeaweedFS. This database server enables any PostgreSQL client, tool, or application to connect to SeaweedFS and execute SQL queries against MQ topics. It implements the PostgreSQL wire protocol for maximum @@ -58,25 +28,25 @@ compatibility with the existing PostgreSQL ecosystem. Examples: # Start database server on default port 5432 - weed db + weed-db # Start with MD5 authentication using JSON format (recommended) - weed db -auth=md5 -users='{"admin":"secret","readonly":"view123"}' + weed-db -auth=md5 -users='{"admin":"secret","readonly":"view123"}' # Start with complex passwords using JSON format - weed db -auth=md5 -users='{"admin":"pass;with;semicolons","user":"password:with:colons"}' + weed-db -auth=md5 -users='{"admin":"pass;with;semicolons","user":"password:with:colons"}' # Start with credentials from JSON file (most secure) - weed db -auth=md5 -users="@/etc/seaweedfs/users.json" + weed-db -auth=md5 -users="@/etc/seaweedfs/users.json" # Start with custom port and master - weed db -port=5433 -master=master1:9333 + weed-db -port=5433 -master=master1:9333 # Allow connections from any host - weed db -host=0.0.0.0 -port=5432 + weed-db -host=0.0.0.0 -port=5432 # Start with TLS encryption - weed db -tls-cert=server.crt -tls-key=server.key + weed-db -tls-cert=server.crt -tls-key=server.key Client Connection Examples: @@ -95,7 +65,7 @@ Programming Language Examples: # Python (psycopg2) import psycopg2 conn = psycopg2.connect( - host="localhost", port=5432, + host="localhost", port=5432, user="seaweedfs", database="default" ) @@ -116,7 +86,7 @@ Supported SQL Operations: - SELECT queries on MQ topics - DESCRIBE/DESC table_name commands - EXPLAIN query execution plans - - SHOW DATABASES/TABLES commands + - SHOW DATABASES/TABLES commands - Aggregation functions (COUNT, SUM, AVG, MIN, MAX) - WHERE clauses with filtering - System columns (_timestamp_ns, _key, _source) @@ -149,50 +119,95 @@ Performance Features: - PostgreSQL wire protocol - Query result streaming -`, +` + +type Options struct { + Host string + Port int + MasterAddr string + AuthMethod string + Users string + Database string + MaxConns int + IdleTimeout string + TLSCert string + TLSKey string } -func runDB(cmd *Command, args []string) bool { +// Run executes the weed-db CLI. +func Run(args []string) int { + fs := flag.NewFlagSet("weed-db", flag.ContinueOnError) + usageWriter := io.Writer(os.Stderr) + fs.SetOutput(usageWriter) + + var opts Options + fs.StringVar(&opts.Host, "host", "localhost", "Database server host") + fs.IntVar(&opts.Port, "port", 5432, "Database server port") + fs.StringVar(&opts.MasterAddr, "master", "localhost:9333", "SeaweedFS master server address") + fs.StringVar(&opts.AuthMethod, "auth", "trust", "Authentication method: trust, password, md5") + fs.StringVar(&opts.Users, "users", "", "User credentials for auth (JSON format '{\"user1\":\"pass1\",\"user2\":\"pass2\"}' or file '@/path/to/users.json')") + fs.StringVar(&opts.Database, "database", "default", "Default database name") + fs.IntVar(&opts.MaxConns, "max-connections", 100, "Maximum concurrent connections per server") + fs.StringVar(&opts.IdleTimeout, "idle-timeout", "1h", "Connection idle timeout") + fs.StringVar(&opts.TLSCert, "tls-cert", "", "TLS certificate file path") + fs.StringVar(&opts.TLSKey, "tls-key", "", "TLS private key file path") + + fs.Usage = func() { + fmt.Fprintf(usageWriter, "Usage: %s\n\n%s\n", usageLine, longHelp) + fmt.Fprintln(usageWriter, "Default Parameters:") + fs.PrintDefaults() + } + + if err := fs.Parse(args); err != nil { + return 2 + } + + if !runWithOptions(&opts) { + return 1 + } + return 0 +} +func runWithOptions(opts *Options) bool { util.LoadConfiguration("security", false) - // Validate options - if *dbOptions.masterAddr == "" { + // Validate options. + if opts.MasterAddr == "" { fmt.Fprintf(os.Stderr, "Error: master address is required\n") return false } - // Parse authentication method - authMethod, err := parseAuthMethod(*dbOptions.authMethod) + // Parse authentication method. + authMethod, err := parseAuthMethod(opts.AuthMethod) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) return false } - // Parse user credentials - users, err := parseUsers(*dbOptions.users, authMethod) + // Parse user credentials. + users, err := parseUsers(opts.Users, authMethod) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) return false } - // Parse idle timeout - idleTimeout, err := time.ParseDuration(*dbOptions.idleTimeout) + // Parse idle timeout. + idleTimeout, err := time.ParseDuration(opts.IdleTimeout) if err != nil { fmt.Fprintf(os.Stderr, "Error parsing idle timeout: %v\n", err) return false } - // Validate port number - if err := validatePortNumber(*dbOptions.port); err != nil { + // Validate port number. + if err := validatePortNumber(opts.Port); err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) return false } - // Setup TLS if requested + // Setup TLS if requested. var tlsConfig *tls.Config - if *dbOptions.tlsCert != "" && *dbOptions.tlsKey != "" { - cert, err := tls.LoadX509KeyPair(*dbOptions.tlsCert, *dbOptions.tlsKey) + if opts.TLSCert != "" && opts.TLSKey != "" { + cert, err := tls.LoadX509KeyPair(opts.TLSCert, opts.TLSKey) if err != nil { fmt.Fprintf(os.Stderr, "Error loading TLS certificates: %v\n", err) return false @@ -202,34 +217,34 @@ func runDB(cmd *Command, args []string) bool { } } - // Create server configuration + // Create server configuration. config := &postgres.PostgreSQLServerConfig{ - Host: *dbOptions.host, - Port: *dbOptions.port, + Host: opts.Host, + Port: opts.Port, AuthMethod: authMethod, Users: users, - Database: *dbOptions.database, - MaxConns: *dbOptions.maxConns, + Database: opts.Database, + MaxConns: opts.MaxConns, IdleTimeout: idleTimeout, TLSConfig: tlsConfig, } - // Create database server - dbServer, err := postgres.NewPostgreSQLServer(config, *dbOptions.masterAddr) + // Create database server. + dbServer, err := postgres.NewPostgreSQLServer(config, opts.MasterAddr) if err != nil { fmt.Fprintf(os.Stderr, "Error creating database server: %v\n", err) return false } - // Print startup information + // Print startup information. fmt.Printf("Starting SeaweedFS Database Server...\n") - fmt.Printf("Host: %s\n", *dbOptions.host) - fmt.Printf("Port: %d\n", *dbOptions.port) - fmt.Printf("Master: %s\n", *dbOptions.masterAddr) - fmt.Printf("Database: %s\n", *dbOptions.database) - fmt.Printf("Auth Method: %s\n", *dbOptions.authMethod) - fmt.Printf("Max Connections: %d\n", *dbOptions.maxConns) - fmt.Printf("Idle Timeout: %s\n", *dbOptions.idleTimeout) + fmt.Printf("Host: %s\n", opts.Host) + fmt.Printf("Port: %d\n", opts.Port) + fmt.Printf("Master: %s\n", opts.MasterAddr) + fmt.Printf("Database: %s\n", opts.Database) + fmt.Printf("Auth Method: %s\n", opts.AuthMethod) + fmt.Printf("Max Connections: %d\n", opts.MaxConns) + fmt.Printf("Idle Timeout: %s\n", opts.IdleTimeout) if tlsConfig != nil { fmt.Printf("TLS: Enabled\n") } else { @@ -240,15 +255,15 @@ func runDB(cmd *Command, args []string) bool { } fmt.Printf("\nDatabase Connection Examples:\n") - fmt.Printf(" psql -h %s -p %d -U seaweedfs -d %s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database) + fmt.Printf(" psql -h %s -p %d -U seaweedfs -d %s\n", opts.Host, opts.Port, opts.Database) if len(users) > 0 { - // Show first user as example + // Show first user as example. for username := range users { - fmt.Printf(" psql -h %s -p %d -U %s -d %s\n", *dbOptions.host, *dbOptions.port, username, *dbOptions.database) + fmt.Printf(" psql -h %s -p %d -U %s -d %s\n", opts.Host, opts.Port, username, opts.Database) break } } - fmt.Printf(" postgresql://%s:%d/%s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database) + fmt.Printf(" postgresql://%s:%d/%s\n", opts.Host, opts.Port, opts.Database) fmt.Printf("\nSupported Operations:\n") fmt.Printf(" - SELECT queries on MQ topics\n") @@ -261,26 +276,26 @@ func runDB(cmd *Command, args []string) bool { fmt.Printf("\nReady for database connections!\n\n") - // Start the server + // Start the server. err = dbServer.Start() if err != nil { fmt.Fprintf(os.Stderr, "Error starting database server: %v\n", err) return false } - // Set up signal handling for graceful shutdown + // Set up signal handling for graceful shutdown. sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - // Wait for shutdown signal + // Wait for shutdown signal. <-sigChan fmt.Printf("\nReceived shutdown signal, stopping database server...\n") - // Create context with timeout for graceful shutdown + // Create context with timeout for graceful shutdown. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Stop the server with timeout + // Stop the server with timeout. done := make(chan error, 1) go func() { done <- dbServer.Stop() @@ -301,7 +316,7 @@ func runDB(cmd *Command, args []string) bool { return true } -// parseAuthMethod parses the authentication method string +// parseAuthMethod parses the authentication method string. func parseAuthMethod(method string) (postgres.AuthMethod, error) { switch strings.ToLower(method) { case "trust": @@ -315,7 +330,7 @@ func parseAuthMethod(method string) (postgres.AuthMethod, error) { } } -// parseUsers parses the user credentials string with support for secure formats only +// parseUsers parses the user credentials string with support for secure formats only. // Supported formats: // 1. JSON format: {"username":"password","username2":"password2"} // 2. File format: /path/to/users.json or @/path/to/users.json @@ -323,41 +338,41 @@ func parseUsers(usersStr string, authMethod postgres.AuthMethod) (map[string]str users := make(map[string]string) if usersStr == "" { - // No users specified + // No users specified. if authMethod != postgres.AuthTrust { return nil, fmt.Errorf("users must be specified when auth method is not 'trust'") } return users, nil } - // Trim whitespace + // Trim whitespace. usersStr = strings.TrimSpace(usersStr) - // Determine format and parse accordingly + // Determine format and parse accordingly. if strings.HasPrefix(usersStr, "{") && strings.HasSuffix(usersStr, "}") { - // JSON format + // JSON format. return parseUsersJSON(usersStr, authMethod) } - // Check if it's a file path (with or without @ prefix) before declaring invalid format + // Check if it's a file path (with or without @ prefix) before declaring invalid format. filePath := strings.TrimPrefix(usersStr, "@") if _, err := os.Stat(filePath); err == nil { - // File format - return parseUsersFile(usersStr, authMethod) // Pass original string to preserve @ handling + // File format. + return parseUsersFile(usersStr, authMethod) // Pass original string to preserve @ handling. } - // Invalid format + // Invalid format. return nil, fmt.Errorf("invalid user credentials format. Use JSON format '{\"user\":\"pass\"}' or file format '@/path/to/users.json' or 'path/to/users.json'. Legacy semicolon-separated format is no longer supported") } -// parseUsersJSON parses user credentials from JSON format +// parseUsersJSON parses user credentials from JSON format. func parseUsersJSON(jsonStr string, authMethod postgres.AuthMethod) (map[string]string, error) { var users map[string]string if err := json.Unmarshal([]byte(jsonStr), &users); err != nil { return nil, fmt.Errorf("invalid JSON format for users: %v", err) } - // Validate users + // Validate users. for username, password := range users { if username == "" { return nil, fmt.Errorf("empty username in JSON user specification") @@ -370,12 +385,12 @@ func parseUsersJSON(jsonStr string, authMethod postgres.AuthMethod) (map[string] return users, nil } -// parseUsersFile parses user credentials from a JSON file +// parseUsersFile parses user credentials from a JSON file. func parseUsersFile(filePath string, authMethod postgres.AuthMethod) (map[string]string, error) { - // Remove @ prefix if present + // Remove @ prefix if present. filePath = strings.TrimPrefix(filePath, "@") - // Read file content + // Read file content. content, err := os.ReadFile(filePath) if err != nil { return nil, fmt.Errorf("failed to read users file '%s': %v", filePath, err) @@ -383,16 +398,16 @@ func parseUsersFile(filePath string, authMethod postgres.AuthMethod) (map[string contentStr := strings.TrimSpace(string(content)) - // File must contain JSON format + // File must contain JSON format. if !strings.HasPrefix(contentStr, "{") || !strings.HasSuffix(contentStr, "}") { return nil, fmt.Errorf("users file '%s' must contain JSON format: {\"user\":\"pass\"}. Legacy formats are no longer supported", filePath) } - // Parse as JSON + // Parse as JSON. return parseUsersJSON(contentStr, authMethod) } -// validatePortNumber validates that the port number is reasonable +// validatePortNumber validates that the port number is reasonable. func validatePortNumber(port int) error { if port < 1 || port > 65535 { return fmt.Errorf("port number must be between 1 and 65535, got %d", port) diff --git a/cmd/weed-db/main.go b/cmd/weed-db/main.go new file mode 100644 index 000000000..735dde666 --- /dev/null +++ b/cmd/weed-db/main.go @@ -0,0 +1,7 @@ +package main + +import "os" + +func main() { + os.Exit(Run(os.Args[1:])) +} diff --git a/cmd/weed-sql/main.go b/cmd/weed-sql/main.go new file mode 100644 index 000000000..735dde666 --- /dev/null +++ b/cmd/weed-sql/main.go @@ -0,0 +1,7 @@ +package main + +import "os" + +func main() { + os.Exit(Run(os.Args[1:])) +} diff --git a/weed/command/sql.go b/cmd/weed-sql/sqlcmd.go similarity index 73% rename from weed/command/sql.go rename to cmd/weed-sql/sqlcmd.go index 682c8e46d..c25b6501f 100644 --- a/weed/command/sql.go +++ b/cmd/weed-sql/sqlcmd.go @@ -1,4 +1,4 @@ -package command +package main import ( "context" @@ -13,28 +13,24 @@ import ( "github.com/peterh/liner" "github.com/seaweedfs/seaweedfs/weed/query/engine" + flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" "github.com/seaweedfs/seaweedfs/weed/util/grace" "github.com/seaweedfs/seaweedfs/weed/util/sqlutil" ) -func init() { - cmdSql.Run = runSql -} +const usageLine = "weed-sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]" -var cmdSql = &Command{ - UsageLine: "sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]", - Short: "advanced SQL query interface for SeaweedFS MQ topics with multiple execution modes", - Long: `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes. +const longHelp = `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes. Execution Modes: -- Interactive shell (default): weed sql -interactive -- Single query: weed sql -query "SELECT * FROM user_events" -- Batch from file: weed sql -file queries.sql -- Context switching: weed sql -database analytics -interactive +- Interactive shell (default): weed-sql -interactive +- Single query: weed-sql -query "SELECT * FROM user_events" +- Batch from file: weed-sql -file queries.sql +- Context switching: weed-sql -database analytics -interactive Output Formats: - table: ASCII table format (default for interactive) -- json: JSON format (default for non-interactive) +- json: JSON format (default for non-interactive) - csv: Comma-separated values Features: @@ -45,24 +41,23 @@ Features: - Database context switching Examples: - weed sql -interactive - weed sql -query "SHOW DATABASES" -output json - weed sql -file batch_queries.sql -output csv - weed sql -database analytics -query "SELECT COUNT(*) FROM metrics" - weed sql -master broker1:9333 -interactive -`, + weed-sql -interactive + weed-sql -query "SHOW DATABASES" -output json + weed-sql -file batch_queries.sql -output csv + weed-sql -database analytics -query "SELECT COUNT(*) FROM metrics" + weed-sql -master broker1:9333 -interactive +` + +type Options struct { + Master string + Interactive bool + File string + Output string + Database string + Query string } -var ( - sqlMaster = cmdSql.Flag.String("master", "localhost:9333", "SeaweedFS master server HTTP address") - sqlInteractive = cmdSql.Flag.Bool("interactive", false, "start interactive shell mode") - sqlFile = cmdSql.Flag.String("file", "", "execute SQL queries from file") - sqlOutput = cmdSql.Flag.String("output", "", "output format: table, json, csv (auto-detected if not specified)") - sqlDatabase = cmdSql.Flag.String("database", "", "default database context") - sqlQuery = cmdSql.Flag.String("query", "", "execute single SQL query") -) - -// OutputFormat represents different output formatting options +// OutputFormat represents different output formatting options. type OutputFormat string const ( @@ -71,50 +66,82 @@ const ( OutputCSV OutputFormat = "csv" ) -// SQLContext holds the execution context for SQL operations +// SQLContext holds the execution context for SQL operations. type SQLContext struct { engine *engine.SQLEngine currentDatabase string outputFormat OutputFormat interactive bool + master string +} + +// Run executes the weed-sql CLI. +func Run(args []string) int { + fs := flag.NewFlagSet("weed-sql", flag.ContinueOnError) + usageWriter := io.Writer(os.Stderr) + fs.SetOutput(usageWriter) + + var opts Options + fs.StringVar(&opts.Master, "master", "localhost:9333", "SeaweedFS master server HTTP address") + fs.BoolVar(&opts.Interactive, "interactive", false, "start interactive shell mode") + fs.StringVar(&opts.File, "file", "", "execute SQL queries from file") + fs.StringVar(&opts.Output, "output", "", "output format: table, json, csv (auto-detected if not specified)") + fs.StringVar(&opts.Database, "database", "", "default database context") + fs.StringVar(&opts.Query, "query", "", "execute single SQL query") + + fs.Usage = func() { + fmt.Fprintf(usageWriter, "Usage: %s\n\n%s\n", usageLine, longHelp) + fmt.Fprintln(usageWriter, "Default Parameters:") + fs.PrintDefaults() + } + + if err := fs.Parse(args); err != nil { + return 2 + } + + if !runWithOptions(&opts) { + return 1 + } + return 0 } -func runSql(command *Command, args []string) bool { - // Initialize SQL engine with master address for service discovery - sqlEngine := engine.NewSQLEngine(*sqlMaster) +func runWithOptions(opts *Options) bool { + // Initialize SQL engine with master address for service discovery. + sqlEngine := engine.NewSQLEngine(opts.Master) - // Determine execution mode and output format - interactive := *sqlInteractive || (*sqlQuery == "" && *sqlFile == "") - outputFormat := determineOutputFormat(*sqlOutput, interactive) + // Determine execution mode and output format. + interactive := opts.Interactive || (opts.Query == "" && opts.File == "") + outputFormat := determineOutputFormat(opts.Output, interactive) - // Create SQL context + // Create SQL context. ctx := &SQLContext{ engine: sqlEngine, - currentDatabase: *sqlDatabase, + currentDatabase: opts.Database, outputFormat: outputFormat, interactive: interactive, + master: opts.Master, } - // Set current database in SQL engine if specified via command line - if *sqlDatabase != "" { - ctx.engine.GetCatalog().SetCurrentDatabase(*sqlDatabase) + // Set current database in SQL engine if specified via command line. + if opts.Database != "" { + ctx.engine.GetCatalog().SetCurrentDatabase(opts.Database) } - // Execute based on mode + // Execute based on mode. switch { - case *sqlQuery != "": - // Single query mode - return executeSingleQuery(ctx, *sqlQuery) - case *sqlFile != "": - // Batch file mode - return executeFileQueries(ctx, *sqlFile) + case opts.Query != "": + // Single query mode. + return executeSingleQuery(ctx, opts.Query) + case opts.File != "": + // Batch file mode. + return executeFileQueries(ctx, opts.File) default: - // Interactive mode + // Interactive mode. return runInteractiveShell(ctx) } } -// determineOutputFormat selects the appropriate output format +// determineOutputFormat selects the appropriate output format. func determineOutputFormat(specified string, interactive bool) OutputFormat { switch strings.ToLower(specified) { case "table": @@ -124,7 +151,7 @@ func determineOutputFormat(specified string, interactive bool) OutputFormat { case "csv": return OutputCSV default: - // Auto-detect based on mode + // Auto-detect based on mode. if interactive { return OutputTable } @@ -132,18 +159,18 @@ func determineOutputFormat(specified string, interactive bool) OutputFormat { } } -// executeSingleQuery executes a single query and outputs the result +// executeSingleQuery executes a single query and outputs the result. func executeSingleQuery(ctx *SQLContext, query string) bool { if ctx.outputFormat != OutputTable { - // Suppress banner for non-interactive output + // Suppress banner for non-interactive output. return executeAndDisplay(ctx, query, false) } - fmt.Printf("Executing query against %s...\n", *sqlMaster) + fmt.Printf("Executing query against %s...\n", ctx.master) return executeAndDisplay(ctx, query, true) } -// executeFileQueries processes SQL queries from a file +// executeFileQueries processes SQL queries from a file. func executeFileQueries(ctx *SQLContext, filename string) bool { content, err := os.ReadFile(filename) if err != nil { @@ -152,10 +179,10 @@ func executeFileQueries(ctx *SQLContext, filename string) bool { } if ctx.outputFormat == OutputTable && ctx.interactive { - fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlMaster) + fmt.Printf("Executing queries from %s against %s...\n", filename, ctx.master) } - // Split file content into individual queries (robust approach) + // Split file content into individual queries (robust approach). queries := sqlutil.SplitStatements(string(content)) for i, query := range queries { @@ -176,11 +203,11 @@ func executeFileQueries(ctx *SQLContext, filename string) bool { return true } -// runInteractiveShell starts the enhanced interactive shell with readline support +// runInteractiveShell starts the enhanced interactive shell with readline support. func runInteractiveShell(ctx *SQLContext) bool { fmt.Println("SeaweedFS Enhanced SQL Interface") fmt.Println("Type 'help;' for help, 'exit;' to quit") - fmt.Printf("Connected to master: %s\n", *sqlMaster) + fmt.Printf("Connected to master: %s\n", ctx.master) if ctx.currentDatabase != "" { fmt.Printf("Current database: %s\n", ctx.currentDatabase) } @@ -188,24 +215,24 @@ func runInteractiveShell(ctx *SQLContext) bool { fmt.Println("Use up/down arrows for command history") fmt.Println() - // Initialize liner for readline functionality + // Initialize liner for readline functionality. line := liner.NewLiner() defer line.Close() - // Handle Ctrl+C gracefully + // Handle Ctrl+C gracefully. line.SetCtrlCAborts(true) grace.OnInterrupt(func() { line.Close() }) - // Load command history + // Load command history. historyPath := path.Join(os.TempDir(), "weed-sql-history") if f, err := os.Open(historyPath); err == nil { line.ReadHistory(f) f.Close() } - // Save history on exit + // Save history on exit. defer func() { if f, err := os.Create(historyPath); err == nil { line.WriteHistory(f) @@ -216,7 +243,7 @@ func runInteractiveShell(ctx *SQLContext) bool { var queryBuffer strings.Builder for { - // Show prompt with current database context + // Show prompt with current database context. var prompt string if queryBuffer.Len() == 0 { if ctx.currentDatabase != "" { @@ -225,10 +252,10 @@ func runInteractiveShell(ctx *SQLContext) bool { prompt = "seaweedfs> " } } else { - prompt = " -> " // Continuation prompt + prompt = " -> " // Continuation prompt. } - // Read line with readline support + // Read line with readline support. input, err := line.Prompt(prompt) if err != nil { if err == liner.ErrPromptAborted { @@ -244,30 +271,30 @@ func runInteractiveShell(ctx *SQLContext) bool { lineStr := strings.TrimSpace(input) - // Handle empty lines + // Handle empty lines. if lineStr == "" { continue } - // Accumulate lines in query buffer + // Accumulate lines in query buffer. if queryBuffer.Len() > 0 { queryBuffer.WriteString(" ") } queryBuffer.WriteString(lineStr) - // Check if we have a complete statement (ends with semicolon or special command) + // Check if we have a complete statement (ends with semicolon or special command). fullQuery := strings.TrimSpace(queryBuffer.String()) isComplete := strings.HasSuffix(lineStr, ";") || isSpecialCommand(fullQuery) if !isComplete { - continue // Continue reading more lines + continue // Continue reading more lines. } - // Add completed command to history + // Add completed command to history. line.AppendHistory(fullQuery) - // Handle special commands (with or without semicolon) + // Handle special commands (with or without semicolon). cleanQuery := strings.TrimSuffix(fullQuery, ";") cleanQuery = strings.TrimSpace(cleanQuery) @@ -282,19 +309,19 @@ func runInteractiveShell(ctx *SQLContext) bool { continue } - // Handle database switching - use proper SQL parser instead of manual parsing + // Handle database switching - use proper SQL parser instead of manual parsing. if strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") { - // Execute USE statement through the SQL engine for proper parsing + // Execute USE statement through the SQL engine for proper parsing. result, err := ctx.engine.ExecuteSQL(context.Background(), cleanQuery) if err != nil { fmt.Printf("Error: %v\n\n", err) } else if result.Error != nil { fmt.Printf("Error: %v\n\n", result.Error) } else { - // Extract the database name from the result message for CLI context + // Extract the database name from the result message for CLI context. if len(result.Rows) > 0 && len(result.Rows[0]) > 0 { message := result.Rows[0][0].ToString() - // Extract database name from "Database changed to: dbname" + // Extract database name from "Database changed to: dbname". if strings.HasPrefix(message, "Database changed to: ") { ctx.currentDatabase = strings.TrimPrefix(message, "Database changed to: ") } @@ -305,7 +332,7 @@ func runInteractiveShell(ctx *SQLContext) bool { continue } - // Handle output format switching + // Handle output format switching. if strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") { format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")) switch format { @@ -325,22 +352,22 @@ func runInteractiveShell(ctx *SQLContext) bool { continue } - // Execute SQL query (without semicolon) + // Execute SQL query (without semicolon). executeAndDisplay(ctx, cleanQuery, true) - // Reset buffer for next query + // Reset buffer for next query. queryBuffer.Reset() } return true } -// isSpecialCommand checks if a command is a special command that doesn't require semicolon +// isSpecialCommand checks if a command is a special command that doesn't require semicolon. func isSpecialCommand(query string) bool { cleanQuery := strings.TrimSuffix(strings.TrimSpace(query), ";") cleanQuery = strings.ToLower(cleanQuery) - // Special commands that work with or without semicolon + // Special commands that work with or without semicolon. specialCommands := []string{ "exit", "quit", "\\q", "help", } @@ -351,7 +378,7 @@ func isSpecialCommand(query string) bool { } } - // Commands that are exactly specific commands (not just prefixes) + // Commands that are exactly specific commands (not just prefixes). parts := strings.Fields(strings.ToUpper(cleanQuery)) if len(parts) == 0 { return false @@ -360,11 +387,11 @@ func isSpecialCommand(query string) bool { strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") } -// executeAndDisplay executes a query and displays the result in the specified format +// executeAndDisplay executes a query and displays the result in the specified format. func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool { startTime := time.Now() - // Execute the query + // Execute the query. execCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -397,7 +424,7 @@ func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool { return false } - // Display results in the specified format + // Display results in the specified format. switch ctx.outputFormat { case OutputTable: displayTableResult(result) @@ -407,8 +434,8 @@ func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool { displayCSVResult(result) } - // Show execution time for interactive/table mode - // Only show timing if there are columns or if result is truly empty + // Show execution time for interactive/table mode. + // Only show timing if there are columns or if result is truly empty. if showTiming && ctx.outputFormat == OutputTable && (len(result.Columns) > 0 || len(result.Rows) == 0) { elapsed := time.Since(startTime) fmt.Printf("\n(%d rows in set, %.3f sec)\n\n", len(result.Rows), elapsed.Seconds()) @@ -417,20 +444,20 @@ func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool { return true } -// displayTableResult formats and displays query results in ASCII table format +// displayTableResult formats and displays query results in ASCII table format. func displayTableResult(result *engine.QueryResult) { if len(result.Columns) == 0 { fmt.Println("Empty result set") return } - // Calculate column widths for formatting + // Calculate column widths for formatting. colWidths := make([]int, len(result.Columns)) for i, col := range result.Columns { colWidths[i] = len(col) } - // Check data for wider columns + // Check data for wider columns. for _, row := range result.Rows { for i, val := range row { if i < len(colWidths) { @@ -442,28 +469,28 @@ func displayTableResult(result *engine.QueryResult) { } } - // Print header separator + // Print header separator. fmt.Print("+") for _, width := range colWidths { fmt.Print(strings.Repeat("-", width+2) + "+") } fmt.Println() - // Print column headers + // Print column headers. fmt.Print("|") for i, col := range result.Columns { fmt.Printf(" %-*s |", colWidths[i], col) } fmt.Println() - // Print separator + // Print separator. fmt.Print("+") for _, width := range colWidths { fmt.Print(strings.Repeat("-", width+2) + "+") } fmt.Println() - // Print data rows + // Print data rows. for _, row := range result.Rows { fmt.Print("|") for i, val := range row { @@ -474,7 +501,7 @@ func displayTableResult(result *engine.QueryResult) { fmt.Println() } - // Print bottom separator + // Print bottom separator. fmt.Print("+") for _, width := range colWidths { fmt.Print(strings.Repeat("-", width+2) + "+") @@ -482,16 +509,16 @@ func displayTableResult(result *engine.QueryResult) { fmt.Println() } -// displayJSONResult outputs query results in JSON format +// displayJSONResult outputs query results in JSON format. func displayJSONResult(result *engine.QueryResult) { - // Convert result to JSON-friendly format + // Convert result to JSON-friendly format. jsonResult := map[string]interface{}{ "columns": result.Columns, "rows": make([]map[string]interface{}, len(result.Rows)), "count": len(result.Rows), } - // Convert rows to JSON objects + // Convert rows to JSON objects. for i, row := range result.Rows { rowObj := make(map[string]interface{}) for j, val := range row { @@ -502,7 +529,7 @@ func displayJSONResult(result *engine.QueryResult) { jsonResult["rows"].([]map[string]interface{})[i] = rowObj } - // Marshal and print JSON + // Marshal and print JSON. jsonBytes, err := json.MarshalIndent(jsonResult, "", " ") if err != nil { fmt.Printf("Error formatting JSON: %v\n", err) @@ -512,11 +539,11 @@ func displayJSONResult(result *engine.QueryResult) { fmt.Println(string(jsonBytes)) } -// displayCSVResult outputs query results in CSV format +// displayCSVResult outputs query results in CSV format. func displayCSVResult(result *engine.QueryResult) { - // Handle execution plan results specially to avoid CSV quoting issues + // Handle execution plan results specially to avoid CSV quoting issues. if len(result.Columns) == 1 && result.Columns[0] == "Query Execution Plan" { - // For execution plans, output directly without CSV encoding to avoid quotes + // For execution plans, output directly without CSV encoding to avoid quotes. for _, row := range result.Rows { if len(row) > 0 { fmt.Println(row[0].ToString()) @@ -525,17 +552,17 @@ func displayCSVResult(result *engine.QueryResult) { return } - // Standard CSV output for regular query results + // Standard CSV output for regular query results. writer := csv.NewWriter(os.Stdout) defer writer.Flush() - // Write headers + // Write headers. if err := writer.Write(result.Columns); err != nil { fmt.Printf("Error writing CSV headers: %v\n", err) return } - // Write data rows + // Write data rows. for _, row := range result.Rows { csvRow := make([]string, len(row)) for i, val := range row { @@ -553,7 +580,7 @@ func showEnhancedHelp() { METADATA OPERATIONS: SHOW DATABASES; - List all MQ namespaces - SHOW TABLES; - List all topics in current namespace + SHOW TABLES; - List all topics in current namespace SHOW TABLES FROM database; - List topics in specific namespace DESCRIBE table_name; - Show table schema @@ -581,7 +608,7 @@ SPECIAL COMMANDS: EXTENDED WHERE OPERATORS: =, <, >, <=, >= - Comparison operators - !=, <> - Not equal operators + !=, <> - Not equal operators LIKE 'pattern%' - Pattern matching (% = any chars, _ = single char) IN (value1, value2, ...) - Multi-value matching AND, OR - Logical operators diff --git a/test/kafka/Dockerfile.kafka-gateway b/test/kafka/Dockerfile.kafka-gateway index c2f975f6d..6e96fa893 100644 --- a/test/kafka/Dockerfile.kafka-gateway +++ b/test/kafka/Dockerfile.kafka-gateway @@ -16,8 +16,9 @@ RUN go mod download # Copy source code COPY . . -# Build the weed binary with Kafka gateway support +# Build the weed binaries with Kafka gateway support RUN CGO_ENABLED=1 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o weed ./weed +RUN CGO_ENABLED=1 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o weed-sql ./cmd/weed-sql # Final stage FROM alpine:latest @@ -32,8 +33,9 @@ RUN addgroup -g 1000 seaweedfs && \ # Set working directory WORKDIR /usr/bin -# Copy binary from builder +# Copy binaries from builder COPY --from=builder /app/weed . +COPY --from=builder /app/weed-sql . # Create data directory RUN mkdir -p /data && chown seaweedfs:seaweedfs /data diff --git a/test/kafka/test_json_timestamp.sh b/test/kafka/test_json_timestamp.sh old mode 100755 new mode 100644 index 545c07d6f..5868b7293 --- a/test/kafka/test_json_timestamp.sh +++ b/test/kafka/test_json_timestamp.sh @@ -14,8 +14,7 @@ sleep 2 echo "Querying messages..." cd /Users/chrislu/go/src/github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest -docker compose exec kafka-gateway /usr/local/bin/weed sql \ +docker compose exec kafka-gateway /usr/bin/weed-sql \ -master=seaweedfs-master:9333 \ -database=kafka \ -query="SELECT id, timestamp, producer_id, counter, user_id, event_type FROM \"test-json-topic\" LIMIT 5;" - diff --git a/test/postgres/Dockerfile.seaweedfs b/test/postgres/Dockerfile.seaweedfs index 49ff74930..e9f49825d 100644 --- a/test/postgres/Dockerfile.seaweedfs +++ b/test/postgres/Dockerfile.seaweedfs @@ -13,8 +13,9 @@ RUN go mod download # Copy source code COPY . . -# Build the weed binary without CGO +# Build the weed binaries without CGO RUN CGO_ENABLED=0 GOOS=linux go build -ldflags "-s -w" -o weed ./weed/ +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags "-s -w" -o weed-db ./cmd/weed-db # Final stage - minimal runtime image FROM alpine:latest @@ -24,11 +25,12 @@ RUN apk --no-cache add ca-certificates netcat-openbsd curl WORKDIR /root/ -# Copy the weed binary from builder stage +# Copy the binaries from builder stage COPY --from=builder /app/weed . +COPY --from=builder /app/weed-db . # Make it executable -RUN chmod +x ./weed +RUN chmod +x ./weed ./weed-db # Expose ports EXPOSE 9333 8888 8333 8085 9533 5432 diff --git a/test/postgres/SETUP_OVERVIEW.md b/test/postgres/SETUP_OVERVIEW.md index 8715e5a9f..134a9cd20 100644 --- a/test/postgres/SETUP_OVERVIEW.md +++ b/test/postgres/SETUP_OVERVIEW.md @@ -280,7 +280,7 @@ This test setup proves: - Comprehensive error handling ### ✅ Performance and Scalability -- Direct SQL engine integration (same as `weed sql`) +- Direct SQL engine integration (same as `weed-sql`) - No translation overhead for real queries - Efficient data access from stored formats - Scalable architecture with service discovery diff --git a/test/postgres/docker-compose.yml b/test/postgres/docker-compose.yml index 87c36d0e8..83dd6f095 100644 --- a/test/postgres/docker-compose.yml +++ b/test/postgres/docker-compose.yml @@ -54,7 +54,7 @@ services: seaweedfs: condition: service_healthy command: > - ./weed db + ./weed-db -host=0.0.0.0 -port=5432 -master=seaweedfs:9333 diff --git a/test/postgres/run-tests.sh b/test/postgres/run-tests.sh old mode 100755 new mode 100644 diff --git a/test/postgres/validate-setup.sh b/test/postgres/validate-setup.sh old mode 100755 new mode 100644 diff --git a/weed/Makefile b/weed/Makefile index 38c0d9317..ef236792a 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -9,6 +9,14 @@ all: install install: go install -ldflags="-s -w" +.PHONY: weed-db weed-sql + +weed-db: + go build -ldflags="-s -w" -o weed-db ./cmd/weed-db + +weed-sql: + go build -ldflags="-s -w" -o weed-sql ./cmd/weed-sql + build_docker: CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" diff --git a/weed/command/command.go b/weed/command/command.go index b2cef4540..90eb3ad68 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -37,12 +37,10 @@ var Commands = []*Command{ cmdMqAgent, cmdMqBroker, cmdMqKafkaGateway, - cmdDB, cmdS3, cmdScaffold, cmdServer, cmdShell, - cmdSql, cmdUpdate, cmdUpload, cmdVersion, diff --git a/weed/server/postgres/DESIGN.md b/weed/server/postgres/DESIGN.md index 33d922a43..abef9662f 100644 --- a/weed/server/postgres/DESIGN.md +++ b/weed/server/postgres/DESIGN.md @@ -296,9 +296,9 @@ jdbc:postgresql://localhost:5432/default?user=seaweedfs&password=secret ```bash # Start PostgreSQL protocol server -weed db -port=5432 -auth=trust -weed db -port=5432 -auth=password -users="admin:secret;readonly:pass" -weed db -port=5432 -tls-cert=server.crt -tls-key=server.key +weed-db -port=5432 -auth=trust +weed-db -port=5432 -auth=password -users="admin:secret;readonly:pass" +weed-db -port=5432 -tls-cert=server.crt -tls-key=server.key # Configuration options -host=localhost # Listen host diff --git a/weed/server/postgres/README.md b/weed/server/postgres/README.md index 7d9ecefe5..d39d71eae 100644 --- a/weed/server/postgres/README.md +++ b/weed/server/postgres/README.md @@ -43,7 +43,7 @@ The PostgreSQL server now directly integrates with SeaweedFS Message Queue topic - **Real Schema Information**: Reads actual topic schemas from broker configuration - **Actual Data Access**: Queries real MQ data stored in Parquet and log files - **Dynamic Updates**: Reflects topic additions and schema changes automatically -- **Consistent SQL Engine**: Uses the same SQL engine as `weed sql` command +- **Consistent SQL Engine**: Uses the same SQL engine as `weed-sql` command ### Database Context Management - **Session Isolation**: Each PostgreSQL connection has its own database context @@ -232,7 +232,7 @@ psql -h localhost -p 5432 -U seaweedfs -d default - **DESIGN.md**: Complete architecture and design overview - **IMPLEMENTATION.md**: Detailed implementation guide - **postgres-examples/**: Client examples and test scripts -- **Command Documentation**: `weed db -help` +- **Command Documentation**: `weed-db -help` ## Security Considerations