From e3798c2ec91a644edea10f01dc330abc1de2a504 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 1 Sep 2025 16:12:10 -0700 Subject: [PATCH] sql --- weed/command/sql.go | 86 ++++++++++++++------- weed/query/engine/engine.go | 20 +++-- weed/query/engine/hybrid_message_scanner.go | 5 +- 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/weed/command/sql.go b/weed/command/sql.go index baf179613..743bb42ca 100644 --- a/weed/command/sql.go +++ b/weed/command/sql.go @@ -244,36 +244,63 @@ func runInteractiveShell(ctx *SQLContext) bool { lineStr := strings.TrimSpace(input) - // Handle special commands - if lineStr == "exit;" || lineStr == "quit;" || lineStr == "\\q" { + // Handle empty lines + if lineStr == "" { + continue + } + + // Accumulate lines in query buffer + if queryBuffer.Len() > 0 { + queryBuffer.WriteString(" ") + } + queryBuffer.WriteString(lineStr) + + // Check if we have a complete statement (ends with semicolon or special command) + fullQuery := strings.TrimSpace(queryBuffer.String()) + isComplete := strings.HasSuffix(lineStr, ";") || + isSpecialCommand(fullQuery) + + if !isComplete { + continue // Continue reading more lines + } + + // Add completed command to history + line.AppendHistory(fullQuery) + + // Handle special commands (with or without semicolon) + cleanQuery := strings.TrimSuffix(fullQuery, ";") + cleanQuery = strings.TrimSpace(cleanQuery) + + if cleanQuery == "exit" || cleanQuery == "quit" || cleanQuery == "\\q" { fmt.Println("Goodbye!") break } - if lineStr == "help;" { + if cleanQuery == "help" { showEnhancedHelp() + queryBuffer.Reset() continue } // Handle database switching - upperLine := strings.ToUpper(lineStr) - if strings.HasPrefix(upperLine, "USE ") { + upperQuery := strings.ToUpper(cleanQuery) + if strings.HasPrefix(upperQuery, "USE ") { // Extract database name preserving original case - parts := strings.SplitN(lineStr, " ", 2) + parts := strings.SplitN(cleanQuery, " ", 2) if len(parts) >= 2 { dbName := strings.TrimSpace(parts[1]) - dbName = strings.TrimSuffix(dbName, ";") ctx.currentDatabase = dbName // Also update the SQL engine's catalog current database ctx.engine.GetCatalog().SetCurrentDatabase(dbName) - fmt.Printf("Database changed to: %s\n\n", strings.ToUpper(dbName)) + fmt.Printf("Database changed to: %s\n\n", dbName) + queryBuffer.Reset() continue } } // Handle output format switching - if strings.HasPrefix(strings.ToUpper(lineStr), "\\FORMAT ") { - format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(lineStr), "\\FORMAT ")) + if strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") { + format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")) switch format { case "TABLE": ctx.outputFormat = OutputTable @@ -287,34 +314,39 @@ func runInteractiveShell(ctx *SQLContext) bool { default: fmt.Printf("Invalid format: %s. Supported: table, json, csv\n", format) } + queryBuffer.Reset() continue } - if lineStr == "" { - continue - } + // Execute SQL query (without semicolon) + executeAndDisplay(ctx, cleanQuery, true) - // Accumulate multi-line queries - queryBuffer.WriteString(lineStr) - queryBuffer.WriteString(" ") + // Reset buffer for next query + queryBuffer.Reset() + } - // Execute when query ends with semicolon - if strings.HasSuffix(lineStr, ";") { - fullQuery := strings.TrimSpace(queryBuffer.String()) - query := strings.TrimSuffix(fullQuery, ";") // Remove trailing semicolon for execution + return true +} - // Add to history with semicolon (as user actually typed it) - line.AppendHistory(fullQuery) +// isSpecialCommand checks if a command is a special command that doesn't require semicolon +func isSpecialCommand(query string) bool { + cleanQuery := strings.TrimSuffix(strings.TrimSpace(query), ";") + cleanQuery = strings.ToLower(cleanQuery) - // Execute query (without semicolon) - executeAndDisplay(ctx, query, true) + // Special commands that work with or without semicolon + specialCommands := []string{ + "exit", "quit", "\\q", "help", + } - // Reset buffer for next query - queryBuffer.Reset() + for _, cmd := range specialCommands { + if cleanQuery == cmd { + return true } } - return true + // Commands that start with specific prefixes + return strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") || + strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") } // executeAndDisplay executes a query and displays the result in the specified format diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index ce96e9dfb..782fdb268 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -397,9 +397,6 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu for columnName := range columnSet { columns = append(columns, columnName) } - - // Add metadata columns showing data source - columns = append(columns, "_source") } // Convert to SQL rows @@ -407,12 +404,19 @@ func convertHybridResultsToSQL(results []HybridScanResult, columns []string) *Qu for i, result := range results { row := make([]sqltypes.Value, len(columns)) for j, columnName := range columns { - if columnName == "_source" { + switch columnName { + case "_source": row[j] = sqltypes.NewVarChar(result.Source) - } else if value, exists := result.Values[columnName]; exists { - row[j] = convertSchemaValueToSQL(value) - } else { - row[j] = sqltypes.NULL + case "_timestamp_ns": + row[j] = sqltypes.NewInt64(result.Timestamp) + case "_key": + row[j] = sqltypes.NewVarBinary(string(result.Key)) + default: + if value, exists := result.Values[columnName]; exists { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } } } rows[i] = row diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 5bcb65534..fb6b84665 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -580,9 +580,6 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, for columnName := range columnSet { columns = append(columns, columnName) } - - // Add metadata columns for debugging - columns = append(columns, "_source", "_timestamp_ns") } // Convert to SQL rows @@ -595,6 +592,8 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, row[j] = sqltypes.NewVarChar(result.Source) case "_timestamp_ns": row[j] = sqltypes.NewInt64(result.Timestamp) + case "_key": + row[j] = sqltypes.NewVarBinary(string(result.Key)) default: if value, exists := result.Values[columnName]; exists { row[j] = convertSchemaValueToSQL(value)