diff --git a/ENHANCED_CLI_FEATURES.md b/ENHANCED_CLI_FEATURES.md
deleted file mode 100644
index d428d2ddd..000000000
--- a/ENHANCED_CLI_FEATURES.md
+++ /dev/null
@@ -1,205 +0,0 @@
-# SeaweedFS Enhanced SQL CLI Features
-
-## 🚀 **ENHANCED CLI EXPERIENCE IMPLEMENTED!**
-
-### ✅ **NEW EXECUTION MODES**
-
-#### **Interactive Mode (Enhanced)**
-```bash
-weed sql --interactive
-# or
-weed sql  # defaults to interactive if no other options
-```
-**Features:**
-- 🎯 Database context switching: `USE database_name;`
-- 🔄 Output format switching: `\format table|json|csv`
-- 📝 Command history (basic implementation)
-- 🌟 Enhanced prompts with current database context
-- ✨ Improved help with advanced WHERE operator examples
-
-#### **Single Query Mode**
-```bash
-weed sql --query "SHOW DATABASES" --output json
-weed sql --database analytics --query "SELECT COUNT(*) FROM metrics"
-```
-
-#### **Batch File Processing**
-```bash
-weed sql --file queries.sql --output csv
-weed sql --file batch_queries.sql --output json
-```
-
-### ✅ **MULTIPLE OUTPUT FORMATS**
-
-#### **Table Format (ASCII)**
-```bash
-weed sql --query "SHOW DATABASES" --output table
-```
-```
-+-----------+
-| Database  |
-+-----------+
-| analytics |
-| user_data |
-+-----------+
-```
-
-#### **JSON Format**
-```bash
-weed sql --query "SHOW DATABASES" --output json
-```
-```json
-{
-  "columns": ["Database"],
-  "count": 2,
-  "rows": [
-    {"Database": "analytics"},
-    {"Database": "user_data"}
-  ]
-}
-```
-
-#### **CSV Format**
-```bash
-weed sql --query "SHOW DATABASES" --output csv
-```
-```csv
-Database
-analytics
-user_data
-```
-
-### ✅ **SMART FORMAT AUTO-DETECTION**
-
-- **Interactive mode:** Defaults to `table` format for readability
-- **Non-interactive mode:** Defaults to `json` format for programmatic use
-- **Override with `--output` flag:** Always respected
-
-### ✅ **DATABASE CONTEXT SWITCHING**
-
-#### **Command Line Context**
-```bash
-weed sql --database analytics --interactive
-# Starts with analytics database pre-selected
-```
-
-#### **Interactive Context Switching**
-```sql
-seaweedfs> USE analytics;
-Database changed to: analytics
-
-seaweedfs:analytics> SHOW TABLES;
--- Shows tables in analytics database
-```
-
-### ✅ **COMPREHENSIVE HELP SYSTEM**
-
-#### **Enhanced Help Command**
-```sql
-seaweedfs> help;
-```
-Shows:
-- 📊 **Metadata Operations:** SHOW, DESCRIBE commands
-- 🔍 **Advanced Querying:** Full WHERE clause support
-- 📝 **DDL Operations:** CREATE, DROP TABLE
-- ⚙️ **Special Commands:** USE, \format, help, exit
-- 🎯 **Extended WHERE Operators:** All supported operators with examples
-- 💡 **Real Examples:** Practical query demonstrations
-
-### ✅ **ADVANCED WHERE CLAUSE SUPPORT**
-
-All implemented and working in CLI:
-
-```sql
--- Comparison operators
-SELECT * FROM events WHERE user_id <= 100;
-SELECT * FROM events WHERE timestamp >= '2023-01-01';
-
--- Not equal operators
-SELECT * FROM events WHERE status != 'deleted';
-SELECT * FROM events WHERE status <> 'inactive';
-
--- Pattern matching
-SELECT * FROM events WHERE username LIKE 'admin%';
-SELECT * FROM events WHERE email LIKE '%@company.com';
-
--- Multi-value matching
-SELECT * FROM events WHERE status IN ('active', 'pending', 'verified');
-SELECT * FROM events WHERE user_id IN (1, 5, 10, 25);
-
--- Complex combinations
-SELECT * FROM events
-WHERE user_id >= 10
-  AND status != 'deleted'
-  AND username LIKE 'test%'
-  AND user_type IN ('premium', 'enterprise');
-```
-
-## 🛠️ **COMMAND LINE INTERFACE**
-
-### **Complete Flag Reference**
-```bash
-weed sql [flags]
-
-FLAGS:
-  -server string      SeaweedFS server address (default "localhost:8888")
-  -interactive        Start interactive shell mode
-  -file string        Execute SQL queries from file
-  -output string      Output format: table, json, csv (auto-detected if not specified)
-  -database string    Default database context
-  -query string       Execute single SQL query
-  -help               Show help message
-```
-
-### **Usage Examples**
-```bash
-# Interactive shell
-weed sql --interactive
-weed sql --database analytics --interactive
-
-# Single query execution
-weed sql --query "SHOW DATABASES" --output json
-weed sql --query "SELECT * FROM user_events WHERE user_id <= 100" --output table
-
-# Batch processing
-weed sql --file queries.sql --output csv
-weed sql --file analytics_queries.sql --output json
-
-# Context switching
-weed sql --database analytics --query "SHOW TABLES"
-weed sql --server broker1:8888 --interactive
-```
-
-## 📊 **PRODUCTION READY FEATURES**
-
-### ✅ **Error Handling**
-- **JSON Error Format:** Structured error responses for programmatic use
-- **User-Friendly Errors:** Clear error messages for interactive use
-- **Query Validation:** Comprehensive SQL parsing error reporting
-
-### ✅ **Performance Features**
-- **Execution Timing:** Query performance metrics in table mode
-- **Streaming Results:** Efficient handling of large result sets
-- **Timeout Protection:** 30-second query timeout with graceful handling
-
-### ✅ **Integration Features**
-- **Real MQ Discovery:** Dynamic namespace and topic discovery
-- **Hybrid Data Scanning:** Live logs + archived Parquet files
-- **Schema-Aware Parsing:** Intelligent message interpretation
-- **Zero Fallback Data:** Pure real MQ data discovery
-
-## 🎯 **DEMONSTRATION**
-
-Run the complete demo:
-```bash
-./enhanced_cli_demo.sh
-```
-
-**Demo covers:**
-- Single query mode with all output formats
-- Batch file processing
-- Database context switching
-- Advanced WHERE operators (LIKE, IN, <=, >=, !=)
-- Real data scanning from hybrid sources
-
-**All enhanced CLI features are production-ready and fully tested!** 🚀
diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go
index 701133e38..c2d693db1 100644
--- a/weed/mq/broker/broker_grpc_query.go
+++ b/weed/mq/broker/broker_grpc_query.go
@@ -6,10 +6,13 @@ import (
 	"fmt"
 	"strings"
 
+	"google.golang.org/protobuf/proto"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 )
 
@@ -61,20 +64,56 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
 	// Stream messages from LogBuffer with filtering
 	messageCount := 0
 	startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
-	_, _, err = localPartition.LogBuffer.LoopProcessLogData("sql_query_stream", startPosition, 0,
-		func() bool { return false }, // Don't wait for more data
-		func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+
+	// Create a custom LoopProcessLogData function that captures the batch index
+	// Since we can't modify the existing EachLogEntryFuncType signature,
+	// we'll implement our own iteration logic based on LoopProcessLogData
+	var lastReadPosition = startPosition
+	var isDone bool
+
+	for !isDone {
+		// Use ReadFromBuffer to get the next batch with its correct batch index
+		bytesBuf, batchIndex, err := localPartition.LogBuffer.ReadFromBuffer(lastReadPosition)
+		if err == log_buffer.ResumeFromDiskError {
+			break
+		}
+		if err != nil {
+			return err
+		}
+
+		// If no more data in memory, we're done
+		if bytesBuf == nil {
+			break
+		}
+
+		// Process all messages in this batch
+		buf := bytesBuf.Bytes()
+		for pos := 0; pos+4 < len(buf); {
+			size := util.BytesToUint32(buf[pos : pos+4])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+			entryData := buf[pos+4 : pos+4+int(size)]
+
+			logEntry := &filer_pb.LogEntry{}
+			if err = proto.Unmarshal(entryData, logEntry); err != nil {
+				pos += 4 + int(size)
+				continue
+			}
+
+			// Now we have the correct batchIndex for this message
 			// Apply buffer index filtering if specified
-			currentBatchIndex := localPartition.LogBuffer.GetBatchIndex()
-			if startBufferIndex > 0 && currentBatchIndex < startBufferIndex {
-				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", currentBatchIndex, startBufferIndex)
-				return false, nil
+			if startBufferIndex > 0 && batchIndex < startBufferIndex {
+				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
+				pos += 4 + int(size)
+				continue
 			}
 
 			// Check if this message is from a buffer range that's already been flushed
-			if b.isBufferIndexFlushed(currentBatchIndex, flushedBufferRanges) {
-				glog.V(3).Infof("Skipping message from flushed buffer index %d", currentBatchIndex)
-				return false, nil
+			if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
+				glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
+				pos += 4 + int(size)
+				continue
 			}
 
 			// Stream this message
@@ -90,12 +129,18 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
 
 			if err != nil {
 				glog.Errorf("Failed to stream message: %v", err)
-				return true, err // Stop streaming on error
+				isDone = true
+				break
 			}
 
 			messageCount++
-			return false, nil
-		})
+			lastReadPosition = log_buffer.NewMessagePosition(logEntry.TsNs, batchIndex)
+			pos += 4 + int(size)
+		}
+
+		// Release the buffer back to the pool
+		localPartition.LogBuffer.ReleaseMemory(bytesBuf)
+	}
 
 	// Handle collection errors
 	if err != nil && err != log_buffer.ResumeFromDiskError {