You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
114 lines
3.0 KiB
114 lines
3.0 KiB
package erasure_coding
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// Scheduler decides when erasure coding tasks may be dispatched.
//
// Its whole state is a concurrency cap and an on/off switch. The zero value
// is a disabled scheduler with a cap of zero; use NewScheduler for the
// package defaults.
type Scheduler struct {
	// maxConcurrent caps how many erasure coding tasks may run at once.
	maxConcurrent int
	// enabled gates scheduling: CanScheduleNow reports false while it is unset.
	enabled bool
}
|
|
|
|
// NewScheduler creates a new erasure coding scheduler
|
|
func NewScheduler() *Scheduler {
|
|
return &Scheduler{
|
|
maxConcurrent: 1, // Conservative default
|
|
enabled: false, // Conservative default
|
|
}
|
|
}
|
|
|
|
// GetTaskType returns the task type
|
|
func (s *Scheduler) GetTaskType() types.TaskType {
|
|
return types.TaskTypeErasureCoding
|
|
}
|
|
|
|
// CanScheduleNow determines if an erasure coding task can be scheduled now
|
|
func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
|
|
if !s.enabled {
|
|
return false
|
|
}
|
|
|
|
// Check if we have available workers
|
|
if len(availableWorkers) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Count running EC tasks
|
|
runningCount := 0
|
|
for _, runningTask := range runningTasks {
|
|
if runningTask.Type == types.TaskTypeErasureCoding {
|
|
runningCount++
|
|
}
|
|
}
|
|
|
|
// Check concurrency limit
|
|
if runningCount >= s.maxConcurrent {
|
|
glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
|
|
return false
|
|
}
|
|
|
|
// Check if any worker can handle EC tasks
|
|
for _, worker := range availableWorkers {
|
|
for _, capability := range worker.Capabilities {
|
|
if capability == types.TaskTypeErasureCoding {
|
|
glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetMaxConcurrent returns the maximum number of concurrent tasks
|
|
func (s *Scheduler) GetMaxConcurrent() int {
|
|
return s.maxConcurrent
|
|
}
|
|
|
|
// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
|
|
func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
|
|
return 24 * time.Hour // Don't repeat EC for 24 hours
|
|
}
|
|
|
|
// GetPriority returns the priority for this task
|
|
func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
|
|
return types.TaskPriorityLow // EC is not urgent
|
|
}
|
|
|
|
// WasTaskRecentlyCompleted checks if a similar task was recently completed
|
|
func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
|
|
// Don't repeat EC for 24 hours
|
|
interval := 24 * time.Hour
|
|
cutoff := now.Add(-interval)
|
|
|
|
for _, completedTask := range completedTasks {
|
|
if completedTask.Type == types.TaskTypeErasureCoding &&
|
|
completedTask.VolumeID == task.VolumeID &&
|
|
completedTask.Server == task.Server &&
|
|
completedTask.Status == types.TaskStatusCompleted &&
|
|
completedTask.CompletedAt != nil &&
|
|
completedTask.CompletedAt.After(cutoff) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// IsEnabled returns whether this task type is enabled
|
|
func (s *Scheduler) IsEnabled() bool {
|
|
return s.enabled
|
|
}
|
|
|
|
// Configuration setters
|
|
|
|
func (s *Scheduler) SetEnabled(enabled bool) {
|
|
s.enabled = enabled
|
|
}
|
|
|
|
func (s *Scheduler) SetMaxConcurrent(max int) {
|
|
s.maxConcurrent = max
|
|
}
|