You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
135 lines
3.6 KiB
135 lines
3.6 KiB
package balance
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// Detection implements the detection logic for balance tasks
|
|
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
|
|
if !config.IsEnabled() {
|
|
return nil, nil
|
|
}
|
|
|
|
balanceConfig := config.(*Config)
|
|
|
|
// Skip if cluster is too small
|
|
minVolumeCount := 10
|
|
if len(metrics) < minVolumeCount {
|
|
return nil, nil
|
|
}
|
|
|
|
// Analyze volume distribution across servers
|
|
serverVolumeCounts := make(map[string]int)
|
|
for _, metric := range metrics {
|
|
serverVolumeCounts[metric.Server]++
|
|
}
|
|
|
|
if len(serverVolumeCounts) < balanceConfig.MinServerCount {
|
|
return nil, nil
|
|
}
|
|
|
|
// Calculate balance metrics
|
|
totalVolumes := len(metrics)
|
|
avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
|
|
|
|
maxVolumes := 0
|
|
minVolumes := totalVolumes
|
|
maxServer := ""
|
|
minServer := ""
|
|
|
|
for server, count := range serverVolumeCounts {
|
|
if count > maxVolumes {
|
|
maxVolumes = count
|
|
maxServer = server
|
|
}
|
|
if count < minVolumes {
|
|
minVolumes = count
|
|
minServer = server
|
|
}
|
|
}
|
|
|
|
// Check if imbalance exceeds threshold
|
|
imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
|
|
if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
|
|
return nil, nil
|
|
}
|
|
|
|
// Create balance task
|
|
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
|
|
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
|
|
|
|
task := &types.TaskDetectionResult{
|
|
TaskType: types.TaskTypeBalance,
|
|
Priority: types.TaskPriorityNormal,
|
|
Reason: reason,
|
|
ScheduleAt: time.Now(),
|
|
Parameters: map[string]interface{}{
|
|
"imbalance_ratio": imbalanceRatio,
|
|
"threshold": balanceConfig.ImbalanceThreshold,
|
|
"max_volumes": maxVolumes,
|
|
"min_volumes": minVolumes,
|
|
"avg_volumes_per_server": avgVolumesPerServer,
|
|
"max_server": maxServer,
|
|
"min_server": minServer,
|
|
"total_servers": len(serverVolumeCounts),
|
|
},
|
|
}
|
|
|
|
return []*types.TaskDetectionResult{task}, nil
|
|
}
|
|
|
|
// Scheduling implements the scheduling logic for balance tasks
|
|
func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
|
|
balanceConfig := config.(*Config)
|
|
|
|
// Count running balance tasks
|
|
runningBalanceCount := 0
|
|
for _, runningTask := range runningTasks {
|
|
if runningTask.Type == types.TaskTypeBalance {
|
|
runningBalanceCount++
|
|
}
|
|
}
|
|
|
|
// Check concurrency limit
|
|
if runningBalanceCount >= balanceConfig.MaxConcurrent {
|
|
return false
|
|
}
|
|
|
|
// Check if we have available workers
|
|
availableWorkerCount := 0
|
|
for _, worker := range availableWorkers {
|
|
for _, capability := range worker.Capabilities {
|
|
if capability == types.TaskTypeBalance {
|
|
availableWorkerCount++
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return availableWorkerCount > 0
|
|
}
|
|
|
|
// CreateTask creates a new balance task instance
|
|
func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
|
|
// Extract configuration from params
|
|
var config *Config
|
|
if configData, ok := params.Parameters["config"]; ok {
|
|
if configMap, ok := configData.(map[string]interface{}); ok {
|
|
config = &Config{}
|
|
if err := config.FromMap(configMap); err != nil {
|
|
return nil, fmt.Errorf("failed to parse balance config: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if config == nil {
|
|
config = NewDefaultConfig()
|
|
}
|
|
|
|
// Create and return the balance task using existing Task type
|
|
return NewTask(params.Server, params.VolumeID, params.Collection), nil
|
|
}
|