package ec_vacuum

import (
    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
    "github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Scheduling determines if an EC vacuum task should be scheduled for execution
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
    ecVacuumConfig, ok := config.(*Config)
    if !ok {
        glog.Errorf("EC vacuum scheduling: invalid config type")
        return false
    }

    // Count running EC vacuum tasks
    runningCount := 0
    for _, runningTask := range runningTasks {
        if runningTask.Type == types.TaskType("ec_vacuum") {
            runningCount++
        }
    }

    // Check concurrency limit
    if runningCount >= ecVacuumConfig.MaxConcurrent {
        glog.V(2).Infof("EC vacuum scheduling: max concurrent limit reached (%d/%d)", runningCount, ecVacuumConfig.MaxConcurrent)
        return false
    }

    // Check if any worker can handle EC vacuum tasks
    hasCapableWorker := false
    var selectedWorker *types.WorkerData

    for _, worker := range availableWorkers {
        if canWorkerHandleEcVacuum(worker, task) {
            hasCapableWorker = true
            selectedWorker = worker
            break
        }
    }

    if !hasCapableWorker {
        glog.V(2).Infof("EC vacuum scheduling: no capable workers available for task %s", task.ID)
        return false
    }

    // Check worker resource availability
    if !hasEnoughResources(selectedWorker, task) {
        glog.V(2).Infof("EC vacuum scheduling: worker %s doesn't have enough resources for task %s",
            selectedWorker.ID, task.ID)
        return false
    }

    // Additional checks for EC vacuum specific requirements
    if !meetsEcVacuumRequirements(task, ecVacuumConfig) {
        glog.V(2).Infof("EC vacuum scheduling: task %s doesn't meet EC vacuum requirements", task.ID)
        return false
    }

    glog.V(1).Infof("EC vacuum scheduling: approved task %s for worker %s", task.ID, selectedWorker.ID)
    return true
}
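
// Hypothetical usage sketch, not part of this package's API: a maintenance
// dispatch loop could gate assignment on Scheduling roughly like this
// (pendingTask, runningTasks, and availableWorkers are assumed to be
// supplied by the caller; only MaxConcurrent is set on Config here):
//
//	cfg := &Config{MaxConcurrent: 1}
//	if Scheduling(pendingTask, runningTasks, availableWorkers, cfg) {
//		// hand pendingTask to one of the capable workers
//	}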

// canWorkerHandleEcVacuum checks if a worker can handle EC vacuum tasks
func canWorkerHandleEcVacuum(worker *types.WorkerData, task *types.TaskInput) bool {
    // Check if worker has the dedicated EC vacuum capability
    for _, capability := range worker.Capabilities {
        if capability == types.TaskType("ec_vacuum") {
            return true
        }
        // Also accept workers with the general erasure_coding capability
        if capability == types.TaskType("erasure_coding") {
            return true
        }
    }

    glog.V(3).Infof("Worker %s lacks EC vacuum capability", worker.ID)
    return false
}

// hasEnoughResources checks if a worker has sufficient resources for EC vacuum
func hasEnoughResources(worker *types.WorkerData, task *types.TaskInput) bool {
    // Check current load using what's available in WorkerData
    if worker.CurrentLoad >= 2 { // Conservative limit for EC vacuum
        glog.V(3).Infof("Worker %s at capacity: load=%d", worker.ID, worker.CurrentLoad)
        return false
    }

    // EC vacuum tasks require more resources than regular tasks because they
    // involve decode/encode operations. For now we assume workers have
    // sufficient resources; in a production system these checks would be
    // more sophisticated (one possibility is sketched below).
    return true
}
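
// Sketch of the "more sophisticated" checks mentioned above, reusing the
// thresholds from GetResourceRequirements. FreeDiskGB and FreeMemoryMB are
// hypothetical fields; types.WorkerData is not known to expose them:
//
//	if worker.FreeDiskGB < 10 || worker.FreeMemoryMB < 1024 {
//		return false // not enough headroom for decode/encode buffers
//	}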

// meetsEcVacuumRequirements checks EC vacuum specific requirements
func meetsEcVacuumRequirements(task *types.TaskInput, config *Config) bool {
    // Validate task has required parameters
    if task.VolumeID == 0 {
        glog.V(3).Infof("EC vacuum task %s missing volume ID", task.ID)
        return false
    }

    // Check if this is during allowed time windows (if any restrictions).
    // For now, EC vacuum is allowed anytime, but this could be made
    // configurable (sketched below).

    // Validate collection filter if specified
    if config.CollectionFilter != "" && task.Collection != config.CollectionFilter {
        glog.V(3).Infof("EC vacuum task %s collection %s doesn't match filter %s",
            task.ID, task.Collection, config.CollectionFilter)
        return false
    }

    // Additional safety checks could be added here, such as:
    // - checking if the volume is currently being written to
    // - verifying the minimum deletion threshold is still met
    // - ensuring cluster health is good for such operations
    return true
}
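
// One way the time-window idea above could be made concrete. The
// AllowedStartHour and AllowedEndHour fields on Config are hypothetical,
// and the sketch would also need the standard library "time" import:
//
//	hour := time.Now().Hour()
//	if hour < config.AllowedStartHour || hour >= config.AllowedEndHour {
//		return false // outside the configured maintenance window
//	}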

// GetResourceRequirements returns the resource requirements for EC vacuum tasks
func GetResourceRequirements() map[string]interface{} {
    return map[string]interface{}{
        "MinConcurrentSlots":   2,    // Need extra slots for decode/encode
        "MinDiskSpaceGB":       10,   // Minimum 10GB free space
        "MinMemoryMB":          1024, // 1GB memory for operations
        "PreferredNetworkMbps": 100,  // Good network for shard transfers
        "RequiredCapabilities": []string{"ec_vacuum", "erasure_coding"},
        "ConflictingTaskTypes": []string{"erasure_coding"}, // Don't run with regular EC tasks on same volume
    }
}
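
// Hypothetical consumer of the map above, matching the key names used in
// GetResourceRequirements (freeSlots is assumed to come from the caller):
//
//	req := GetResourceRequirements()
//	if minSlots, ok := req["MinConcurrentSlots"].(int); ok && freeSlots < minSlots {
//		// skip this worker; it cannot reserve enough slots
//	}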

// CalculateTaskPriority calculates priority for EC vacuum tasks
func CalculateTaskPriority(task *types.TaskInput, metrics *types.VolumeHealthMetrics) types.TaskPriority {
    // Higher priority for larger volumes (more space to reclaim).
    // VolumeID is only a rough proxy for size here; a metrics-based
    // alternative is sketched below.
    if task.VolumeID > 1000000 {
        return types.TaskPriorityMedium
    }

    // Default priority
    return types.TaskPriorityLow
}
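
// The metrics parameter is currently unused. A deletion-aware rule could
// drive the bump to Medium instead of the VolumeID heuristic; GarbageRatio
// is a hypothetical field on types.VolumeHealthMetrics:
//
//	if metrics != nil && metrics.GarbageRatio > 0.5 {
//		return types.TaskPriorityMedium // more garbage, more space to reclaim
//	}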