Browse Source

cleanup

pull/8017/head
Chris Lu 2 days ago
parent
commit
afbe52f262
  1. 242
      BUCKET_POLICY_ENGINE_INTEGRATION.md
  2. 413
      DESIGN.md
  3. 145
      SQL_FEATURE_PLAN.md

242
BUCKET_POLICY_ENGINE_INTEGRATION.md

@ -1,242 +0,0 @@
# Bucket Policy Engine Integration - Complete
## Summary
Successfully integrated the `policy_engine` package to evaluate bucket policies for **all requests** (both anonymous and authenticated). This provides comprehensive AWS S3-compatible bucket policy support.
## What Changed
### 1. **New File: `s3api_bucket_policy_engine.go`**
Created a wrapper around `policy_engine.PolicyEngine` to:
- Load bucket policies from filer entries
- Sync policies from the bucket config cache
- Evaluate policies for any request (bucket, object, action, principal)
- Return structured results (allowed, evaluated, error)
### 2. **Modified: `s3api_server.go`**
- Added `policyEngine *BucketPolicyEngine` field to `S3ApiServer` struct
- Initialized the policy engine in `NewS3ApiServerWithStore()`
- Linked `IdentityAccessManagement` back to `S3ApiServer` for policy evaluation
### 3. **Modified: `auth_credentials.go`**
- Added `s3ApiServer *S3ApiServer` field to `IdentityAccessManagement` struct
- Added `buildPrincipalARN()` helper to convert identities to AWS ARN format
- **Integrated bucket policy evaluation into the authentication flow:**
- Policies are now checked **before** IAM/identity-based permissions
- Explicit `Deny` in bucket policy blocks access immediately
- Explicit `Allow` in bucket policy grants access and **bypasses IAM checks** (enables cross-account access)
- If no policy exists, falls through to normal IAM checks
- Policy evaluation errors result in access denial (fail-close security)
### 4. **Modified: `s3api_bucket_config.go`**
- Added policy engine sync when bucket configs are loaded
- Ensures policies are loaded into the engine for evaluation
### 5. **Modified: `auth_credentials_subscribe.go`**
- Added policy engine sync when bucket metadata changes
- Keeps the policy engine up-to-date via event-driven updates
## How It Works
### Anonymous Requests
```
1. Request comes in (no credentials)
2. Check ACL-based public access → if public, allow
3. Check bucket policy for anonymous ("*") access → if allowed, allow
4. Otherwise, deny
```
### Authenticated Requests (NEW!)
```
1. Request comes in (with credentials)
2. Authenticate user → get Identity
3. Build principal ARN (e.g., "arn:aws:iam::123456:user/bob")
4. Check bucket policy:
- If DENY → reject immediately
- If ALLOW → grant access immediately (bypasses IAM checks)
- If no policy or no matching statements → continue to step 5
5. Check IAM/identity-based permissions (only if not already allowed by bucket policy)
6. Allow or deny based on identity permissions
```
## Policy Evaluation Flow
```
┌─────────────────────────────────────────────────────────┐
│ Request (GET /bucket/file) │
└───────────────────────────┬─────────────────────────────┘
┌───────────▼──────────┐
│ Authenticate User │
│ (or Anonymous) │
└───────────┬──────────┘
┌───────────▼──────────────────────────────┐
│ Build Principal ARN │
│ - Anonymous: "*" │
│ - User: "arn:aws:iam::123456:user/bob" │
└───────────┬──────────────────────────────┘
┌───────────▼──────────────────────────────┐
│ Evaluate Bucket Policy (PolicyEngine) │
│ - Action: "s3:GetObject" │
│ - Resource: "arn:aws:s3:::bucket/file" │
│ - Principal: (from above) │
└───────────┬──────────────────────────────┘
┌─────────────┼─────────────┐
│ │ │
DENY │ ALLOW │ NO POLICY
│ │ │
▼ ▼ ▼
Reject Request Grant Access Continue
┌───────────────────┘
┌────────────▼─────────────┐
│ IAM/Identity Check │
│ (identity.canDo) │
└────────────┬─────────────┘
┌─────────┴─────────┐
│ │
ALLOW │ DENY │
▼ ▼
Grant Access Reject Request
```
## Example Policies That Now Work
### 1. **Public Read Access** (Anonymous)
```json
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::mybucket/*"
}]
}
```
- Anonymous users can read all objects
- Authenticated users are also evaluated against this policy. If they don't match an explicit `Allow` for this action, they will fall back to their own IAM permissions
### 2. **Grant Access to Specific User** (Authenticated)
```json
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::123456789012:user/bob"},
"Action": ["s3:GetObject", "s3:PutObject"],
"Resource": "arn:aws:s3:::mybucket/shared/*"
}]
}
```
- User "bob" can read/write objects in `/shared/` prefix
- Other users cannot (unless granted by their IAM policies)
### 3. **Deny Access to Specific Path** (Both)
```json
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": "arn:aws:s3:::mybucket/confidential/*"
}]
}
```
- **No one** can access `/confidential/` objects
- Denies override all other allows (AWS policy evaluation rules)
## Performance Characteristics
### Policy Loading
- **Cold start**: Policy loaded from filer → parsed → compiled → cached
- **Warm path**: Policy retrieved from `BucketConfigCache` (already parsed)
- **Updates**: Event-driven sync via metadata subscription (real-time)
### Policy Evaluation
- **Compiled policies**: Pre-compiled regex patterns and matchers
- **Pattern cache**: Regex patterns cached with LRU eviction (max 1000)
- **Fast path**: Common patterns (`*`, exact matches) optimized
- **Case sensitivity**: Actions case-insensitive, resources case-sensitive (AWS-compatible)
### Overhead
- **Anonymous requests**: Minimal (policy already checked, now using compiled engine)
- **Authenticated requests**: ~1-2ms added for policy evaluation (compiled patterns)
- **No policy**: Near-zero overhead (quick indeterminate check)
## Testing
All tests pass:
```bash
✅ TestBucketPolicyValidationBasics
✅ TestPrincipalMatchesAnonymous
✅ TestActionToS3Action
✅ TestResourceMatching
✅ TestMatchesPatternRegexEscaping (security tests)
✅ TestActionMatchingCaseInsensitive
✅ TestResourceMatchingCaseSensitive
✅ All policy_engine package tests (30+ tests)
```
## Security Improvements
1. **Regex Metacharacter Escaping**: Patterns like `*.json` properly match only files ending in `.json` (not `filexjson`)
2. **Case-Insensitive Actions**: S3 actions matched case-insensitively per AWS spec
3. **Case-Sensitive Resources**: Resource paths matched case-sensitively for security
4. **Pattern Cache Size Limit**: Prevents DoS attacks via unbounded cache growth
5. **Principal Validation**: Supports `[]string` for manually constructed policies
## AWS Compatibility
The implementation follows AWS S3 bucket policy evaluation rules:
1. **Explicit Deny** always wins (checked first)
2. **Explicit Allow** grants access (checked second)
3. **Default Deny** if no matching statements (implicit)
4. Bucket policies work alongside IAM policies (both are evaluated)
## Files Changed
```
Modified:
weed/s3api/auth_credentials.go (+47 lines)
weed/s3api/auth_credentials_subscribe.go (+8 lines)
weed/s3api/s3api_bucket_config.go (+8 lines)
weed/s3api/s3api_server.go (+5 lines)
New:
weed/s3api/s3api_bucket_policy_engine.go (115 lines)
```
## Migration Notes
- **Backward Compatible**: Existing setups without bucket policies work unchanged
- **No Breaking Changes**: All existing ACL and IAM-based authorization still works
- **Additive Feature**: Bucket policies are an additional layer of authorization
- **Performance**: Minimal impact on existing workloads
## Future Enhancements
Potential improvements (not implemented yet):
- [ ] Condition support (IP address, time-based, etc.) - already in policy_engine
- [ ] Cross-account policies (different AWS accounts)
- [ ] Policy validation API endpoint
- [ ] Policy simulation/testing tool
- [ ] Metrics for policy evaluations (allow/deny counts)
## Conclusion
Bucket policies now work for **all requests** in SeaweedFS S3 API:
- ✅ Anonymous requests (public access)
- ✅ Authenticated requests (user-specific policies)
- ✅ High performance (compiled policies, caching)
- ✅ AWS-compatible (follows AWS evaluation rules)
- ✅ Secure (proper escaping, case sensitivity)
The integration is complete, tested, and ready for use!

413
DESIGN.md

@ -1,413 +0,0 @@
# SeaweedFS Task Distribution System Design
## Overview
This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.
## System Architecture
### High-Level Components
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Master │◄──►│ Admin Server │◄──►│ Workers │
│ │ │ │ │ │
│ - Volume Info │ │ - Task Discovery │ │ - Task Exec │
│ - Shard Status │ │ - Task Assign │ │ - Progress │
│ - Heartbeats │ │ - Progress Track │ │ - Error Report │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Volume Servers │ │ Volume Monitor │ │ Task Execution │
│ │ │ │ │ │
│ - Store Volumes │ │ - Health Check │ │ - EC Convert │
│ - EC Shards │ │ - Usage Stats │ │ - Vacuum Clean │
│ - Report Status │ │ - State Sync │ │ - Status Report │
└─────────────────┘ └──────────────────┘ └─────────────────┘
```
## 1. Admin Server Design
### 1.1 Core Responsibilities
- **Task Discovery**: Scan volumes to identify EC and vacuum candidates
- **Worker Management**: Track available workers and their capabilities
- **Task Assignment**: Match tasks to optimal workers
- **Progress Tracking**: Monitor in-progress tasks for capacity planning
- **State Reconciliation**: Sync with master server for volume state updates
### 1.2 Task Discovery Engine
```go
type TaskDiscoveryEngine struct {
masterClient MasterClient
volumeScanner VolumeScanner
taskDetectors map[TaskType]TaskDetector
scanInterval time.Duration
}
type VolumeCandidate struct {
VolumeID uint32
Server string
Collection string
TaskType TaskType
Priority TaskPriority
Reason string
DetectedAt time.Time
Parameters map[string]interface{}
}
```
**EC Detection Logic**:
- Find volumes >= 95% full and idle for > 1 hour
- Exclude volumes already in EC format
- Exclude volumes with ongoing operations
- Prioritize by collection and age
**Vacuum Detection Logic**:
- Find volumes with garbage ratio > 30%
- Exclude read-only volumes
- Exclude volumes with recent vacuum operations
- Prioritize by garbage percentage
### 1.3 Worker Registry & Management
```go
type WorkerRegistry struct {
workers map[string]*Worker
capabilities map[TaskType][]*Worker
lastHeartbeat map[string]time.Time
taskAssignment map[string]*Task
mutex sync.RWMutex
}
type Worker struct {
ID string
Address string
Capabilities []TaskType
MaxConcurrent int
CurrentLoad int
Status WorkerStatus
LastSeen time.Time
Performance WorkerMetrics
}
```
### 1.4 Task Assignment Algorithm
```go
type TaskScheduler struct {
registry *WorkerRegistry
taskQueue *PriorityQueue
inProgressTasks map[string]*InProgressTask
volumeReservations map[uint32]*VolumeReservation
}
// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
```
## 2. Worker Process Design
### 2.1 Worker Architecture
```go
type MaintenanceWorker struct {
id string
config *WorkerConfig
adminClient AdminClient
taskExecutors map[TaskType]TaskExecutor
currentTasks map[string]*RunningTask
registry *TaskRegistry
heartbeatTicker *time.Ticker
requestTicker *time.Ticker
}
```
### 2.2 Task Execution Framework
```go
type TaskExecutor interface {
Execute(ctx context.Context, task *Task) error
EstimateTime(task *Task) time.Duration
ValidateResources(task *Task) error
GetProgress() float64
Cancel() error
}
type ErasureCodingExecutor struct {
volumeClient VolumeServerClient
progress float64
cancelled bool
}
type VacuumExecutor struct {
volumeClient VolumeServerClient
progress float64
cancelled bool
}
```
### 2.3 Worker Capabilities & Registration
```go
type WorkerCapabilities struct {
SupportedTasks []TaskType
MaxConcurrent int
ResourceLimits ResourceLimits
PreferredServers []string // Affinity for specific volume servers
}
type ResourceLimits struct {
MaxMemoryMB int64
MaxDiskSpaceMB int64
MaxNetworkMbps int64
MaxCPUPercent float64
}
```
## 3. Task Lifecycle Management
### 3.1 Task States
```go
type TaskState string
const (
TaskStatePending TaskState = "pending"
TaskStateAssigned TaskState = "assigned"
TaskStateInProgress TaskState = "in_progress"
TaskStateCompleted TaskState = "completed"
TaskStateFailed TaskState = "failed"
TaskStateCancelled TaskState = "cancelled"
TaskStateStuck TaskState = "stuck" // Taking too long
TaskStateDuplicate TaskState = "duplicate" // Detected duplicate
)
```
### 3.2 Progress Tracking & Monitoring
```go
type InProgressTask struct {
Task *Task
WorkerID string
StartedAt time.Time
LastUpdate time.Time
Progress float64
EstimatedEnd time.Time
VolumeReserved bool // Reserved for capacity planning
}
type TaskMonitor struct {
inProgressTasks map[string]*InProgressTask
timeoutChecker *time.Ticker
stuckDetector *time.Ticker
duplicateChecker *time.Ticker
}
```
## 4. Volume Capacity Reconciliation
### 4.1 Volume State Tracking
```go
type VolumeStateManager struct {
masterClient MasterClient
inProgressTasks map[uint32]*InProgressTask // VolumeID -> Task
committedChanges map[uint32]*VolumeChange // Changes not yet in master
reconcileInterval time.Duration
}
type VolumeChange struct {
VolumeID uint32
ChangeType ChangeType // "ec_encoding", "vacuum_completed"
OldCapacity int64
NewCapacity int64
TaskID string
CompletedAt time.Time
ReportedToMaster bool
}
```
### 4.2 Shard Assignment Integration
When the master needs to assign shards, it must consider:
1. **Current volume state** from its own records
2. **In-progress capacity changes** from admin server
3. **Committed but unreported changes** from admin server
```go
type CapacityOracle struct {
adminServer AdminServerClient
masterState *MasterVolumeState
updateFreq time.Duration
}
func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
baseCapacity := o.masterState.GetCapacity(volumeID)
// Adjust for in-progress tasks
if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
switch task.Type {
case TaskTypeErasureCoding:
// EC reduces effective capacity
return baseCapacity / 2 // Simplified
case TaskTypeVacuum:
// Vacuum may increase available space
return baseCapacity + int64(float64(baseCapacity) * 0.3)
}
}
// Adjust for completed but unreported changes
if change := o.adminServer.GetPendingChange(volumeID); change != nil {
return change.NewCapacity
}
return baseCapacity
}
```
## 5. Error Handling & Recovery
### 5.1 Worker Failure Scenarios
```go
type FailureHandler struct {
taskRescheduler *TaskRescheduler
workerMonitor *WorkerMonitor
alertManager *AlertManager
}
// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
```
### 5.2 Recovery Strategies
**Worker Timeout Recovery**:
- Mark worker as inactive after 3 missed heartbeats
- Reschedule all assigned tasks to other workers
- Cleanup any partial state
**Task Stuck Recovery**:
- Detect tasks with no progress for > 2x estimated time
- Cancel stuck task and mark volume for cleanup
- Reschedule if retry count < max_retries
**Duplicate Task Prevention**:
```go
type DuplicateDetector struct {
activeFingerprints map[string]bool // VolumeID+TaskType
recentCompleted *LRUCache // Recently completed tasks
}
func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
return d.activeFingerprints[fingerprint] ||
d.recentCompleted.Contains(fingerprint)
}
```
## 6. Simulation & Testing Framework
### 6.1 Failure Simulation
```go
type TaskSimulator struct {
scenarios map[string]SimulationScenario
}
type SimulationScenario struct {
Name string
WorkerCount int
VolumeCount int
FailurePatterns []FailurePattern
Duration time.Duration
}
type FailurePattern struct {
Type FailureType // "worker_timeout", "task_stuck", "duplicate"
Probability float64 // 0.0 to 1.0
Timing TimingSpec // When during task execution
Duration time.Duration
}
```
### 6.2 Test Scenarios
**Scenario 1: Worker Timeout During EC**
- Start EC task on 30GB volume
- Kill worker at 50% progress
- Verify task reassignment
- Verify no duplicate EC operations
**Scenario 2: Stuck Vacuum Task**
- Start vacuum on high-garbage volume
- Simulate worker hanging at 75% progress
- Verify timeout detection and cleanup
- Verify volume state consistency
**Scenario 3: Duplicate Task Prevention**
- Submit same EC task from multiple sources
- Verify only one task executes
- Verify proper conflict resolution
**Scenario 4: Master-Admin State Divergence**
- Create in-progress EC task
- Simulate master restart
- Verify state reconciliation
- Verify shard assignment accounts for in-progress work
## 7. Performance & Scalability
### 7.1 Metrics & Monitoring
```go
type SystemMetrics struct {
TasksPerSecond float64
WorkerUtilization float64
AverageTaskTime time.Duration
FailureRate float64
QueueDepth int
VolumeStatesSync bool
}
```
### 7.2 Scalability Considerations
- **Horizontal Worker Scaling**: Add workers without admin server changes
- **Admin Server HA**: Master-slave admin servers for fault tolerance
- **Task Partitioning**: Partition tasks by collection or datacenter
- **Batch Operations**: Group similar tasks for efficiency
## 8. Implementation Plan
### Phase 1: Core Infrastructure
1. Admin server basic framework
2. Worker registration and heartbeat
3. Simple task assignment
4. Basic progress tracking
### Phase 2: Advanced Features
1. Volume state reconciliation
2. Sophisticated worker selection
3. Failure detection and recovery
4. Duplicate prevention
### Phase 3: Optimization & Monitoring
1. Performance metrics
2. Load balancing algorithms
3. Capacity planning integration
4. Comprehensive monitoring
This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.

145
SQL_FEATURE_PLAN.md

@ -1,145 +0,0 @@
# SQL Query Engine Feature, Dev, and Test Plan
This document outlines the plan for adding SQL querying support to SeaweedFS, focusing on reading and analyzing data from Message Queue (MQ) topics.
## Feature Plan
**1. Goal**
To provide a SQL querying interface for SeaweedFS, enabling analytics on existing MQ topics. This enables:
- Basic querying with SELECT, WHERE, aggregations on MQ topics
- Schema discovery and metadata operations (SHOW DATABASES, SHOW TABLES, DESCRIBE)
- In-place analytics on Parquet-stored messages without data movement
**2. Key Features**
* **Schema Discovery and Metadata:**
* `SHOW DATABASES` - List all MQ namespaces
* `SHOW TABLES` - List all topics in a namespace
* `DESCRIBE table_name` - Show topic schema details
* Automatic schema detection from existing Parquet data
* **Basic Query Engine:**
* `SELECT` support with `WHERE`, `LIMIT`, `OFFSET`
* Aggregation functions: `COUNT()`, `SUM()`, `AVG()`, `MIN()`, `MAX()`
* Temporal queries with timestamp-based filtering
* **User Interfaces:**
* New CLI command `weed sql` with interactive shell mode
* Optional: Web UI for query execution and result visualization
* **Output Formats:**
* JSON (default), CSV, Parquet for result sets
* Streaming results for large queries
* Pagination support for result navigation
## Development Plan
**3. Data Source Integration**
* **MQ Topic Connector (Primary):**
* Build on existing `weed/mq/logstore/read_parquet_to_log.go`
* Implement efficient Parquet scanning with predicate pushdown
* Support schema evolution and backward compatibility
* Handle partition-based parallelism for scalable queries
* **Schema Registry Integration:**
* Extend `weed/mq/schema/schema.go` for SQL metadata operations
* Read existing topic schemas for query planning
* Handle schema evolution during query execution
**4. API & CLI Integration**
* **CLI Command:**
* New `weed sql` command with interactive shell mode (similar to `weed shell`)
* Support for script execution and result formatting
* Connection management for remote SeaweedFS clusters
* **gRPC API:**
* Add SQL service to existing MQ broker gRPC interface
* Enable efficient query execution with streaming results
## Example Usage Scenarios
**Scenario 1: Schema Discovery and Metadata**
```sql
-- List all namespaces (databases)
SHOW DATABASES;
-- List topics in a namespace
USE my_namespace;
SHOW TABLES;
-- View topic structure and discovered schema
DESCRIBE user_events;
```
**Scenario 2: Data Querying**
```sql
-- Basic filtering and projection
SELECT user_id, event_type, timestamp
FROM user_events
WHERE timestamp > 1640995200000
LIMIT 100;
-- Aggregation queries
SELECT COUNT(*) as event_count
FROM user_events
WHERE timestamp >= 1640995200000;
-- More aggregation examples
SELECT MAX(timestamp), MIN(timestamp)
FROM user_events;
```
**Scenario 3: Analytics & Monitoring**
```sql
-- Basic analytics
SELECT COUNT(*) as total_events
FROM user_events
WHERE timestamp >= 1640995200000;
-- Simple monitoring
SELECT AVG(response_time) as avg_response
FROM api_logs
WHERE timestamp >= 1640995200000;
## Architecture Overview
```
SQL Query Flow:
1. Parse SQL 2. Plan & Optimize 3. Execute Query
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Client │ │ SQL Parser │ │ Query Planner │ │ Execution │
│ (CLI) │──→ │ PostgreSQL │──→ │ & Optimizer │──→ │ Engine │
│ │ │ (Custom) │ │ │ │ │
└─────────────┘ └──────────────┘ └─────────────────┘ └──────────────┘
│ │
│ Schema Lookup │ Data Access
▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Schema Catalog │
│ • Namespace → Database mapping │
│ • Topic → Table mapping │
│ • Schema version management │
└─────────────────────────────────────────────────────────────┘
│ Metadata
┌─────────────────────────────────────────────────────────────────────────────┐
│ MQ Storage Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ▲ │
│ │ Topic A │ │ Topic B │ │ Topic C │ │ ... │ │ │
│ │ (Parquet) │ │ (Parquet) │ │ (Parquet) │ │ (Parquet) │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
└──────────────────────────────────────────────────────────────────────────│──┘
Data Access
```
## Success Metrics
* **Feature Completeness:** Support for all specified SELECT operations and metadata commands
* **Performance:**
* **Simple SELECT queries**: < 100ms latency for single-table queries with up to 3 WHERE predicates on 100K records
* **Complex queries**: < 1s latency for queries involving aggregations (COUNT, SUM, MAX, MIN) on 1M records
* **Time-range queries**: < 500ms for timestamp-based filtering on 500K records within 24-hour windows
* **Scalability:** Handle topics with millions of messages efficiently
Loading…
Cancel
Save