You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

12 KiB

SeaweedFS Task Distribution System Design

Overview

This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.

System Architecture

High-Level Components

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Master        │◄──►│  Admin Server    │◄──►│   Workers       │
│                 │    │                  │    │                 │
│ - Volume Info   │    │ - Task Discovery │    │ - Task Exec     │
│ - Shard Status  │    │ - Task Assign    │    │ - Progress      │
│ - Heartbeats    │    │ - Progress Track │    │ - Error Report  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                        │                        │
         │                        │                        │
         ▼                        ▼                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Volume Servers  │    │ Volume Monitor   │    │ Task Execution  │
│                 │    │                  │    │                 │
│ - Store Volumes │    │ - Health Check   │    │ - EC Convert    │
│ - EC Shards     │    │ - Usage Stats    │    │ - Vacuum Clean  │
│ - Report Status │    │ - State Sync     │    │ - Status Report │
└─────────────────┘    └──────────────────┘    └─────────────────┘

1. Admin Server Design

1.1 Core Responsibilities

  • Task Discovery: Scan volumes to identify EC and vacuum candidates
  • Worker Management: Track available workers and their capabilities
  • Task Assignment: Match tasks to optimal workers
  • Progress Tracking: Monitor in-progress tasks for capacity planning
  • State Reconciliation: Sync with master server for volume state updates

1.2 Task Discovery Engine

type TaskDiscoveryEngine struct {
    masterClient   MasterClient
    volumeScanner  VolumeScanner
    taskDetectors  map[TaskType]TaskDetector
    scanInterval   time.Duration
}

type VolumeCandidate struct {
    VolumeID       uint32
    Server         string
    Collection     string
    TaskType       TaskType
    Priority       TaskPriority
    Reason         string
    DetectedAt     time.Time
    Parameters     map[string]interface{}
}

EC Detection Logic:

  • Find volumes >= 95% full and idle for > 1 hour
  • Exclude volumes already in EC format
  • Exclude volumes with ongoing operations
  • Prioritize by collection and age

Vacuum Detection Logic:

  • Find volumes with garbage ratio > 30%
  • Exclude read-only volumes
  • Exclude volumes with recent vacuum operations
  • Prioritize by garbage percentage

1.3 Worker Registry & Management

type WorkerRegistry struct {
    workers        map[string]*Worker
    capabilities   map[TaskType][]*Worker
    lastHeartbeat  map[string]time.Time
    taskAssignment map[string]*Task
    mutex          sync.RWMutex
}

type Worker struct {
    ID            string
    Address       string
    Capabilities  []TaskType
    MaxConcurrent int
    CurrentLoad   int
    Status        WorkerStatus
    LastSeen      time.Time
    Performance   WorkerMetrics
}

1.4 Task Assignment Algorithm

type TaskScheduler struct {
    registry       *WorkerRegistry
    taskQueue      *PriorityQueue
    inProgressTasks map[string]*InProgressTask
    volumeReservations map[uint32]*VolumeReservation
}

// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)

2. Worker Process Design

2.1 Worker Architecture

type MaintenanceWorker struct {
    id              string
    config          *WorkerConfig
    adminClient     AdminClient
    taskExecutors   map[TaskType]TaskExecutor
    currentTasks    map[string]*RunningTask
    registry        *TaskRegistry
    heartbeatTicker *time.Ticker
    requestTicker   *time.Ticker
}

2.2 Task Execution Framework

type TaskExecutor interface {
    Execute(ctx context.Context, task *Task) error
    EstimateTime(task *Task) time.Duration
    ValidateResources(task *Task) error
    GetProgress() float64
    Cancel() error
}

type ErasureCodingExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

type VacuumExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

2.3 Worker Capabilities & Registration

type WorkerCapabilities struct {
    SupportedTasks   []TaskType
    MaxConcurrent    int
    ResourceLimits   ResourceLimits
    PreferredServers []string  // Affinity for specific volume servers
}

type ResourceLimits struct {
    MaxMemoryMB      int64
    MaxDiskSpaceMB   int64
    MaxNetworkMbps   int64
    MaxCPUPercent    float64
}

3. Task Lifecycle Management

3.1 Task States

type TaskState string

const (
    TaskStatePending     TaskState = "pending"
    TaskStateAssigned    TaskState = "assigned"
    TaskStateInProgress  TaskState = "in_progress"
    TaskStateCompleted   TaskState = "completed"
    TaskStateFailed      TaskState = "failed"
    TaskStateCancelled   TaskState = "cancelled"
    TaskStateStuck       TaskState = "stuck"       // Taking too long
    TaskStateDuplicate   TaskState = "duplicate"   // Detected duplicate
)

3.2 Progress Tracking & Monitoring

type InProgressTask struct {
    Task           *Task
    WorkerID       string
    StartedAt      time.Time
    LastUpdate     time.Time
    Progress       float64
    EstimatedEnd   time.Time
    VolumeReserved bool  // Reserved for capacity planning
}

type TaskMonitor struct {
    inProgressTasks map[string]*InProgressTask
    timeoutChecker  *time.Ticker
    stuckDetector   *time.Ticker
    duplicateChecker *time.Ticker
}

4. Volume Capacity Reconciliation

4.1 Volume State Tracking

type VolumeStateManager struct {
    masterClient      MasterClient
    inProgressTasks   map[uint32]*InProgressTask  // VolumeID -> Task
    committedChanges  map[uint32]*VolumeChange    // Changes not yet in master
    reconcileInterval time.Duration
}

type VolumeChange struct {
    VolumeID     uint32
    ChangeType   ChangeType  // "ec_encoding", "vacuum_completed"
    OldCapacity  int64
    NewCapacity  int64
    TaskID       string
    CompletedAt  time.Time
    ReportedToMaster bool
}

4.2 Shard Assignment Integration

When the master needs to assign shards, it must consider:

  1. Current volume state from its own records
  2. In-progress capacity changes from admin server
  3. Committed but unreported changes from admin server
type CapacityOracle struct {
    adminServer   AdminServerClient
    masterState   *MasterVolumeState
    updateFreq    time.Duration
}

func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
    baseCapacity := o.masterState.GetCapacity(volumeID)
    
    // Adjust for in-progress tasks
    if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
        switch task.Type {
        case TaskTypeErasureCoding:
            // EC reduces effective capacity
            return baseCapacity / 2  // Simplified
        case TaskTypeVacuum:
            // Vacuum may increase available space
            return baseCapacity + int64(float64(baseCapacity) * 0.3)
        }
    }
    
    // Adjust for completed but unreported changes
    if change := o.adminServer.GetPendingChange(volumeID); change != nil {
        return change.NewCapacity
    }
    
    return baseCapacity
}

5. Error Handling & Recovery

5.1 Worker Failure Scenarios

type FailureHandler struct {
    taskRescheduler *TaskRescheduler
    workerMonitor   *WorkerMonitor
    alertManager    *AlertManager
}

// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion

5.2 Recovery Strategies

Worker Timeout Recovery:

  • Mark worker as inactive after 3 missed heartbeats
  • Reschedule all assigned tasks to other workers
  • Cleanup any partial state

Task Stuck Recovery:

  • Detect tasks with no progress for > 2x estimated time
  • Cancel stuck task and mark volume for cleanup
  • Reschedule if retry count < max_retries

Duplicate Task Prevention:

type DuplicateDetector struct {
    activeFingerprints map[string]bool  // VolumeID+TaskType
    recentCompleted    *LRUCache        // Recently completed tasks
}

func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
    fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
    return d.activeFingerprints[fingerprint] || 
           d.recentCompleted.Contains(fingerprint)
}

6. Simulation & Testing Framework

6.1 Failure Simulation

type TaskSimulator struct {
    scenarios map[string]SimulationScenario
}

type SimulationScenario struct {
    Name            string
    WorkerCount     int
    VolumeCount     int
    FailurePatterns []FailurePattern
    Duration        time.Duration
}

type FailurePattern struct {
    Type        FailureType  // "worker_timeout", "task_stuck", "duplicate"
    Probability float64      // 0.0 to 1.0
    Timing      TimingSpec   // When during task execution
    Duration    time.Duration
}

6.2 Test Scenarios

Scenario 1: Worker Timeout During EC

  • Start EC task on 30GB volume
  • Kill worker at 50% progress
  • Verify task reassignment
  • Verify no duplicate EC operations

Scenario 2: Stuck Vacuum Task

  • Start vacuum on high-garbage volume
  • Simulate worker hanging at 75% progress
  • Verify timeout detection and cleanup
  • Verify volume state consistency

Scenario 3: Duplicate Task Prevention

  • Submit same EC task from multiple sources
  • Verify only one task executes
  • Verify proper conflict resolution

Scenario 4: Master-Admin State Divergence

  • Create in-progress EC task
  • Simulate master restart
  • Verify state reconciliation
  • Verify shard assignment accounts for in-progress work

7. Performance & Scalability

7.1 Metrics & Monitoring

type SystemMetrics struct {
    TasksPerSecond     float64
    WorkerUtilization  float64
    AverageTaskTime    time.Duration
    FailureRate        float64
    QueueDepth         int
    VolumeStatesSync   bool
}

7.2 Scalability Considerations

  • Horizontal Worker Scaling: Add workers without admin server changes
  • Admin Server HA: Master-slave admin servers for fault tolerance
  • Task Partitioning: Partition tasks by collection or datacenter
  • Batch Operations: Group similar tasks for efficiency

8. Implementation Plan

Phase 1: Core Infrastructure

  1. Admin server basic framework
  2. Worker registration and heartbeat
  3. Simple task assignment
  4. Basic progress tracking

Phase 2: Advanced Features

  1. Volume state reconciliation
  2. Sophisticated worker selection
  3. Failure detection and recovery
  4. Duplicate prevention

Phase 3: Optimization & Monitoring

  1. Performance metrics
  2. Load balancing algorithms
  3. Capacity planning integration
  4. Comprehensive monitoring

This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.