SQL Query Engine Feature, Dev, and Test Plan
This document outlines the plan for adding SQL querying support to SeaweedFS, focusing on reading and analyzing data from Message Queue (MQ) topics and S3 objects.
Feature Plan
1. Goal
To provide a SQL querying interface for SeaweedFS, enabling analytics on existing MQ topics and S3 objects. This enables:
- Advanced querying with SELECT, WHERE, JOIN, aggregations on MQ topics
- Schema discovery and metadata operations (SHOW DATABASES, SHOW TABLES, DESCRIBE)
- In-place analytics on Parquet-stored messages without data movement
- Direct querying of S3 objects in various formats
2. Key Features
- Schema Discovery and Metadata (Priority 1):
  - `SHOW DATABASES` - List all MQ namespaces
  - `SHOW TABLES` - List all topics in a namespace
  - `DESCRIBE table_name` - Show topic schema details
  - Automatic schema detection from existing Parquet data
- Advanced Query Engine (Priority 1):
  - Full `SELECT` support with `WHERE`, `ORDER BY`, `LIMIT`, `OFFSET`
  - Aggregation functions: `COUNT()`, `SUM()`, `AVG()`, `MIN()`, `MAX()`, `GROUP BY`
  - Join operations between topics (leveraging Parquet columnar format)
  - Window functions and advanced analytics
  - Temporal queries with timestamp-based filtering
- S3 Select (Priority 2):
  - Support for querying objects in standard data formats (CSV, JSON, Parquet)
  - Queries executed directly on storage nodes to minimize data transfer
- User Interfaces:
  - New API endpoint `/sql` for HTTP-based SQL execution
  - New CLI command `weed sql` with interactive shell mode
  - Optional: Web UI for query execution and result visualization
- Output Formats:
  - JSON (default), CSV, Parquet for result sets
  - Streaming results for large queries
  - Pagination support for result navigation
Development Plan
1. Scaffolding & Dependencies
2. SQL Engine Architecture
- Schema Catalog:
  - Leverage existing `weed/mq/schema/` infrastructure
  - Map MQ namespaces to "databases" and topics to "tables" (see the catalog sketch after this list)
  - Discover schema metadata from existing Parquet files
  - Handle schema evolution in read operations
- Query Planner:
  - Parse SQL statements using a custom PostgreSQL-compatible parser
  - Create optimized execution plans leveraging the Parquet columnar format
  - Push down predicates to the storage layer for efficient filtering
  - Optimize aggregations using Parquet column statistics
  - Support time-based filtering with intelligent time range extraction (see the time range sketch after this list)
- Query Executor:
  - Utilize existing `weed/mq/logstore/` for Parquet reading
  - Implement streaming execution for large result sets
  - Support parallel processing across topic partitions
  - Handle schema evolution during query execution
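
A minimal Go sketch of the catalog mapping referenced above. The `Catalog`, `Database`, `Table`, and `Column` types are illustrative assumptions rather than the existing `weed/mq/schema/` API; a real implementation would populate them from topic metadata.

```go
package engine

// Catalog maps MQ concepts onto SQL metadata: namespaces become databases and
// topics become tables. Hypothetical sketch, not the existing schema package.
type Catalog struct {
	databases map[string]*Database // keyed by MQ namespace name
}

// Database corresponds to one MQ namespace.
type Database struct {
	Name   string
	Tables map[string]*Table // keyed by topic name
}

// Table corresponds to one MQ topic and its discovered schema.
type Table struct {
	Name    string
	Columns []Column
}

// Column corresponds to one field in the topic schema.
type Column struct {
	Name string
	Type string // e.g. "BIGINT", "VARCHAR", "DOUBLE"
}

// ShowDatabases backs the SHOW DATABASES command.
func (c *Catalog) ShowDatabases() []string {
	names := make([]string, 0, len(c.databases))
	for name := range c.databases {
		names = append(names, name)
	}
	return names
}

// ShowTables backs the SHOW TABLES command for one namespace.
func (c *Catalog) ShowTables(database string) []string {
	db, ok := c.databases[database]
	if !ok {
		return nil
	}
	names := make([]string, 0, len(db.Tables))
	for name := range db.Tables {
		names = append(names, name)
	}
	return names
}
```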
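The time range extraction mentioned under the Query Planner could look roughly like the following sketch; `Comparison` is a deliberately simplified stand-in for whatever predicate representation the parser ends up producing.

```go
package engine

// TimeRange is the scan window in milliseconds pushed down to storage.
// Zero values mean "unbounded" on that side.
type TimeRange struct {
	StartMs int64
	EndMs   int64
}

// Comparison is a simplified AND-ed predicate of the form: column OP literal.
type Comparison struct {
	Column  string
	Op      string // ">", ">=", "<", "<=", "="
	ValueMs int64
}

// ExtractTimeRange narrows the scan window from timestamp comparisons in the
// WHERE clause, so only Parquet files and row groups that can match are read.
func ExtractTimeRange(predicates []Comparison, timestampColumn string) TimeRange {
	var tr TimeRange
	for _, p := range predicates {
		if p.Column != timestampColumn {
			continue
		}
		switch p.Op {
		case ">", ">=":
			if p.ValueMs > tr.StartMs {
				tr.StartMs = p.ValueMs
			}
		case "<", "<=":
			if tr.EndMs == 0 || p.ValueMs < tr.EndMs {
				tr.EndMs = p.ValueMs
			}
		case "=":
			tr.StartMs, tr.EndMs = p.ValueMs, p.ValueMs
		}
	}
	return tr
}
```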
3. Data Source Integration
- MQ Topic Connector (Primary):
  - Build on existing `weed/mq/logstore/read_parquet_to_log.go`
  - Implement efficient Parquet scanning with predicate pushdown (row-group pruning sketched after this list)
  - Support schema evolution and backward compatibility
  - Handle partition-based parallelism for scalable queries
- Schema Registry Integration:
  - Extend `weed/mq/schema/schema.go` for SQL metadata operations
  - Read existing topic schemas for query planning
  - Handle schema evolution during query execution
- S3 Connector (Secondary):
  - Read data from S3 objects with CSV, JSON, and Parquet parsers
  - Efficient streaming for large files with columnar optimizations (CSV streaming sketched after this list)
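
For the MQ topic connector above, predicate pushdown largely reduces to skipping Parquet row groups whose column statistics cannot satisfy the query. A hedged sketch, with `RowGroupStats` standing in for the min/max values a reader would obtain from the Parquet footer:

```go
package engine

// RowGroupStats carries the per-row-group min/max timestamp values that
// Parquet stores in its footer (illustrative struct, not a library type).
type RowGroupStats struct {
	MinTimestampMs int64
	MaxTimestampMs int64
}

// ShouldScan reports whether a row group can overlap the pushed-down time
// window [startMs, endMs]; zero bounds mean "unbounded". Row groups that fall
// entirely outside the window are skipped without being decoded.
func ShouldScan(stats RowGroupStats, startMs, endMs int64) bool {
	if endMs != 0 && stats.MinTimestampMs > endMs {
		return false
	}
	if startMs != 0 && stats.MaxTimestampMs < startMs {
		return false
	}
	return true
}
```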
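For the S3 connector's CSV case, streaming can be done with the Go standard library alone. A minimal sketch in which the `emit` callback is a stand-in for the executor's projection and filtering:

```go
package engine

import (
	"encoding/csv"
	"io"
)

// StreamCSV reads a CSV object record by record, so large S3 objects never
// have to be buffered in memory. The body would be the S3 object stream.
func StreamCSV(body io.Reader, emit func(header, record []string) error) error {
	r := csv.NewReader(body)
	header, err := r.Read() // first record is treated as the header row
	if err != nil {
		return err
	}
	for {
		record, err := r.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := emit(header, record); err != nil {
			return err
		}
	}
}
```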
4. API & CLI Integration
- HTTP API Endpoint:
  - Add `/sql` endpoint to the Filer server following existing patterns in `weed/server/filer_server.go` (handler shape sketched after this list)
  - Support both POST (for queries) and GET (for metadata operations)
  - Include query result pagination and streaming
  - Authentication and authorization integration
- CLI Command:
  - New `weed sql` command with interactive shell mode (similar to `weed shell`)
  - Support for script execution and result formatting
  - Connection management for remote SeaweedFS clusters
- gRPC API:
  - Add SQL service to existing MQ broker gRPC interface
  - Enable efficient query execution with streaming results
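
For the HTTP `/sql` endpoint above, a sketch of the handler shape; the request/response fields, the `executeQuery` placeholder, and the port are assumptions for illustration, not the final API.

```go
package main

import (
	"encoding/json"
	"net/http"
)

// sqlRequest is the assumed POST body for the proposed /sql endpoint.
type sqlRequest struct {
	Query string `json:"query"`
}

// sqlResponse is a simple JSON result envelope; the real endpoint would also
// support streaming and pagination as described above.
type sqlResponse struct {
	Columns []string        `json:"columns"`
	Rows    [][]interface{} `json:"rows"`
	Error   string          `json:"error,omitempty"`
}

// handleSQL decodes a query, runs it, and writes the result as JSON.
func handleSQL(w http.ResponseWriter, r *http.Request) {
	var req sqlRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}
	cols, rows, err := executeQuery(req.Query)
	resp := sqlResponse{Columns: cols, Rows: rows}
	if err != nil {
		resp.Error = err.Error()
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}

// executeQuery is a placeholder for the query engine described in this plan.
func executeQuery(query string) ([]string, [][]interface{}, error) {
	return []string{"echo"}, [][]interface{}{{query}}, nil
}

func main() {
	http.HandleFunc("/sql", handleSQL)
	http.ListenAndServe(":8888", nil)
}
```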
Example Usage Scenarios
Scenario 1: Schema Discovery and Metadata
-- List all namespaces (databases)
SHOW DATABASES;
-- List topics in a namespace
USE my_namespace;
SHOW TABLES;
-- View topic structure and discovered schema
DESCRIBE user_events;
Scenario 2: Data Querying
-- Basic filtering and projection
SELECT user_id, event_type, timestamp
FROM user_events
WHERE timestamp > 1640995200000
ORDER BY timestamp DESC
LIMIT 100;
-- Aggregation queries
SELECT event_type, COUNT(*) as event_count
FROM user_events
WHERE timestamp >= 1640995200000
GROUP BY event_type;
-- Cross-topic joins
SELECT u.user_id, u.event_type, p.product_name
FROM user_events u
JOIN product_catalog p ON u.product_id = p.id
WHERE u.event_type = 'purchase';
Scenario 3: Analytics & Monitoring
-- Time-series analysis
SELECT
DATE_TRUNC('hour', FROM_UNIXTIME(timestamp/1000)) as hour,
COUNT(*) as events_per_hour
FROM user_events
WHERE timestamp >= 1640995200000
GROUP BY hour
ORDER BY hour;
-- Real-time monitoring
SELECT event_type, AVG(response_time) as avg_response
FROM api_logs
WHERE timestamp >= UNIX_TIMESTAMP() - 3600
GROUP BY event_type
HAVING avg_response > 1000;
Architecture Overview
SQL Query Flow:
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Client │ │ SQL Parser │ │ Query Planner │ │ Execution │
│ (CLI/HTTP) │──→ │ PostgreSQL │──→ │ & Optimizer │──→ │ Engine │
│ │ │ (Custom) │ │ │ │ │
└─────────────┘ └──────────────┘ └─────────────────┘ └──────────────┘
│ │
▼ │
┌─────────────────────────────────────────────────┐│
│ Schema Catalog ││
│ • Namespace → Database mapping ││
│ • Topic → Table mapping ││
│ • Schema version management ││
└─────────────────────────────────────────────────┘│
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ MQ Storage Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Topic A │ │ Topic B │ │ Topic C │ │ ... │ │
│ │ (Parquet) │ │ (Parquet) │ │ (Parquet) │ │ (Parquet) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Key Design Decisions
1. SQL-to-MQ Mapping Strategy:
- MQ Namespaces ↔ SQL Databases
- MQ Topics ↔ SQL Tables
- Topic Partitions ↔ Table Shards (transparent to users)
- Schema Fields ↔ Table Columns
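
A small illustration of how a table reference could resolve against this mapping; the function name and signature are hypothetical.

```go
package engine

import (
	"fmt"
	"strings"
)

// ResolveTable maps a SQL table reference to an MQ (namespace, topic) pair.
// An unqualified name like "user_events" uses the current database (set via
// USE); a qualified name like "my_namespace.user_events" overrides it.
func ResolveTable(currentDatabase, tableRef string) (namespace, topic string, err error) {
	if parts := strings.SplitN(tableRef, ".", 2); len(parts) == 2 {
		return parts[0], parts[1], nil
	}
	if currentDatabase == "" {
		return "", "", fmt.Errorf("no database selected for table %q", tableRef)
	}
	return currentDatabase, tableRef, nil
}
```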
2. Schema Evolution Handling:
- Read schema version history from existing topic metadata
- Support backward-compatible queries across schema versions
- Automatic type coercion where possible during reads
- Clear error messages for incompatible data
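
A hedged sketch of the backward-compatible read path: a record written under an older schema version is projected onto the latest column set, with columns added later surfaced as NULL. Names are illustrative only.

```go
package engine

// projectToLatest fits a record written under an older schema version onto the
// latest column set: columns added after the record was written come back as
// SQL NULL (nil), and columns since removed are simply ignored. Real type
// coercion (e.g. int32 -> int64 widening) would also hook in here.
func projectToLatest(record map[string]interface{}, latestColumns []string) []interface{} {
	row := make([]interface{}, len(latestColumns))
	for i, col := range latestColumns {
		if v, ok := record[col]; ok {
			row[i] = v
		}
		// missing columns stay nil (NULL)
	}
	return row
}
```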
3. Query Optimization:
- Leverage Parquet columnar format for projection pushdown
- Use topic partitioning for parallel query execution
- Implement predicate pushdown to minimize data scanning
- Cache frequently accessed schema metadata
4. Query Semantics:
- SELECT queries provide read-consistent snapshots of topic data
- Queries operate on immutable Parquet files for consistency
- No transactional guarantees across multiple topics
5. Performance Considerations:
- Prioritize read performance over write consistency
- Leverage MQ's natural partitioning for parallel queries
- Use Parquet metadata for query optimization
- Implement connection pooling and query caching
Implementation Phases
Phase 1: Core SQL Infrastructure (Weeks 1-3) ✅ COMPLETED
- Implemented a custom PostgreSQL-compatible parser (no CGO dependencies)
- Created `weed/query/engine/` package with a comprehensive SQL execution framework
- Implemented metadata catalog mapping MQ topics to SQL tables
- Added `SHOW DATABASES`, `SHOW TABLES`, `DESCRIBE` commands with full PostgreSQL compatibility
Phase 2: Query Engine (Weeks 4-6)
- `SELECT` with `WHERE`, `ORDER BY`, `LIMIT`, `OFFSET`
- Aggregation functions and `GROUP BY`
- Basic joins between topics
- Predicate pushdown to Parquet layer
- Schema discovery from existing Parquet files
Phase 3: API & CLI Integration (Weeks 7-8)
- HTTP `/sql` endpoint implementation
- `weed sql` CLI command with interactive mode
- Result streaming and pagination
- Error handling and query optimization
Phase 4: Advanced Features (Weeks 9-10)
- Window functions and advanced analytics
- S3 object querying capabilities
- Performance optimizations
- Connection pooling and query caching
Test Plan
1. Unit Tests
- SQL Parser Tests: Validate parsing of all supported SELECT statements and metadata operations
- Schema Mapping Tests: Test topic-to-table conversion and schema discovery
- Query Planning Tests: Verify optimization and predicate pushdown logic
- Execution Engine Tests: Test query execution with various data patterns
- Edge Cases: Malformed queries, schema evolution in existing data, concurrent reads
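
Parser tests could follow Go's table-driven convention; `ParseSQL` below is a local stub standing in for the real (not yet finalized) parser entry point, included only so the sketch is self-contained.

```go
package engine

import (
	"errors"
	"strings"
	"testing"
)

// ParseSQL is a stub standing in for the real parser entry point; it exists
// only so this sketch compiles and shows the intended test structure.
func ParseSQL(sql string) (interface{}, error) {
	upper := strings.ToUpper(strings.TrimSpace(sql))
	if strings.HasPrefix(upper, "SELECT") || strings.HasPrefix(upper, "SHOW") {
		return upper, nil
	}
	return nil, errors.New("syntax error")
}

// TestParseSelect sketches a table-driven test over representative statements.
func TestParseSelect(t *testing.T) {
	tests := []struct {
		name    string
		sql     string
		wantErr bool
	}{
		{"simple select", "SELECT user_id FROM user_events LIMIT 10", false},
		{"metadata command", "SHOW DATABASES", false},
		{"malformed query", "SELEC * FORM user_events", true},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			_, err := ParseSQL(tc.sql)
			if (err != nil) != tc.wantErr {
				t.Fatalf("ParseSQL(%q) error = %v, wantErr %v", tc.sql, err, tc.wantErr)
			}
		})
	}
}
```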
2. Integration Tests
- End-to-End Workflow: Complete SQL querying operations against live SeaweedFS cluster
- Schema Discovery: Test automatic schema detection from existing Parquet data
- Multi-Topic Joins: Validate cross-topic query performance and correctness
- Large Dataset Tests: Performance validation with GB-scale Parquet data
- Concurrent Access: Multiple SQL query sessions operating simultaneously
3. Performance & Security Testing
- Query Performance: Benchmark latency for various query patterns
- Memory Usage: Monitor resource consumption during large result sets
- Scalability Tests: Performance across multiple partitions and topics
- SQL Injection Prevention: Security validation of parser and execution engine
- Fuzz Testing: Automated testing with malformed SQL inputs
Success Metrics
- Feature Completeness: Support for all specified SELECT operations and metadata commands
- Performance:
- Simple SELECT queries: < 100ms latency for single-table queries with up to 3 WHERE predicates on ≤ 100K records
- Complex queries: < 1s latency for queries involving aggregations (COUNT, SUM, MAX, MIN) on ≤ 1M records
- Time-range queries: < 500ms for timestamp-based filtering on ≤ 500K records within 24-hour windows
- Scalability: Handle topics with millions of messages efficiently