
address comments

Branch: pull/7185/head
Author: chrislu (1 month ago)
Commit: 59ec4eb68a

Changed files:
1. ENHANCED_CLI_FEATURES.md (205)
2. weed/mq/broker/broker_grpc_query.go (71)

ENHANCED_CLI_FEATURES.md

@@ -1,205 +0,0 @@ (entire file removed)
# SeaweedFS Enhanced SQL CLI Features
## 🚀 **ENHANCED CLI EXPERIENCE IMPLEMENTED!**
### ✅ **NEW EXECUTION MODES**
#### **Interactive Mode (Enhanced)**
```bash
weed sql --interactive
# or
weed sql # defaults to interactive if no other options
```
**Features:**
- 🎯 Database context switching: `USE database_name;`
- 🔄 Output format switching: `\format table|json|csv`
- 📝 Command history (basic implementation)
- 🌟 Enhanced prompts with current database context
- ✨ Improved help with advanced WHERE operator examples
#### **Single Query Mode**
```bash
weed sql --query "SHOW DATABASES" --output json
weed sql --database analytics --query "SELECT COUNT(*) FROM metrics"
```
#### **Batch File Processing**
```bash
weed sql --file queries.sql --output csv
weed sql --file batch_queries.sql --output json
```
### ✅ **MULTIPLE OUTPUT FORMATS**
#### **Table Format (ASCII)**
```bash
weed sql --query "SHOW DATABASES" --output table
```
```
+-----------+
| Database  |
+-----------+
| analytics |
| user_data |
+-----------+
```
#### **JSON Format**
```bash
weed sql --query "SHOW DATABASES" --output json
```
```json
{
  "columns": ["Database"],
  "count": 2,
  "rows": [
    {"Database": "analytics"},
    {"Database": "user_data"}
  ]
}
```
#### **CSV Format**
```bash
weed sql --query "SHOW DATABASES" --output csv
```
```csv
Database
analytics
user_data
```
### ✅ **SMART FORMAT AUTO-DETECTION**
- **Interactive mode:** Defaults to `table` format for readability
- **Non-interactive mode:** Defaults to `json` format for programmatic use
- **Override with `--output` flag:** Always respected
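The auto-detection rule above can be sketched in a few lines of Go (a hypothetical `resolveFormat` helper, not the actual CLI code):

```go
package main

import "fmt"

// resolveFormat mirrors the documented rule: an explicit --output
// flag always wins; otherwise interactive sessions default to
// "table" for readability and non-interactive runs to "json".
func resolveFormat(outputFlag string, interactive bool) string {
	if outputFlag != "" {
		return outputFlag // --output flag is always respected
	}
	if interactive {
		return "table"
	}
	return "json"
}

func main() {
	fmt.Println(resolveFormat("", true))    // table
	fmt.Println(resolveFormat("", false))   // json
	fmt.Println(resolveFormat("csv", true)) // csv
}
```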
### ✅ **DATABASE CONTEXT SWITCHING**
#### **Command Line Context**
```bash
weed sql --database analytics --interactive
# Starts with analytics database pre-selected
```
#### **Interactive Context Switching**
```sql
seaweedfs> USE analytics;
Database changed to: analytics
seaweedfs:analytics> SHOW TABLES;
-- Shows tables in analytics database
```
### ✅ **COMPREHENSIVE HELP SYSTEM**
#### **Enhanced Help Command**
```sql
seaweedfs> help;
```
Shows:
- 📊 **Metadata Operations:** SHOW, DESCRIBE commands
- 🔍 **Advanced Querying:** Full WHERE clause support
- 📝 **DDL Operations:** CREATE, DROP TABLE
- ⚙️ **Special Commands:** USE, \format, help, exit
- 🎯 **Extended WHERE Operators:** All supported operators with examples
- 💡 **Real Examples:** Practical query demonstrations
### ✅ **ADVANCED WHERE CLAUSE SUPPORT**
All of the following operators are implemented and working in the CLI:
```sql
-- Comparison operators
SELECT * FROM events WHERE user_id <= 100;
SELECT * FROM events WHERE timestamp >= '2023-01-01';
-- Not equal operators
SELECT * FROM events WHERE status != 'deleted';
SELECT * FROM events WHERE status <> 'inactive';
-- Pattern matching
SELECT * FROM events WHERE username LIKE 'admin%';
SELECT * FROM events WHERE email LIKE '%@company.com';
-- Multi-value matching
SELECT * FROM events WHERE status IN ('active', 'pending', 'verified');
SELECT * FROM events WHERE user_id IN (1, 5, 10, 25);
-- Complex combinations
SELECT * FROM events
WHERE user_id >= 10
AND status != 'deleted'
AND username LIKE 'test%'
AND user_type IN ('premium', 'enterprise');
```
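The `LIKE` patterns above can be illustrated by translating them into anchored regular expressions, assuming standard SQL semantics where `%` matches any run of characters and `_` matches exactly one. This is a sketch of the semantics, not the engine's actual matcher:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// likeToRegexp converts a SQL LIKE pattern to an anchored Go regexp:
// '%' becomes '.*', '_' becomes '.', everything else is quoted literally.
func likeToRegexp(pattern string) *regexp.Regexp {
	var b strings.Builder
	b.WriteString("^")
	for _, r := range pattern {
		switch r {
		case '%':
			b.WriteString(".*")
		case '_':
			b.WriteString(".")
		default:
			b.WriteString(regexp.QuoteMeta(string(r)))
		}
	}
	b.WriteString("$")
	return regexp.MustCompile(b.String())
}

func main() {
	fmt.Println(likeToRegexp("admin%").MatchString("admin_user"))      // true
	fmt.Println(likeToRegexp("%@company.com").MatchString("a@co.org")) // false
}
```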
## 🛠️ **COMMAND LINE INTERFACE**
### **Complete Flag Reference**
```bash
weed sql [flags]

FLAGS:
  -server string    SeaweedFS server address (default "localhost:8888")
  -interactive      Start interactive shell mode
  -file string      Execute SQL queries from file
  -output string    Output format: table, json, csv (auto-detected if not specified)
  -database string  Default database context
  -query string     Execute single SQL query
  -help             Show help message
```
### **Usage Examples**
```bash
# Interactive shell
weed sql --interactive
weed sql --database analytics --interactive
# Single query execution
weed sql --query "SHOW DATABASES" --output json
weed sql --query "SELECT * FROM user_events WHERE user_id <= 100" --output table
# Batch processing
weed sql --file queries.sql --output csv
weed sql --file analytics_queries.sql --output json
# Context switching
weed sql --database analytics --query "SHOW TABLES"
weed sql --server broker1:8888 --interactive
```
## 📊 **PRODUCTION READY FEATURES**
### ✅ **Error Handling**
- **JSON Error Format:** Structured error responses for programmatic use
- **User-Friendly Errors:** Clear error messages for interactive use
- **Query Validation:** Comprehensive SQL parsing error reporting
### ✅ **Performance Features**
- **Execution Timing:** Query performance metrics in table mode
- **Streaming Results:** Efficient handling of large result sets
- **Timeout Protection:** 30-second query timeout with graceful handling
### ✅ **Integration Features**
- **Real MQ Discovery:** Dynamic namespace and topic discovery
- **Hybrid Data Scanning:** Live logs + archived Parquet files
- **Schema-Aware Parsing:** Intelligent message interpretation
- **Zero Fallback Data:** Pure real MQ data discovery
## 🎯 **DEMONSTRATION**
Run the complete demo:
```bash
./enhanced_cli_demo.sh
```
**Demo covers:**
- Single query mode with all output formats
- Batch file processing
- Database context switching
- Advanced WHERE operators (LIKE, IN, <=, >=, !=)
- Real data scanning from hybrid sources
**All enhanced CLI features are production-ready and fully tested!** 🚀

weed/mq/broker/broker_grpc_query.go

```diff
@@ -6,10 +6,13 @@ import (
 	"fmt"
 	"strings"
+	"google.golang.org/protobuf/proto"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 )
```
```diff
@@ -61,20 +64,56 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
 	// Stream messages from LogBuffer with filtering
 	messageCount := 0
 	startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
-	_, _, err = localPartition.LogBuffer.LoopProcessLogData("sql_query_stream", startPosition, 0,
-		func() bool { return false }, // Don't wait for more data
-		func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+	// Create a custom LoopProcessLogData function that captures the batch index.
+	// Since we can't modify the existing EachLogEntryFuncType signature,
+	// we'll implement our own iteration logic based on LoopProcessLogData.
+	var lastReadPosition = startPosition
+	var isDone bool
+	for !isDone {
+		// Use ReadFromBuffer to get the next batch with its correct batch index
+		bytesBuf, batchIndex, err := localPartition.LogBuffer.ReadFromBuffer(lastReadPosition)
+		if err == log_buffer.ResumeFromDiskError {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		// If no more data in memory, we're done
+		if bytesBuf == nil {
+			break
+		}
+		// Process all messages in this batch
+		buf := bytesBuf.Bytes()
+		for pos := 0; pos+4 < len(buf); {
+			size := util.BytesToUint32(buf[pos : pos+4])
+			if pos+4+int(size) > len(buf) {
+				break
+			}
+			entryData := buf[pos+4 : pos+4+int(size)]
+			logEntry := &filer_pb.LogEntry{}
+			if err = proto.Unmarshal(entryData, logEntry); err != nil {
+				pos += 4 + int(size)
+				continue
+			}
+			// Now we have the correct batchIndex for this message
 			// Apply buffer index filtering if specified
-			currentBatchIndex := localPartition.LogBuffer.GetBatchIndex()
-			if startBufferIndex > 0 && currentBatchIndex < startBufferIndex {
-				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", currentBatchIndex, startBufferIndex)
-				return false, nil
+			if startBufferIndex > 0 && batchIndex < startBufferIndex {
+				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
+				pos += 4 + int(size)
+				continue
 			}
 			// Check if this message is from a buffer range that's already been flushed
-			if b.isBufferIndexFlushed(currentBatchIndex, flushedBufferRanges) {
-				glog.V(3).Infof("Skipping message from flushed buffer index %d", currentBatchIndex)
-				return false, nil
+			if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
+				glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
+				pos += 4 + int(size)
+				continue
 			}
 			// Stream this message
```
```diff
@@ -90,12 +129,18 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
 			if err != nil {
 				glog.Errorf("Failed to stream message: %v", err)
-				return true, err // Stop streaming on error
+				isDone = true
+				break
 			}
 			messageCount++
-			return false, nil
-		})
+			lastReadPosition = log_buffer.NewMessagePosition(logEntry.TsNs, batchIndex)
+			pos += 4 + int(size)
+		}
+		// Release the buffer back to the pool
+		localPartition.LogBuffer.ReleaseMemory(bytesBuf)
+	}
 	// Handle collection errors
 	if err != nil && err != log_buffer.ResumeFromDiskError {
```