You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

145 lines
4.9 KiB

package ec_vacuum
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Scheduling determines if an EC vacuum task should be scheduled for execution
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
ecVacuumConfig, ok := config.(*Config)
if !ok {
glog.Errorf("EC vacuum scheduling: invalid config type")
return false
}
// Count running EC vacuum tasks
runningCount := 0
for _, runningTask := range runningTasks {
if runningTask.Type == types.TaskType("ec_vacuum") {
runningCount++
}
}
// Check concurrency limit
if runningCount >= ecVacuumConfig.MaxConcurrent {
glog.V(2).Infof("EC vacuum scheduling: max concurrent limit reached (%d/%d)", runningCount, ecVacuumConfig.MaxConcurrent)
return false
}
// Check if any worker can handle EC vacuum tasks
hasCapableWorker := false
var selectedWorker *types.WorkerData
for _, worker := range availableWorkers {
if canWorkerHandleEcVacuum(worker, task) {
hasCapableWorker = true
selectedWorker = worker
break
}
}
if !hasCapableWorker {
glog.V(2).Infof("EC vacuum scheduling: no capable workers available for task %s", task.ID)
return false
}
// Check worker resource availability
if !hasEnoughResources(selectedWorker, task) {
glog.V(2).Infof("EC vacuum scheduling: worker %s doesn't have enough resources for task %s",
selectedWorker.ID, task.ID)
return false
}
// Additional checks for EC vacuum specific requirements
if !meetsEcVacuumRequirements(task, ecVacuumConfig) {
glog.V(2).Infof("EC vacuum scheduling: task %s doesn't meet EC vacuum requirements", task.ID)
return false
}
glog.V(1).Infof("EC vacuum scheduling: approved task %s for worker %s", task.ID, selectedWorker.ID)
return true
}
// canWorkerHandleEcVacuum checks if a worker can handle EC vacuum tasks
func canWorkerHandleEcVacuum(worker *types.WorkerData, task *types.TaskInput) bool {
// Check if worker has EC vacuum capability
for _, capability := range worker.Capabilities {
if capability == types.TaskType("ec_vacuum") {
return true
}
// Also accept workers with general erasure_coding capability
if capability == types.TaskType("erasure_coding") {
return true
}
}
glog.V(3).Infof("Worker %s lacks EC vacuum capability", worker.ID)
return false
}
// hasEnoughResources checks if a worker has sufficient resources for EC vacuum
func hasEnoughResources(worker *types.WorkerData, task *types.TaskInput) bool {
// Check current load using what's available in WorkerData
if worker.CurrentLoad >= 2 { // Conservative limit for EC vacuum
glog.V(3).Infof("Worker %s at capacity: load=%d", worker.ID, worker.CurrentLoad)
return false
}
// EC vacuum tasks require more resources than regular tasks
// because they involve decode/encode operations
// We'll assume workers have sufficient resources for now
// In a production system, these checks would be more sophisticated
return true
}
// meetsEcVacuumRequirements checks EC vacuum specific requirements
func meetsEcVacuumRequirements(task *types.TaskInput, config *Config) bool {
// Validate task has required parameters
if task.VolumeID == 0 {
glog.V(3).Infof("EC vacuum task %s missing volume ID", task.ID)
return false
}
// Check if this is during allowed time windows (if any restrictions)
// For now, we allow EC vacuum anytime, but this could be made configurable
// Validate collection filter if specified
if config.CollectionFilter != "" && task.Collection != config.CollectionFilter {
glog.V(3).Infof("EC vacuum task %s collection %s doesn't match filter %s",
task.ID, task.Collection, config.CollectionFilter)
return false
}
// Additional safety checks could be added here, such as:
// - Checking if volume is currently being written to
// - Verifying minimum deletion threshold is still met
// - Ensuring cluster health is good for such operations
return true
}
// GetResourceRequirements returns the resource requirements for EC vacuum tasks
func GetResourceRequirements() map[string]interface{} {
return map[string]interface{}{
"MinConcurrentSlots": 2, // Need extra slots for decode/encode
"MinDiskSpaceGB": 10, // Minimum 10GB free space
"MinMemoryMB": 1024, // 1GB memory for operations
"PreferredNetworkMbps": 100, // Good network for shard transfers
"RequiredCapabilities": []string{"ec_vacuum", "erasure_coding"},
"ConflictingTaskTypes": []string{"erasure_coding"}, // Don't run with regular EC tasks on same volume
}
}
// CalculateTaskPriority calculates priority for EC vacuum tasks
func CalculateTaskPriority(task *types.TaskInput, metrics *types.VolumeHealthMetrics) types.TaskPriority {
// Higher priority for larger volumes (more space to reclaim)
if task.VolumeID > 1000000 { // Rough size indicator
return types.TaskPriorityMedium
}
// Default priority
return types.TaskPriorityLow
}