You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
584 lines
16 KiB
584 lines
16 KiB
package command
|
|
|
|
import (
|
|
"context"
|
|
"encoding/csv"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/peterh/liner"
|
|
"github.com/seaweedfs/seaweedfs/weed/query/engine"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/sqlutil"
|
|
)
|
|
|
|
func init() {
|
|
cmdSql.Run = runSql
|
|
}
|
|
|
|
var cmdSql = &Command{
|
|
UsageLine: "sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]",
|
|
Short: "advanced SQL query interface for SeaweedFS MQ topics with multiple execution modes",
|
|
Long: `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes.
|
|
|
|
Execution Modes:
|
|
- Interactive shell (default): weed sql -interactive
|
|
- Single query: weed sql -query "SELECT * FROM user_events"
|
|
- Batch from file: weed sql -file queries.sql
|
|
- Context switching: weed sql -database analytics -interactive
|
|
|
|
Output Formats:
|
|
- table: ASCII table format (default for interactive)
|
|
- json: JSON format (default for non-interactive)
|
|
- csv: Comma-separated values
|
|
|
|
Features:
|
|
- Full WHERE clause support (=, <, >, <=, >=, !=, LIKE, IN)
|
|
- Advanced pattern matching with LIKE wildcards (%, _)
|
|
- Multi-value filtering with IN operator
|
|
- Real MQ namespace and topic discovery
|
|
- Database context switching
|
|
|
|
Examples:
|
|
weed sql -interactive
|
|
weed sql -query "SHOW DATABASES" -output json
|
|
weed sql -file batch_queries.sql -output csv
|
|
weed sql -database analytics -query "SELECT COUNT(*) FROM metrics"
|
|
weed sql -master broker1:9333 -interactive
|
|
`,
|
|
}
|
|
|
|
var (
|
|
sqlMaster = cmdSql.Flag.String("master", "localhost:9333", "SeaweedFS master server HTTP address")
|
|
sqlInteractive = cmdSql.Flag.Bool("interactive", false, "start interactive shell mode")
|
|
sqlFile = cmdSql.Flag.String("file", "", "execute SQL queries from file")
|
|
sqlOutput = cmdSql.Flag.String("output", "", "output format: table, json, csv (auto-detected if not specified)")
|
|
sqlDatabase = cmdSql.Flag.String("database", "", "default database context")
|
|
sqlQuery = cmdSql.Flag.String("query", "", "execute single SQL query")
|
|
)
|
|
|
|
// OutputFormat represents different output formatting options
|
|
type OutputFormat string
|
|
|
|
const (
|
|
OutputTable OutputFormat = "table"
|
|
OutputJSON OutputFormat = "json"
|
|
OutputCSV OutputFormat = "csv"
|
|
)
|
|
|
|
// SQLContext holds the execution context for SQL operations
|
|
type SQLContext struct {
|
|
engine *engine.SQLEngine
|
|
currentDatabase string
|
|
outputFormat OutputFormat
|
|
interactive bool
|
|
}
|
|
|
|
func runSql(command *Command, args []string) bool {
|
|
// Initialize SQL engine with master address for service discovery
|
|
sqlEngine := engine.NewSQLEngine(*sqlMaster)
|
|
|
|
// Determine execution mode and output format
|
|
interactive := *sqlInteractive || (*sqlQuery == "" && *sqlFile == "")
|
|
outputFormat := determineOutputFormat(*sqlOutput, interactive)
|
|
|
|
// Create SQL context
|
|
ctx := &SQLContext{
|
|
engine: sqlEngine,
|
|
currentDatabase: *sqlDatabase,
|
|
outputFormat: outputFormat,
|
|
interactive: interactive,
|
|
}
|
|
|
|
// Set current database in SQL engine if specified via command line
|
|
if *sqlDatabase != "" {
|
|
ctx.engine.GetCatalog().SetCurrentDatabase(*sqlDatabase)
|
|
}
|
|
|
|
// Execute based on mode
|
|
switch {
|
|
case *sqlQuery != "":
|
|
// Single query mode
|
|
return executeSingleQuery(ctx, *sqlQuery)
|
|
case *sqlFile != "":
|
|
// Batch file mode
|
|
return executeFileQueries(ctx, *sqlFile)
|
|
default:
|
|
// Interactive mode
|
|
return runInteractiveShell(ctx)
|
|
}
|
|
}
|
|
|
|
// determineOutputFormat selects the appropriate output format
|
|
func determineOutputFormat(specified string, interactive bool) OutputFormat {
|
|
switch strings.ToLower(specified) {
|
|
case "table":
|
|
return OutputTable
|
|
case "json":
|
|
return OutputJSON
|
|
case "csv":
|
|
return OutputCSV
|
|
default:
|
|
// Auto-detect based on mode
|
|
if interactive {
|
|
return OutputTable
|
|
}
|
|
return OutputJSON
|
|
}
|
|
}
|
|
|
|
// executeSingleQuery executes a single query and outputs the result
|
|
func executeSingleQuery(ctx *SQLContext, query string) bool {
|
|
if ctx.outputFormat != OutputTable {
|
|
// Suppress banner for non-interactive output
|
|
return executeAndDisplay(ctx, query, false)
|
|
}
|
|
|
|
fmt.Printf("Executing query against %s...\n", *sqlMaster)
|
|
return executeAndDisplay(ctx, query, true)
|
|
}
|
|
|
|
// executeFileQueries processes SQL queries from a file
|
|
func executeFileQueries(ctx *SQLContext, filename string) bool {
|
|
content, err := ioutil.ReadFile(filename)
|
|
if err != nil {
|
|
fmt.Printf("Error reading file %s: %v\n", filename, err)
|
|
return false
|
|
}
|
|
|
|
if ctx.outputFormat == OutputTable && ctx.interactive {
|
|
fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlMaster)
|
|
}
|
|
|
|
// Split file content into individual queries (robust approach)
|
|
queries := sqlutil.SplitStatements(string(content))
|
|
|
|
for i, query := range queries {
|
|
query = strings.TrimSpace(query)
|
|
if query == "" {
|
|
continue
|
|
}
|
|
|
|
if ctx.outputFormat == OutputTable && len(queries) > 1 {
|
|
fmt.Printf("\n--- Query %d ---\n", i+1)
|
|
}
|
|
|
|
if !executeAndDisplay(ctx, query, ctx.outputFormat == OutputTable) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// runInteractiveShell starts the enhanced interactive shell with readline support
|
|
func runInteractiveShell(ctx *SQLContext) bool {
|
|
fmt.Println("SeaweedFS Enhanced SQL Interface")
|
|
fmt.Println("Type 'help;' for help, 'exit;' to quit")
|
|
fmt.Printf("Connected to master: %s\n", *sqlMaster)
|
|
if ctx.currentDatabase != "" {
|
|
fmt.Printf("Current database: %s\n", ctx.currentDatabase)
|
|
}
|
|
fmt.Println("Advanced WHERE operators supported: <=, >=, !=, LIKE, IN")
|
|
fmt.Println("Use up/down arrows for command history")
|
|
fmt.Println()
|
|
|
|
// Initialize liner for readline functionality
|
|
line := liner.NewLiner()
|
|
defer line.Close()
|
|
|
|
// Handle Ctrl+C gracefully
|
|
line.SetCtrlCAborts(true)
|
|
grace.OnInterrupt(func() {
|
|
line.Close()
|
|
})
|
|
|
|
// Load command history
|
|
historyPath := path.Join(os.TempDir(), "weed-sql-history")
|
|
if f, err := os.Open(historyPath); err == nil {
|
|
line.ReadHistory(f)
|
|
f.Close()
|
|
}
|
|
|
|
// Save history on exit
|
|
defer func() {
|
|
if f, err := os.Create(historyPath); err == nil {
|
|
line.WriteHistory(f)
|
|
f.Close()
|
|
}
|
|
}()
|
|
|
|
var queryBuffer strings.Builder
|
|
|
|
for {
|
|
// Show prompt with current database context
|
|
var prompt string
|
|
if queryBuffer.Len() == 0 {
|
|
if ctx.currentDatabase != "" {
|
|
prompt = fmt.Sprintf("seaweedfs:%s> ", ctx.currentDatabase)
|
|
} else {
|
|
prompt = "seaweedfs> "
|
|
}
|
|
} else {
|
|
prompt = " -> " // Continuation prompt
|
|
}
|
|
|
|
// Read line with readline support
|
|
input, err := line.Prompt(prompt)
|
|
if err != nil {
|
|
if err == liner.ErrPromptAborted {
|
|
fmt.Println("Query cancelled")
|
|
queryBuffer.Reset()
|
|
continue
|
|
}
|
|
if err != io.EOF {
|
|
fmt.Printf("Input error: %v\n", err)
|
|
}
|
|
break
|
|
}
|
|
|
|
lineStr := strings.TrimSpace(input)
|
|
|
|
// Handle empty lines
|
|
if lineStr == "" {
|
|
continue
|
|
}
|
|
|
|
// Accumulate lines in query buffer
|
|
if queryBuffer.Len() > 0 {
|
|
queryBuffer.WriteString(" ")
|
|
}
|
|
queryBuffer.WriteString(lineStr)
|
|
|
|
// Check if we have a complete statement (ends with semicolon or special command)
|
|
fullQuery := strings.TrimSpace(queryBuffer.String())
|
|
isComplete := strings.HasSuffix(lineStr, ";") ||
|
|
isSpecialCommand(fullQuery)
|
|
|
|
if !isComplete {
|
|
continue // Continue reading more lines
|
|
}
|
|
|
|
// Add completed command to history
|
|
line.AppendHistory(fullQuery)
|
|
|
|
// Handle special commands (with or without semicolon)
|
|
cleanQuery := strings.TrimSuffix(fullQuery, ";")
|
|
cleanQuery = strings.TrimSpace(cleanQuery)
|
|
|
|
if cleanQuery == "exit" || cleanQuery == "quit" || cleanQuery == "\\q" {
|
|
fmt.Println("Goodbye!")
|
|
break
|
|
}
|
|
|
|
if cleanQuery == "help" {
|
|
showEnhancedHelp()
|
|
queryBuffer.Reset()
|
|
continue
|
|
}
|
|
|
|
// Handle database switching - use proper SQL parser instead of manual parsing
|
|
if strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") {
|
|
// Execute USE statement through the SQL engine for proper parsing
|
|
result, err := ctx.engine.ExecuteSQL(context.Background(), cleanQuery)
|
|
if err != nil {
|
|
fmt.Printf("Error: %v\n\n", err)
|
|
} else if result.Error != nil {
|
|
fmt.Printf("Error: %v\n\n", result.Error)
|
|
} else {
|
|
// Extract the database name from the result message for CLI context
|
|
if len(result.Rows) > 0 && len(result.Rows[0]) > 0 {
|
|
message := result.Rows[0][0].ToString()
|
|
// Extract database name from "Database changed to: dbname"
|
|
if strings.HasPrefix(message, "Database changed to: ") {
|
|
ctx.currentDatabase = strings.TrimPrefix(message, "Database changed to: ")
|
|
}
|
|
fmt.Printf("%s\n\n", message)
|
|
}
|
|
}
|
|
queryBuffer.Reset()
|
|
continue
|
|
}
|
|
|
|
// Handle output format switching
|
|
if strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") {
|
|
format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(cleanQuery), "\\FORMAT "))
|
|
switch format {
|
|
case "TABLE":
|
|
ctx.outputFormat = OutputTable
|
|
fmt.Println("Output format set to: table")
|
|
case "JSON":
|
|
ctx.outputFormat = OutputJSON
|
|
fmt.Println("Output format set to: json")
|
|
case "CSV":
|
|
ctx.outputFormat = OutputCSV
|
|
fmt.Println("Output format set to: csv")
|
|
default:
|
|
fmt.Printf("Invalid format: %s. Supported: table, json, csv\n", format)
|
|
}
|
|
queryBuffer.Reset()
|
|
continue
|
|
}
|
|
|
|
// Execute SQL query (without semicolon)
|
|
executeAndDisplay(ctx, cleanQuery, true)
|
|
|
|
// Reset buffer for next query
|
|
queryBuffer.Reset()
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// isSpecialCommand checks if a command is a special command that doesn't require semicolon
|
|
func isSpecialCommand(query string) bool {
|
|
cleanQuery := strings.TrimSuffix(strings.TrimSpace(query), ";")
|
|
cleanQuery = strings.ToLower(cleanQuery)
|
|
|
|
// Special commands that work with or without semicolon
|
|
specialCommands := []string{
|
|
"exit", "quit", "\\q", "help",
|
|
}
|
|
|
|
for _, cmd := range specialCommands {
|
|
if cleanQuery == cmd {
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Commands that are exactly specific commands (not just prefixes)
|
|
parts := strings.Fields(strings.ToUpper(cleanQuery))
|
|
if len(parts) == 0 {
|
|
return false
|
|
}
|
|
return (parts[0] == "USE" && len(parts) >= 2) ||
|
|
strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")
|
|
}
|
|
|
|
// executeAndDisplay executes a query and displays the result in the specified format
|
|
func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool {
|
|
startTime := time.Now()
|
|
|
|
// Execute the query
|
|
execCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
result, err := ctx.engine.ExecuteSQL(execCtx, query)
|
|
if err != nil {
|
|
if ctx.outputFormat == OutputJSON {
|
|
errorResult := map[string]interface{}{
|
|
"error": err.Error(),
|
|
"query": query,
|
|
}
|
|
jsonBytes, _ := json.MarshalIndent(errorResult, "", " ")
|
|
fmt.Println(string(jsonBytes))
|
|
} else {
|
|
fmt.Printf("Error: %v\n", err)
|
|
}
|
|
return false
|
|
}
|
|
|
|
if result.Error != nil {
|
|
if ctx.outputFormat == OutputJSON {
|
|
errorResult := map[string]interface{}{
|
|
"error": result.Error.Error(),
|
|
"query": query,
|
|
}
|
|
jsonBytes, _ := json.MarshalIndent(errorResult, "", " ")
|
|
fmt.Println(string(jsonBytes))
|
|
} else {
|
|
fmt.Printf("Query Error: %v\n", result.Error)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Display results in the specified format
|
|
switch ctx.outputFormat {
|
|
case OutputTable:
|
|
displayTableResult(result)
|
|
case OutputJSON:
|
|
displayJSONResult(result)
|
|
case OutputCSV:
|
|
displayCSVResult(result)
|
|
}
|
|
|
|
// Show execution time for interactive/table mode
|
|
if showTiming && ctx.outputFormat == OutputTable {
|
|
elapsed := time.Since(startTime)
|
|
fmt.Printf("\n(%d rows in set, %.3f sec)\n\n", len(result.Rows), elapsed.Seconds())
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// displayTableResult formats and displays query results in ASCII table format
|
|
func displayTableResult(result *engine.QueryResult) {
|
|
if len(result.Columns) == 0 {
|
|
fmt.Println("Empty result set")
|
|
return
|
|
}
|
|
|
|
// Calculate column widths for formatting
|
|
colWidths := make([]int, len(result.Columns))
|
|
for i, col := range result.Columns {
|
|
colWidths[i] = len(col)
|
|
}
|
|
|
|
// Check data for wider columns
|
|
for _, row := range result.Rows {
|
|
for i, val := range row {
|
|
if i < len(colWidths) {
|
|
valStr := val.ToString()
|
|
if len(valStr) > colWidths[i] {
|
|
colWidths[i] = len(valStr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Print header separator
|
|
fmt.Print("+")
|
|
for _, width := range colWidths {
|
|
fmt.Print(strings.Repeat("-", width+2) + "+")
|
|
}
|
|
fmt.Println()
|
|
|
|
// Print column headers
|
|
fmt.Print("|")
|
|
for i, col := range result.Columns {
|
|
fmt.Printf(" %-*s |", colWidths[i], col)
|
|
}
|
|
fmt.Println()
|
|
|
|
// Print separator
|
|
fmt.Print("+")
|
|
for _, width := range colWidths {
|
|
fmt.Print(strings.Repeat("-", width+2) + "+")
|
|
}
|
|
fmt.Println()
|
|
|
|
// Print data rows
|
|
for _, row := range result.Rows {
|
|
fmt.Print("|")
|
|
for i, val := range row {
|
|
if i < len(colWidths) {
|
|
fmt.Printf(" %-*s |", colWidths[i], val.ToString())
|
|
}
|
|
}
|
|
fmt.Println()
|
|
}
|
|
|
|
// Print bottom separator
|
|
fmt.Print("+")
|
|
for _, width := range colWidths {
|
|
fmt.Print(strings.Repeat("-", width+2) + "+")
|
|
}
|
|
fmt.Println()
|
|
}
|
|
|
|
// displayJSONResult outputs query results in JSON format
|
|
func displayJSONResult(result *engine.QueryResult) {
|
|
// Convert result to JSON-friendly format
|
|
jsonResult := map[string]interface{}{
|
|
"columns": result.Columns,
|
|
"rows": make([]map[string]interface{}, len(result.Rows)),
|
|
"count": len(result.Rows),
|
|
}
|
|
|
|
// Convert rows to JSON objects
|
|
for i, row := range result.Rows {
|
|
rowObj := make(map[string]interface{})
|
|
for j, val := range row {
|
|
if j < len(result.Columns) {
|
|
rowObj[result.Columns[j]] = val.ToString()
|
|
}
|
|
}
|
|
jsonResult["rows"].([]map[string]interface{})[i] = rowObj
|
|
}
|
|
|
|
// Marshal and print JSON
|
|
jsonBytes, err := json.MarshalIndent(jsonResult, "", " ")
|
|
if err != nil {
|
|
fmt.Printf("Error formatting JSON: %v\n", err)
|
|
return
|
|
}
|
|
|
|
fmt.Println(string(jsonBytes))
|
|
}
|
|
|
|
// displayCSVResult outputs query results in CSV format
|
|
func displayCSVResult(result *engine.QueryResult) {
|
|
writer := csv.NewWriter(os.Stdout)
|
|
defer writer.Flush()
|
|
|
|
// Write headers
|
|
if err := writer.Write(result.Columns); err != nil {
|
|
fmt.Printf("Error writing CSV headers: %v\n", err)
|
|
return
|
|
}
|
|
|
|
// Write data rows
|
|
for _, row := range result.Rows {
|
|
csvRow := make([]string, len(row))
|
|
for i, val := range row {
|
|
csvRow[i] = val.ToString()
|
|
}
|
|
if err := writer.Write(csvRow); err != nil {
|
|
fmt.Printf("Error writing CSV row: %v\n", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func showEnhancedHelp() {
|
|
fmt.Println(`SeaweedFS Enhanced SQL Interface Help:
|
|
|
|
METADATA OPERATIONS:
|
|
SHOW DATABASES; - List all MQ namespaces
|
|
SHOW TABLES; - List all topics in current namespace
|
|
SHOW TABLES FROM database; - List topics in specific namespace
|
|
DESCRIBE table_name; - Show table schema
|
|
|
|
ADVANCED QUERYING:
|
|
SELECT * FROM table_name; - Query all data
|
|
SELECT col1, col2 FROM table WHERE ...; - Column projection
|
|
SELECT * FROM table WHERE id <= 100; - Range filtering
|
|
SELECT * FROM table WHERE name LIKE 'admin%'; - Pattern matching
|
|
SELECT * FROM table WHERE status IN ('active', 'pending'); - Multi-value
|
|
SELECT COUNT(*), MAX(id), MIN(id) FROM ...; - Aggregation functions
|
|
|
|
QUERY ANALYSIS:
|
|
EXPLAIN SELECT ...; - Show hierarchical execution plan
|
|
(data sources, optimizations, timing)
|
|
|
|
DDL OPERATIONS:
|
|
CREATE TABLE topic (field1 INT, field2 STRING); - Create topic
|
|
Note: ALTER TABLE and DROP TABLE are not supported
|
|
|
|
SPECIAL COMMANDS:
|
|
USE database_name; - Switch database context
|
|
\format table|json|csv - Change output format
|
|
help; - Show this help
|
|
exit; or quit; or \q - Exit interface
|
|
|
|
EXTENDED WHERE OPERATORS:
|
|
=, <, >, <=, >= - Comparison operators
|
|
!=, <> - Not equal operators
|
|
LIKE 'pattern%' - Pattern matching (% = any chars, _ = single char)
|
|
IN (value1, value2, ...) - Multi-value matching
|
|
AND, OR - Logical operators
|
|
|
|
EXAMPLES:
|
|
SELECT * FROM user_events WHERE user_id >= 10 AND status != 'deleted';
|
|
SELECT username FROM users WHERE email LIKE '%@company.com';
|
|
SELECT * FROM logs WHERE level IN ('error', 'warning') AND timestamp >= '2023-01-01';
|
|
EXPLAIN SELECT MAX(id) FROM events; -- View execution plan
|
|
|
|
Current Status: Full WHERE clause support + Real MQ integration`)
|
|
}
|