* feat: Phase 1 - Add SQL query engine foundation for MQ topics Implements core SQL infrastructure with metadata operations: New Components: - SQL parser integration using github.com/xwb1989/sqlparser - Query engine framework in weed/query/engine/ - Schema catalog mapping MQ topics to SQL tables - Interactive SQL CLI command 'weed sql' Supported Operations: - SHOW DATABASES (lists MQ namespaces) - SHOW TABLES (lists MQ topics) - SQL statement parsing and routing - Error handling and result formatting Key Design Decisions: - MQ namespaces ↔ SQL databases - MQ topics ↔ SQL tables - Parquet message storage ready for querying - Backward-compatible schema evolution support Testing: - Unit tests for core engine functionality - Command integration tests - Parse error handling validation Assumptions (documented in code): - All MQ messages stored in Parquet format - Schema evolution maintains backward compatibility - MySQL-compatible SQL syntax via sqlparser - Single-threaded usage per SQL session Next Phase: DDL operations (CREATE/ALTER/DROP TABLE) * feat: Phase 2 - Add DDL operations and real MQ broker integration Implements comprehensive DDL support for MQ topic management: New Components: - Real MQ broker connectivity via BrokerClient - CREATE TABLE → ConfigureTopic gRPC calls - DROP TABLE → DeleteTopic operations - DESCRIBE table → Schema introspection - SQL type mapping (SQL ↔ MQ schema types) Enhanced Features: - Live topic discovery from MQ broker - Fallback to cached/sample data when broker unavailable - MySQL-compatible DESCRIBE output - Schema validation and error handling - CREATE TABLE with column definitions Key Infrastructure: - broker_client.go: gRPC communication with MQ broker - sql_types.go: Bidirectional SQL/MQ type conversion - describe.go: Table schema introspection - Enhanced engine.go: Full DDL routing and execution Supported SQL Operations: ✅ SHOW DATABASES, SHOW TABLES (live + fallback) ✅ CREATE TABLE table_name (col1 INT, col2 VARCHAR(50), ...) ✅ DROP TABLE table_name ✅ DESCRIBE table_name / SHOW COLUMNS FROM table_name Known Limitations: - SQL parser issues with reserved keywords (e.g., 'timestamp') - Requires running MQ broker for full functionality - ALTER TABLE not yet implemented - DeleteTopic method needs broker-side implementation Architecture Decisions: - Broker discovery via filer lock mechanism (same as shell commands) - Graceful fallback when broker unavailable - ConfigureTopic for CREATE TABLE with 6 default partitions - Schema versioning ready for ALTER TABLE support Testing: - Unit tests updated with filer address parameter - Integration tests for DDL operations - Error handling for connection failures Next Phase: SELECT query execution with Parquet scanning * fix: Resolve High Priority TODOs - Real MQ Broker Integration ✅ COMPLETED HIGH PRIORITY TODOs: 🔧 **Real FilerClient Integration** (engine.go:131) - Implemented GetFilerClient() method in BrokerClient - Added filerClientImpl with full FilerClient interface compliance - Added AdjustedUrl() and GetDataCenter() methods - Real filerClient connection replaces nil fallback 🔧 **Partition Discovery via MQ Broker** (hybrid_message_scanner.go:116) - Added ListTopicPartitions() method using topic configuration - Implemented discoverTopicPartitions() in HybridMessageScanner - Reads actual partition count from BrokerPartitionAssignments - Generates proper partition ranges based on topic.PartitionCount 📋 **Technical Fixes:** - Fixed compilation errors with undefined variables - Proper error handling with filerClientErr variable - Corrected ConfigureTopicResponse field usage (BrokerPartitionAssignments vs PartitionCount) - Complete FilerClient interface implementation 🎯 **Impact:** - SQL engine now connects to real MQ broker infrastructure - Actual topic partition discovery instead of hardcoded defaults - Production-ready broker integration with graceful fallbacks - Maintains backward compatibility with sample data when broker unavailable ✅ All tests passing - High priority TODO resolution complete! Next: Schema-aware message parsing and time filter optimization. * feat: Time Filter Extraction - Complete Performance Optimization ✅ FOURTH HIGH PRIORITY TODO COMPLETED! ⏰ **Time Filter Extraction & Push-Down Optimization** (engine.go:198-199) - Replaced hardcoded StartTimeNs=0, StopTimeNs=0 with intelligent extraction - Added extractTimeFilters() with recursive WHERE clause analysis - Smart time column detection (\_timestamp_ns, created_at, timestamp, etc.) - Comprehensive time value parsing (nanoseconds, ISO dates, datetime formats) - Operator reversal handling (column op value vs value op column) 🧠 **Intelligent WHERE Clause Processing:** - AND expressions: Combine time bounds (intersection) ✅ - OR expressions: Skip extraction (safety) ✅ - Parentheses: Recursive unwrapping ✅ - Comparison operators: >, >=, <, <=, = ✅ - Multiple time formats: nanoseconds, RFC3339, date-only, datetime ✅ 🚀 **Performance Impact:** - Push-down filtering to hybrid scanner level - Reduced data scanning at source (live logs + Parquet files) - Time-based partition pruning potential - Significant performance gains for time-series queries 📊 **Comprehensive Testing (21 tests passing):** - ✅ Time filter extraction (6 test scenarios) - ✅ Time column recognition (case-insensitive) - ✅ Time value parsing (5 formats) - ✅ Full integration with SELECT queries - ✅ Backward compatibility maintained 💡 **Real-World Query Examples:** Before: Scans ALL data, filters in memory SELECT * FROM events WHERE \_timestamp_ns > 1672531200000000000; After: Scans ONLY relevant time range at source level → StartTimeNs=1672531200000000000, StopTimeNs=0 → Massive performance improvement for large datasets! 🎯 **Production Ready Features:** - Multiple time column formats supported - Graceful fallbacks for invalid dates - OR clause safety (avoids incorrect optimization) - Comprehensive error handling **ALL MEDIUM PRIORITY TODOs NOW READY FOR NEXT PHASEtest ./weed/query/engine/ -v* 🎉 * feat: Extended WHERE Operators - Complete Advanced Filtering ✅ **EXTENDED WHERE OPERATORS IMPLEMENTEDtest ./weed/query/engine/ -v | grep -E PASS * feat: Enhanced SQL CLI Experience ✅ COMPLETE ENHANCED CLI IMPLEMENTATION: 🚀 **Multiple Execution Modes:** - Interactive shell with enhanced prompts and context - Single query execution: --query 'SQL' --output format - Batch file processing: --file queries.sql --output csv - Database context switching: --database dbname 📊 **Multi-Format Output:** - Table format (ASCII) - default for interactive - JSON format - structured data for programmatic use - CSV format - spreadsheet-friendly output - Smart auto-detection based on execution mode ⚙️ **Enhanced Interactive Shell:** - Database context switching: USE database_name; - Output format switching: \format table|json|csv - Command history tracking (basic implementation) - Enhanced help with WHERE operator examples - Contextual prompts: seaweedfs:dbname> 🛠️ **Production Features:** - Comprehensive error handling (JSON + user-friendly) - Query execution timing and performance metrics - 30-second timeout protection with graceful handling - Real MQ integration with hybrid data scanning 📖 **Complete CLI Interface:** - Full flag support: --server, --interactive, --file, --output, --database, --query - Auto-detection of execution mode and output format - Structured help system with practical examples - Batch processing with multi-query file support 💡 **Advanced WHERE Integration:** All extended operators (<=, >=, !=, LIKE, IN) fully supported across all execution modes and output formats. 🎯 **Usage Examples:** - weed sql --interactive - weed sql --query 'SHOW DATABASES' --output json - weed sql --file queries.sql --output csv - weed sql --database analytics --interactive Enhanced CLI experience complete - production ready! 🚀 * Delete test_utils_test.go * fmt * integer conversion * show databases works * show tables works * Update describe.go * actual column types * Update .gitignore * scan topic messages * remove emoji * support aggregation functions * column name case insensitive, better auto column names * fmt * fix reading system fields * use parquet statistics for optimization * remove emoji * parquet file generate stats * scan all files * parquet file generation remember the sources also * fmt * sql * truncate topic * combine parquet results with live logs * explain * explain the execution plan * add tests * improve tests * skip * use mock for testing * add tests * refactor * fix after refactoring * detailed logs during explain. Fix bugs on reading live logs. * fix decoding data * save source buffer index start for log files * process buffer from brokers * filter out already flushed messages * dedup with buffer start index * explain with broker buffer * the parquet file should also remember the first buffer_start attribute from the sources * parquet file can query messages in broker memory, if log files do not exist * buffer start stored as 8 bytes * add jdbc * add postgres protocol * Revert "add jdbc" This reverts commit |
3 months ago | |
|---|---|---|
| .. | ||
| agent | Accumulated changes for message queue (#6600) | 9 months ago |
| broker | Message Queue: Add sql querying (#7185) | 3 months ago |
| client | convert error fromating to %w everywhere (#6995) | 5 months ago |
| logstore | Message Queue: Add sql querying (#7185) | 3 months ago |
| pub_balancer | S3 API: Advanced IAM System (#7160) | 4 months ago |
| schema | Message Queue: Add sql querying (#7185) | 3 months ago |
| segment | Squashed commit of the following: | 2 years ago |
| sub_coordinator | Message Queue: Add sql querying (#7185) | 3 months ago |
| topic | Message Queue: Add sql querying (#7185) | 3 months ago |
| README.md | scaffold | 3 years ago |
README.md
SeaweedMQ Message Queue on SeaweedFS (WIP, not ready)
What are the use cases it is designed for?
Message queues are like water pipes. Messages flow in the pipes to their destinations.
However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart, and watch the traffic level closely.
Sometimes the flood is expected. For example, backfill some old data in batch, and switch to online messages. You may want to ensure enough brokers to handle the data and reduce them later to cut cost.
SeaweedMQ is designed for use cases that need to:
- Receive and save large number of messages.
- Handle spike traffic automatically.
What is special about SeaweedMQ?
- Separate computation and storage nodes to scale independently.
- Unlimited storage space by adding volume servers.
- Unlimited message brokers to handle incoming messages.
- Offline messages can be operated as normal files.
- Scale up and down with auto split and merge message topics.
- Topics can automatically split into segments when traffic increases, and vice verse.
- Pass messages by reference instead of copying.
- Clients can optionally upload the messages first and just submit the references.
- Drastically reduce the broker load.
- Stateless brokers
- All brokers are equal. One broker is dynamically picked as the leader.
- Add brokers at any time.
- Allow rolling restart brokers or remove brokers at a pace.
Design
How it works?
Brokers are just computation nodes without storage. When a broker starts, it reports itself to masters. Among all the brokers, one of them will be selected as the leader by the masters.
A topic needs to define its partition key on its messages.
Messages for a topic are divided into segments. One segment can cover a range of partitions. A segment can be split into 2 segments, or 2 neighboring segments can be merged back to one segment.
During write time, the client will ask the broker leader for a few brokers to process the segment.
The broker leader will check whether the segment already has assigned the brokers. If not, select a few brokers based on their loads, save the selection into filer, and tell the client.
The client will write the messages for this segment to the selected brokers.
Failover
The broker leader does not contain any state. If it fails, the masters will select a different broker.
For a segment, if any one of the selected brokers is down, the remaining brokers should try to write received messages to the filer, and close the segment to the clients.
Then the clients should start a new segment. The masters should assign other healthy brokers to handle the new segment.
So any brokers can go down without losing data.
Auto Split or Merge
(The idea is learned from Pravega.)
The brokers should report its traffic load to the broker leader periodically.
If any segment has too much load, the broker leader will ask the brokers to tell the client to close current one and create two new segments.
If 2 neighboring segments have the combined load below average load per segment, the broker leader will ask the brokers to tell the client to close this 2 segments and create a new segment.