You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
12 KiB
12 KiB
SeaweedFS Task Distribution System Design
Overview
This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.
System Architecture
High-Level Components
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Master │◄──►│ Admin Server │◄──►│ Workers │
│ │ │ │ │ │
│ - Volume Info │ │ - Task Discovery │ │ - Task Exec │
│ - Shard Status │ │ - Task Assign │ │ - Progress │
│ - Heartbeats │ │ - Progress Track │ │ - Error Report │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Volume Servers │ │ Volume Monitor │ │ Task Execution │
│ │ │ │ │ │
│ - Store Volumes │ │ - Health Check │ │ - EC Convert │
│ - EC Shards │ │ - Usage Stats │ │ - Vacuum Clean │
│ - Report Status │ │ - State Sync │ │ - Status Report │
└─────────────────┘ └──────────────────┘ └─────────────────┘
1. Admin Server Design
1.1 Core Responsibilities
- Task Discovery: Scan volumes to identify EC and vacuum candidates
- Worker Management: Track available workers and their capabilities
- Task Assignment: Match tasks to optimal workers
- Progress Tracking: Monitor in-progress tasks for capacity planning
- State Reconciliation: Sync with master server for volume state updates
1.2 Task Discovery Engine
type TaskDiscoveryEngine struct {
masterClient MasterClient
volumeScanner VolumeScanner
taskDetectors map[TaskType]TaskDetector
scanInterval time.Duration
}
type VolumeCandidate struct {
VolumeID uint32
Server string
Collection string
TaskType TaskType
Priority TaskPriority
Reason string
DetectedAt time.Time
Parameters map[string]interface{}
}
EC Detection Logic:
- Find volumes >= 95% full and idle for > 1 hour
- Exclude volumes already in EC format
- Exclude volumes with ongoing operations
- Prioritize by collection and age
Vacuum Detection Logic:
- Find volumes with garbage ratio > 30%
- Exclude read-only volumes
- Exclude volumes with recent vacuum operations
- Prioritize by garbage percentage
1.3 Worker Registry & Management
type WorkerRegistry struct {
workers map[string]*Worker
capabilities map[TaskType][]*Worker
lastHeartbeat map[string]time.Time
taskAssignment map[string]*Task
mutex sync.RWMutex
}
type Worker struct {
ID string
Address string
Capabilities []TaskType
MaxConcurrent int
CurrentLoad int
Status WorkerStatus
LastSeen time.Time
Performance WorkerMetrics
}
1.4 Task Assignment Algorithm
type TaskScheduler struct {
registry *WorkerRegistry
taskQueue *PriorityQueue
inProgressTasks map[string]*InProgressTask
volumeReservations map[uint32]*VolumeReservation
}
// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
2. Worker Process Design
2.1 Worker Architecture
type MaintenanceWorker struct {
id string
config *WorkerConfig
adminClient AdminClient
taskExecutors map[TaskType]TaskExecutor
currentTasks map[string]*RunningTask
registry *TaskRegistry
heartbeatTicker *time.Ticker
requestTicker *time.Ticker
}
2.2 Task Execution Framework
type TaskExecutor interface {
Execute(ctx context.Context, task *Task) error
EstimateTime(task *Task) time.Duration
ValidateResources(task *Task) error
GetProgress() float64
Cancel() error
}
type ErasureCodingExecutor struct {
volumeClient VolumeServerClient
progress float64
cancelled bool
}
type VacuumExecutor struct {
volumeClient VolumeServerClient
progress float64
cancelled bool
}
2.3 Worker Capabilities & Registration
type WorkerCapabilities struct {
SupportedTasks []TaskType
MaxConcurrent int
ResourceLimits ResourceLimits
PreferredServers []string // Affinity for specific volume servers
}
type ResourceLimits struct {
MaxMemoryMB int64
MaxDiskSpaceMB int64
MaxNetworkMbps int64
MaxCPUPercent float64
}
3. Task Lifecycle Management
3.1 Task States
type TaskState string
const (
TaskStatePending TaskState = "pending"
TaskStateAssigned TaskState = "assigned"
TaskStateInProgress TaskState = "in_progress"
TaskStateCompleted TaskState = "completed"
TaskStateFailed TaskState = "failed"
TaskStateCancelled TaskState = "cancelled"
TaskStateStuck TaskState = "stuck" // Taking too long
TaskStateDuplicate TaskState = "duplicate" // Detected duplicate
)
3.2 Progress Tracking & Monitoring
type InProgressTask struct {
Task *Task
WorkerID string
StartedAt time.Time
LastUpdate time.Time
Progress float64
EstimatedEnd time.Time
VolumeReserved bool // Reserved for capacity planning
}
type TaskMonitor struct {
inProgressTasks map[string]*InProgressTask
timeoutChecker *time.Ticker
stuckDetector *time.Ticker
duplicateChecker *time.Ticker
}
4. Volume Capacity Reconciliation
4.1 Volume State Tracking
type VolumeStateManager struct {
masterClient MasterClient
inProgressTasks map[uint32]*InProgressTask // VolumeID -> Task
committedChanges map[uint32]*VolumeChange // Changes not yet in master
reconcileInterval time.Duration
}
type VolumeChange struct {
VolumeID uint32
ChangeType ChangeType // "ec_encoding", "vacuum_completed"
OldCapacity int64
NewCapacity int64
TaskID string
CompletedAt time.Time
ReportedToMaster bool
}
4.2 Shard Assignment Integration
When the master needs to assign shards, it must consider:
- Current volume state from its own records
- In-progress capacity changes from admin server
- Committed but unreported changes from admin server
type CapacityOracle struct {
adminServer AdminServerClient
masterState *MasterVolumeState
updateFreq time.Duration
}
func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
baseCapacity := o.masterState.GetCapacity(volumeID)
// Adjust for in-progress tasks
if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
switch task.Type {
case TaskTypeErasureCoding:
// EC reduces effective capacity
return baseCapacity / 2 // Simplified
case TaskTypeVacuum:
// Vacuum may increase available space
return baseCapacity + int64(float64(baseCapacity) * 0.3)
}
}
// Adjust for completed but unreported changes
if change := o.adminServer.GetPendingChange(volumeID); change != nil {
return change.NewCapacity
}
return baseCapacity
}
5. Error Handling & Recovery
5.1 Worker Failure Scenarios
type FailureHandler struct {
taskRescheduler *TaskRescheduler
workerMonitor *WorkerMonitor
alertManager *AlertManager
}
// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
5.2 Recovery Strategies
Worker Timeout Recovery:
- Mark worker as inactive after 3 missed heartbeats
- Reschedule all assigned tasks to other workers
- Cleanup any partial state
Task Stuck Recovery:
- Detect tasks with no progress for > 2x estimated time
- Cancel stuck task and mark volume for cleanup
- Reschedule if retry count < max_retries
Duplicate Task Prevention:
type DuplicateDetector struct {
activeFingerprints map[string]bool // VolumeID+TaskType
recentCompleted *LRUCache // Recently completed tasks
}
func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
return d.activeFingerprints[fingerprint] ||
d.recentCompleted.Contains(fingerprint)
}
6. Simulation & Testing Framework
6.1 Failure Simulation
type TaskSimulator struct {
scenarios map[string]SimulationScenario
}
type SimulationScenario struct {
Name string
WorkerCount int
VolumeCount int
FailurePatterns []FailurePattern
Duration time.Duration
}
type FailurePattern struct {
Type FailureType // "worker_timeout", "task_stuck", "duplicate"
Probability float64 // 0.0 to 1.0
Timing TimingSpec // When during task execution
Duration time.Duration
}
6.2 Test Scenarios
Scenario 1: Worker Timeout During EC
- Start EC task on 30GB volume
- Kill worker at 50% progress
- Verify task reassignment
- Verify no duplicate EC operations
Scenario 2: Stuck Vacuum Task
- Start vacuum on high-garbage volume
- Simulate worker hanging at 75% progress
- Verify timeout detection and cleanup
- Verify volume state consistency
Scenario 3: Duplicate Task Prevention
- Submit same EC task from multiple sources
- Verify only one task executes
- Verify proper conflict resolution
Scenario 4: Master-Admin State Divergence
- Create in-progress EC task
- Simulate master restart
- Verify state reconciliation
- Verify shard assignment accounts for in-progress work
7. Performance & Scalability
7.1 Metrics & Monitoring
type SystemMetrics struct {
TasksPerSecond float64
WorkerUtilization float64
AverageTaskTime time.Duration
FailureRate float64
QueueDepth int
VolumeStatesSync bool
}
7.2 Scalability Considerations
- Horizontal Worker Scaling: Add workers without admin server changes
- Admin Server HA: Master-slave admin servers for fault tolerance
- Task Partitioning: Partition tasks by collection or datacenter
- Batch Operations: Group similar tasks for efficiency
8. Implementation Plan
Phase 1: Core Infrastructure
- Admin server basic framework
- Worker registration and heartbeat
- Simple task assignment
- Basic progress tracking
Phase 2: Advanced Features
- Volume state reconciliation
- Sophisticated worker selection
- Failure detection and recovery
- Duplicate prevention
Phase 3: Optimization & Monitoring
- Performance metrics
- Load balancing algorithms
- Capacity planning integration
- Comprehensive monitoring
This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.