From afbe52f262aa77ebd5057585d570343089b7a21b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 23 Jan 2026 12:46:10 -0800 Subject: [PATCH] cleanup --- BUCKET_POLICY_ENGINE_INTEGRATION.md | 242 ---------------- DESIGN.md | 413 ---------------------------- SQL_FEATURE_PLAN.md | 145 ---------- 3 files changed, 800 deletions(-) delete mode 100644 BUCKET_POLICY_ENGINE_INTEGRATION.md delete mode 100644 DESIGN.md delete mode 100644 SQL_FEATURE_PLAN.md diff --git a/BUCKET_POLICY_ENGINE_INTEGRATION.md b/BUCKET_POLICY_ENGINE_INTEGRATION.md deleted file mode 100644 index 5b9eefe6e..000000000 --- a/BUCKET_POLICY_ENGINE_INTEGRATION.md +++ /dev/null @@ -1,242 +0,0 @@ -# Bucket Policy Engine Integration - Complete - -## Summary - -Successfully integrated the `policy_engine` package to evaluate bucket policies for **all requests** (both anonymous and authenticated). This provides comprehensive AWS S3-compatible bucket policy support. - -## What Changed - -### 1. **New File: `s3api_bucket_policy_engine.go`** -Created a wrapper around `policy_engine.PolicyEngine` to: -- Load bucket policies from filer entries -- Sync policies from the bucket config cache -- Evaluate policies for any request (bucket, object, action, principal) -- Return structured results (allowed, evaluated, error) - -### 2. **Modified: `s3api_server.go`** -- Added `policyEngine *BucketPolicyEngine` field to `S3ApiServer` struct -- Initialized the policy engine in `NewS3ApiServerWithStore()` -- Linked `IdentityAccessManagement` back to `S3ApiServer` for policy evaluation - -### 3. 
**Modified: `auth_credentials.go`** -- Added `s3ApiServer *S3ApiServer` field to `IdentityAccessManagement` struct -- Added `buildPrincipalARN()` helper to convert identities to AWS ARN format -- **Integrated bucket policy evaluation into the authentication flow:** - - Policies are now checked **before** IAM/identity-based permissions - - Explicit `Deny` in bucket policy blocks access immediately - - Explicit `Allow` in bucket policy grants access and **bypasses IAM checks** (enables cross-account access) - - If no policy exists, falls through to normal IAM checks - - Policy evaluation errors result in access denial (fail-close security) - -### 4. **Modified: `s3api_bucket_config.go`** -- Added policy engine sync when bucket configs are loaded -- Ensures policies are loaded into the engine for evaluation - -### 5. **Modified: `auth_credentials_subscribe.go`** -- Added policy engine sync when bucket metadata changes -- Keeps the policy engine up-to-date via event-driven updates - -## How It Works - -### Anonymous Requests -``` -1. Request comes in (no credentials) -2. Check ACL-based public access → if public, allow -3. Check bucket policy for anonymous ("*") access → if allowed, allow -4. Otherwise, deny -``` - -### Authenticated Requests (NEW!) -``` -1. Request comes in (with credentials) -2. Authenticate user → get Identity -3. Build principal ARN (e.g., "arn:aws:iam::123456:user/bob") -4. Check bucket policy: - - If DENY → reject immediately - - If ALLOW → grant access immediately (bypasses IAM checks) - - If no policy or no matching statements → continue to step 5 -5. Check IAM/identity-based permissions (only if not already allowed by bucket policy) -6. 
Allow or deny based on identity permissions -``` - -## Policy Evaluation Flow - -``` -┌─────────────────────────────────────────────────────────┐ -│ Request (GET /bucket/file) │ -└───────────────────────────┬─────────────────────────────┘ - │ - ┌───────────▼──────────┐ - │ Authenticate User │ - │ (or Anonymous) │ - └───────────┬──────────┘ - │ - ┌───────────▼──────────────────────────────┐ - │ Build Principal ARN │ - │ - Anonymous: "*" │ - │ - User: "arn:aws:iam::123456:user/bob" │ - └───────────┬──────────────────────────────┘ - │ - ┌───────────▼──────────────────────────────┐ - │ Evaluate Bucket Policy (PolicyEngine) │ - │ - Action: "s3:GetObject" │ - │ - Resource: "arn:aws:s3:::bucket/file" │ - │ - Principal: (from above) │ - └───────────┬──────────────────────────────┘ - │ - ┌─────────────┼─────────────┐ - │ │ │ - DENY │ ALLOW │ NO POLICY - │ │ │ - ▼ ▼ ▼ - Reject Request Grant Access Continue - │ - ┌───────────────────┘ - │ - ┌────────────▼─────────────┐ - │ IAM/Identity Check │ - │ (identity.canDo) │ - └────────────┬─────────────┘ - │ - ┌─────────┴─────────┐ - │ │ - ALLOW │ DENY │ - ▼ ▼ - Grant Access Reject Request -``` - -## Example Policies That Now Work - -### 1. **Public Read Access** (Anonymous) -```json -{ - "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": "*", - "Action": "s3:GetObject", - "Resource": "arn:aws:s3:::mybucket/*" - }] -} -``` -- Anonymous users can read all objects -- Authenticated users are also evaluated against this policy. If they don't match an explicit `Allow` for this action, they will fall back to their own IAM permissions - -### 2. 
**Grant Access to Specific User** (Authenticated) -```json -{ - "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": {"AWS": "arn:aws:iam::123456789012:user/bob"}, - "Action": ["s3:GetObject", "s3:PutObject"], - "Resource": "arn:aws:s3:::mybucket/shared/*" - }] -} -``` -- User "bob" can read/write objects in `/shared/` prefix -- Other users cannot (unless granted by their IAM policies) - -### 3. **Deny Access to Specific Path** (Both) -```json -{ - "Version": "2012-10-17", - "Statement": [{ - "Effect": "Deny", - "Principal": "*", - "Action": "s3:*", - "Resource": "arn:aws:s3:::mybucket/confidential/*" - }] -} -``` -- **No one** can access `/confidential/` objects -- Denies override all other allows (AWS policy evaluation rules) - -## Performance Characteristics - -### Policy Loading -- **Cold start**: Policy loaded from filer → parsed → compiled → cached -- **Warm path**: Policy retrieved from `BucketConfigCache` (already parsed) -- **Updates**: Event-driven sync via metadata subscription (real-time) - -### Policy Evaluation -- **Compiled policies**: Pre-compiled regex patterns and matchers -- **Pattern cache**: Regex patterns cached with LRU eviction (max 1000) -- **Fast path**: Common patterns (`*`, exact matches) optimized -- **Case sensitivity**: Actions case-insensitive, resources case-sensitive (AWS-compatible) - -### Overhead -- **Anonymous requests**: Minimal (policy already checked, now using compiled engine) -- **Authenticated requests**: ~1-2ms added for policy evaluation (compiled patterns) -- **No policy**: Near-zero overhead (quick indeterminate check) - -## Testing - -All tests pass: -```bash -✅ TestBucketPolicyValidationBasics -✅ TestPrincipalMatchesAnonymous -✅ TestActionToS3Action -✅ TestResourceMatching -✅ TestMatchesPatternRegexEscaping (security tests) -✅ TestActionMatchingCaseInsensitive -✅ TestResourceMatchingCaseSensitive -✅ All policy_engine package tests (30+ tests) -``` - -## Security Improvements - -1. 
**Regex Metacharacter Escaping**: Patterns like `*.json` properly match only files ending in `.json` (not `filexjson`) -2. **Case-Insensitive Actions**: S3 actions matched case-insensitively per AWS spec -3. **Case-Sensitive Resources**: Resource paths matched case-sensitively for security -4. **Pattern Cache Size Limit**: Prevents DoS attacks via unbounded cache growth -5. **Principal Validation**: Supports `[]string` for manually constructed policies - -## AWS Compatibility - -The implementation follows AWS S3 bucket policy evaluation rules: -1. **Explicit Deny** always wins (checked first) -2. **Explicit Allow** grants access (checked second) -3. **Default Deny** if no matching statements (implicit) -4. Bucket policies work alongside IAM policies (both are evaluated) - -## Files Changed - -``` -Modified: - weed/s3api/auth_credentials.go (+47 lines) - weed/s3api/auth_credentials_subscribe.go (+8 lines) - weed/s3api/s3api_bucket_config.go (+8 lines) - weed/s3api/s3api_server.go (+5 lines) - -New: - weed/s3api/s3api_bucket_policy_engine.go (115 lines) -``` - -## Migration Notes - -- **Backward Compatible**: Existing setups without bucket policies work unchanged -- **No Breaking Changes**: All existing ACL and IAM-based authorization still works -- **Additive Feature**: Bucket policies are an additional layer of authorization -- **Performance**: Minimal impact on existing workloads - -## Future Enhancements - -Potential improvements (not implemented yet): -- [ ] Condition support (IP address, time-based, etc.) 
- already in policy_engine -- [ ] Cross-account policies (different AWS accounts) -- [ ] Policy validation API endpoint -- [ ] Policy simulation/testing tool -- [ ] Metrics for policy evaluations (allow/deny counts) - -## Conclusion - -Bucket policies now work for **all requests** in SeaweedFS S3 API: -- ✅ Anonymous requests (public access) -- ✅ Authenticated requests (user-specific policies) -- ✅ High performance (compiled policies, caching) -- ✅ AWS-compatible (follows AWS evaluation rules) -- ✅ Secure (proper escaping, case sensitivity) - -The integration is complete, tested, and ready for use! - diff --git a/DESIGN.md b/DESIGN.md deleted file mode 100644 index d164467c3..000000000 --- a/DESIGN.md +++ /dev/null @@ -1,413 +0,0 @@ -# SeaweedFS Task Distribution System Design - -## Overview - -This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture. - -## System Architecture - -### High-Level Components - -``` -┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ -│ Master │◄──►│ Admin Server │◄──►│ Workers │ -│ │ │ │ │ │ -│ - Volume Info │ │ - Task Discovery │ │ - Task Exec │ -│ - Shard Status │ │ - Task Assign │ │ - Progress │ -│ - Heartbeats │ │ - Progress Track │ │ - Error Report │ -└─────────────────┘ └──────────────────┘ └─────────────────┘ - │ │ │ - │ │ │ - ▼ ▼ ▼ -┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ -│ Volume Servers │ │ Volume Monitor │ │ Task Execution │ -│ │ │ │ │ │ -│ - Store Volumes │ │ - Health Check │ │ - EC Convert │ -│ - EC Shards │ │ - Usage Stats │ │ - Vacuum Clean │ -│ - Report Status │ │ - State Sync │ │ - Status Report │ -└─────────────────┘ └──────────────────┘ └─────────────────┘ -``` - -## 1. 
Admin Server Design - -### 1.1 Core Responsibilities - -- **Task Discovery**: Scan volumes to identify EC and vacuum candidates -- **Worker Management**: Track available workers and their capabilities -- **Task Assignment**: Match tasks to optimal workers -- **Progress Tracking**: Monitor in-progress tasks for capacity planning -- **State Reconciliation**: Sync with master server for volume state updates - -### 1.2 Task Discovery Engine - -```go -type TaskDiscoveryEngine struct { - masterClient MasterClient - volumeScanner VolumeScanner - taskDetectors map[TaskType]TaskDetector - scanInterval time.Duration -} - -type VolumeCandidate struct { - VolumeID uint32 - Server string - Collection string - TaskType TaskType - Priority TaskPriority - Reason string - DetectedAt time.Time - Parameters map[string]interface{} -} -``` - -**EC Detection Logic**: -- Find volumes >= 95% full and idle for > 1 hour -- Exclude volumes already in EC format -- Exclude volumes with ongoing operations -- Prioritize by collection and age - -**Vacuum Detection Logic**: -- Find volumes with garbage ratio > 30% -- Exclude read-only volumes -- Exclude volumes with recent vacuum operations -- Prioritize by garbage percentage - -### 1.3 Worker Registry & Management - -```go -type WorkerRegistry struct { - workers map[string]*Worker - capabilities map[TaskType][]*Worker - lastHeartbeat map[string]time.Time - taskAssignment map[string]*Task - mutex sync.RWMutex -} - -type Worker struct { - ID string - Address string - Capabilities []TaskType - MaxConcurrent int - CurrentLoad int - Status WorkerStatus - LastSeen time.Time - Performance WorkerMetrics -} -``` - -### 1.4 Task Assignment Algorithm - -```go -type TaskScheduler struct { - registry *WorkerRegistry - taskQueue *PriorityQueue - inProgressTasks map[string]*InProgressTask - volumeReservations map[uint32]*VolumeReservation -} - -// Worker Selection Criteria: -// 1. Has required capability (EC or Vacuum) -// 2. 
Available capacity (CurrentLoad < MaxConcurrent) -// 3. Best performance history for task type -// 4. Lowest current load -// 5. Geographically close to volume server (optional) -``` - -## 2. Worker Process Design - -### 2.1 Worker Architecture - -```go -type MaintenanceWorker struct { - id string - config *WorkerConfig - adminClient AdminClient - taskExecutors map[TaskType]TaskExecutor - currentTasks map[string]*RunningTask - registry *TaskRegistry - heartbeatTicker *time.Ticker - requestTicker *time.Ticker -} -``` - -### 2.2 Task Execution Framework - -```go -type TaskExecutor interface { - Execute(ctx context.Context, task *Task) error - EstimateTime(task *Task) time.Duration - ValidateResources(task *Task) error - GetProgress() float64 - Cancel() error -} - -type ErasureCodingExecutor struct { - volumeClient VolumeServerClient - progress float64 - cancelled bool -} - -type VacuumExecutor struct { - volumeClient VolumeServerClient - progress float64 - cancelled bool -} -``` - -### 2.3 Worker Capabilities & Registration - -```go -type WorkerCapabilities struct { - SupportedTasks []TaskType - MaxConcurrent int - ResourceLimits ResourceLimits - PreferredServers []string // Affinity for specific volume servers -} - -type ResourceLimits struct { - MaxMemoryMB int64 - MaxDiskSpaceMB int64 - MaxNetworkMbps int64 - MaxCPUPercent float64 -} -``` - -## 3. 
Task Lifecycle Management - -### 3.1 Task States - -```go -type TaskState string - -const ( - TaskStatePending TaskState = "pending" - TaskStateAssigned TaskState = "assigned" - TaskStateInProgress TaskState = "in_progress" - TaskStateCompleted TaskState = "completed" - TaskStateFailed TaskState = "failed" - TaskStateCancelled TaskState = "cancelled" - TaskStateStuck TaskState = "stuck" // Taking too long - TaskStateDuplicate TaskState = "duplicate" // Detected duplicate -) -``` - -### 3.2 Progress Tracking & Monitoring - -```go -type InProgressTask struct { - Task *Task - WorkerID string - StartedAt time.Time - LastUpdate time.Time - Progress float64 - EstimatedEnd time.Time - VolumeReserved bool // Reserved for capacity planning -} - -type TaskMonitor struct { - inProgressTasks map[string]*InProgressTask - timeoutChecker *time.Ticker - stuckDetector *time.Ticker - duplicateChecker *time.Ticker -} -``` - -## 4. Volume Capacity Reconciliation - -### 4.1 Volume State Tracking - -```go -type VolumeStateManager struct { - masterClient MasterClient - inProgressTasks map[uint32]*InProgressTask // VolumeID -> Task - committedChanges map[uint32]*VolumeChange // Changes not yet in master - reconcileInterval time.Duration -} - -type VolumeChange struct { - VolumeID uint32 - ChangeType ChangeType // "ec_encoding", "vacuum_completed" - OldCapacity int64 - NewCapacity int64 - TaskID string - CompletedAt time.Time - ReportedToMaster bool -} -``` - -### 4.2 Shard Assignment Integration - -When the master needs to assign shards, it must consider: -1. **Current volume state** from its own records -2. **In-progress capacity changes** from admin server -3. 
**Committed but unreported changes** from admin server - -```go -type CapacityOracle struct { - adminServer AdminServerClient - masterState *MasterVolumeState - updateFreq time.Duration -} - -func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 { - baseCapacity := o.masterState.GetCapacity(volumeID) - - // Adjust for in-progress tasks - if task := o.adminServer.GetInProgressTask(volumeID); task != nil { - switch task.Type { - case TaskTypeErasureCoding: - // EC reduces effective capacity - return baseCapacity / 2 // Simplified - case TaskTypeVacuum: - // Vacuum may increase available space - return baseCapacity + int64(float64(baseCapacity) * 0.3) - } - } - - // Adjust for completed but unreported changes - if change := o.adminServer.GetPendingChange(volumeID); change != nil { - return change.NewCapacity - } - - return baseCapacity -} -``` - -## 5. Error Handling & Recovery - -### 5.1 Worker Failure Scenarios - -```go -type FailureHandler struct { - taskRescheduler *TaskRescheduler - workerMonitor *WorkerMonitor - alertManager *AlertManager -} - -// Failure Scenarios: -// 1. Worker becomes unresponsive (heartbeat timeout) -// 2. Task execution fails (reported by worker) -// 3. Task gets stuck (progress timeout) -// 4. Duplicate task detection -// 5. 
Resource exhaustion -``` - -### 5.2 Recovery Strategies - -**Worker Timeout Recovery**: -- Mark worker as inactive after 3 missed heartbeats -- Reschedule all assigned tasks to other workers -- Cleanup any partial state - -**Task Stuck Recovery**: -- Detect tasks with no progress for > 2x estimated time -- Cancel stuck task and mark volume for cleanup -- Reschedule if retry count < max_retries - -**Duplicate Task Prevention**: -```go -type DuplicateDetector struct { - activeFingerprints map[string]bool // VolumeID+TaskType - recentCompleted *LRUCache // Recently completed tasks -} - -func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool { - fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type) - return d.activeFingerprints[fingerprint] || - d.recentCompleted.Contains(fingerprint) -} -``` - -## 6. Simulation & Testing Framework - -### 6.1 Failure Simulation - -```go -type TaskSimulator struct { - scenarios map[string]SimulationScenario -} - -type SimulationScenario struct { - Name string - WorkerCount int - VolumeCount int - FailurePatterns []FailurePattern - Duration time.Duration -} - -type FailurePattern struct { - Type FailureType // "worker_timeout", "task_stuck", "duplicate" - Probability float64 // 0.0 to 1.0 - Timing TimingSpec // When during task execution - Duration time.Duration -} -``` - -### 6.2 Test Scenarios - -**Scenario 1: Worker Timeout During EC** -- Start EC task on 30GB volume -- Kill worker at 50% progress -- Verify task reassignment -- Verify no duplicate EC operations - -**Scenario 2: Stuck Vacuum Task** -- Start vacuum on high-garbage volume -- Simulate worker hanging at 75% progress -- Verify timeout detection and cleanup -- Verify volume state consistency - -**Scenario 3: Duplicate Task Prevention** -- Submit same EC task from multiple sources -- Verify only one task executes -- Verify proper conflict resolution - -**Scenario 4: Master-Admin State Divergence** -- Create in-progress EC task -- Simulate master restart -- 
Verify state reconciliation -- Verify shard assignment accounts for in-progress work - -## 7. Performance & Scalability - -### 7.1 Metrics & Monitoring - -```go -type SystemMetrics struct { - TasksPerSecond float64 - WorkerUtilization float64 - AverageTaskTime time.Duration - FailureRate float64 - QueueDepth int - VolumeStatesSync bool -} -``` - -### 7.2 Scalability Considerations - -- **Horizontal Worker Scaling**: Add workers without admin server changes -- **Admin Server HA**: Master-slave admin servers for fault tolerance -- **Task Partitioning**: Partition tasks by collection or datacenter -- **Batch Operations**: Group similar tasks for efficiency - -## 8. Implementation Plan - -### Phase 1: Core Infrastructure -1. Admin server basic framework -2. Worker registration and heartbeat -3. Simple task assignment -4. Basic progress tracking - -### Phase 2: Advanced Features -1. Volume state reconciliation -2. Sophisticated worker selection -3. Failure detection and recovery -4. Duplicate prevention - -### Phase 3: Optimization & Monitoring -1. Performance metrics -2. Load balancing algorithms -3. Capacity planning integration -4. Comprehensive monitoring - -This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns. \ No newline at end of file diff --git a/SQL_FEATURE_PLAN.md b/SQL_FEATURE_PLAN.md deleted file mode 100644 index 28a6d2c24..000000000 --- a/SQL_FEATURE_PLAN.md +++ /dev/null @@ -1,145 +0,0 @@ -# SQL Query Engine Feature, Dev, and Test Plan - -This document outlines the plan for adding SQL querying support to SeaweedFS, focusing on reading and analyzing data from Message Queue (MQ) topics. - -## Feature Plan - -**1. Goal** - -To provide a SQL querying interface for SeaweedFS, enabling analytics on existing MQ topics. 
This enables: -- Basic querying with SELECT, WHERE, aggregations on MQ topics -- Schema discovery and metadata operations (SHOW DATABASES, SHOW TABLES, DESCRIBE) -- In-place analytics on Parquet-stored messages without data movement - -**2. Key Features** - -* **Schema Discovery and Metadata:** - * `SHOW DATABASES` - List all MQ namespaces - * `SHOW TABLES` - List all topics in a namespace - * `DESCRIBE table_name` - Show topic schema details - * Automatic schema detection from existing Parquet data -* **Basic Query Engine:** - * `SELECT` support with `WHERE`, `LIMIT`, `OFFSET` - * Aggregation functions: `COUNT()`, `SUM()`, `AVG()`, `MIN()`, `MAX()` - * Temporal queries with timestamp-based filtering -* **User Interfaces:** - * New CLI command `weed sql` with interactive shell mode - * Optional: Web UI for query execution and result visualization -* **Output Formats:** - * JSON (default), CSV, Parquet for result sets - * Streaming results for large queries - * Pagination support for result navigation - -## Development Plan - - - -**3. Data Source Integration** - -* **MQ Topic Connector (Primary):** - * Build on existing `weed/mq/logstore/read_parquet_to_log.go` - * Implement efficient Parquet scanning with predicate pushdown - * Support schema evolution and backward compatibility - * Handle partition-based parallelism for scalable queries -* **Schema Registry Integration:** - * Extend `weed/mq/schema/schema.go` for SQL metadata operations - * Read existing topic schemas for query planning - * Handle schema evolution during query execution - -**4. 
API & CLI Integration**
-
-* **CLI Command:**
-  * New `weed sql` command with interactive shell mode (similar to `weed shell`)
-  * Support for script execution and result formatting
-  * Connection management for remote SeaweedFS clusters
-* **gRPC API:**
-  * Add SQL service to existing MQ broker gRPC interface
-  * Enable efficient query execution with streaming results
-
-## Example Usage Scenarios
-
-**Scenario 1: Schema Discovery and Metadata**
-```sql
--- List all namespaces (databases)
-SHOW DATABASES;
-
--- List topics in a namespace
-USE my_namespace;
-SHOW TABLES;
-
--- View topic structure and discovered schema
-DESCRIBE user_events;
-```
-
-**Scenario 2: Data Querying**
-```sql
--- Basic filtering and projection
-SELECT user_id, event_type, timestamp
-FROM user_events
-WHERE timestamp > 1640995200000
-LIMIT 100;
-
--- Aggregation queries
-SELECT COUNT(*) as event_count
-FROM user_events
-WHERE timestamp >= 1640995200000;
-
--- More aggregation examples
-SELECT MAX(timestamp), MIN(timestamp)
-FROM user_events;
-```
-
-**Scenario 3: Analytics & Monitoring**
-```sql
--- Basic analytics
-SELECT COUNT(*) as total_events
-FROM user_events
-WHERE timestamp >= 1640995200000;
-
--- Simple monitoring
-SELECT AVG(response_time) as avg_response
-FROM api_logs
-WHERE timestamp >= 1640995200000;
-```
-
-## Architecture Overview
-
-```
-SQL Query Flow:
-                   1. Parse SQL        2. Plan & Optimize     3. Execute Query
-┌─────────────┐    ┌──────────────┐    ┌─────────────────┐    ┌──────────────┐
-│   Client    │    │  SQL Parser  │    │  Query Planner  │    │  Execution   │
-│   (CLI)     │──→ │  PostgreSQL  │──→ │  & Optimizer    │──→ │  Engine      │
-│             │    │  (Custom)    │    │                 │    │              │
-└─────────────┘    └──────────────┘    └─────────────────┘    └──────────────┘
-                          │                                          │
-                          │ Schema Lookup                            │ Data Access
-                          ▼                                          ▼
-          ┌─────────────────────────────────────────────────────────────┐
-          │                     Schema Catalog                          │
-          │   • Namespace → Database mapping                            │
-          │   • Topic → Table mapping                                   │
-          │   • Schema version management                               │
-          └─────────────────────────────────────────────────────────────┘
-                                        ▲
-                                        │ Metadata
-                                        │
-┌─────────────────────────────────────────────────────────────────────────────┐
-│                              MQ Storage Layer                               │
-│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐    ▲    │
-│  │   Topic A   │  │   Topic B   │  │   Topic C   │  │     ...     │    │    │
-│  │  (Parquet)  │  │  (Parquet)  │  │  (Parquet)  │  │  (Parquet)  │    │    │
-│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘    │    │
-└────────────────────────────────────────────────────────────────────────│────┘
-                                                                         │
-                                                                    Data Access
-```
-
-
-## Success Metrics
-
-* **Feature Completeness:** Support for all specified SELECT operations and metadata commands
-* **Performance:**
-  * **Simple SELECT queries**: < 100ms latency for single-table queries with up to 3 WHERE predicates on ≤ 100K records
-  * **Complex queries**: < 1s latency for queries involving aggregations (COUNT, SUM, MAX, MIN) on ≤ 1M records
-  * **Time-range queries**: < 500ms for timestamp-based filtering on ≤ 500K records within 24-hour windows
-* **Scalability:** Handle topics with millions of messages efficiently
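
The time-range filtering that the deleted SQL plan's success metrics target (`WHERE timestamp >= lo AND timestamp < hi` over decoded topic rows) can be sketched in Go, the project's language. This is a minimal illustration only: the `Record` type, its field names, and `filterByTimeRange` are assumptions for this sketch, not SeaweedFS's actual MQ row types or query-engine API.

```go
package main

import "fmt"

// Record is an illustrative stand-in for one decoded row of an MQ topic;
// the field names here are assumptions, not SeaweedFS types.
type Record struct {
	UserID      string
	EventType   string
	TimestampMs int64 // milliseconds since epoch, as in the plan's example queries
}

// filterByTimeRange keeps rows satisfying the half-open predicate
// timestamp >= lo AND timestamp < hi — the core of the time-range
// filtering an execution engine would push down before projection
// or aggregation.
func filterByTimeRange(rows []Record, lo, hi int64) []Record {
	out := make([]Record, 0, len(rows))
	for _, r := range rows {
		if r.TimestampMs >= lo && r.TimestampMs < hi {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	rows := []Record{
		{"u1", "click", 1640995100000}, // before the window, filtered out
		{"u2", "view", 1640995300000},
		{"u3", "click", 1640995400000},
	}
	// Same lower bound as the plan's example queries: timestamp >= 1640995200000.
	hits := filterByTimeRange(rows, 1640995200000, 1641081600000)
	fmt.Println(len(hits)) // prints 2
}
```

In a real engine this predicate would be evaluated against Parquet row-group statistics first, so whole chunks outside `[lo, hi)` are skipped without decoding — the "predicate pushdown" the deleted dev plan describes.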