You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
126 lines
3.6 KiB
126 lines
3.6 KiB
package erasure_coding
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
)
|
|
|
|
// Detection implements the detection logic for erasure coding tasks
|
|
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
|
|
if !config.IsEnabled() {
|
|
return nil, nil
|
|
}
|
|
|
|
ecConfig := config.(*Config)
|
|
var results []*types.TaskDetectionResult
|
|
now := time.Now()
|
|
quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
|
|
minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum
|
|
|
|
for _, metric := range metrics {
|
|
// Skip if already EC volume
|
|
if metric.IsECVolume {
|
|
continue
|
|
}
|
|
|
|
// Check minimum size requirement
|
|
if metric.Size < minSizeBytes {
|
|
continue
|
|
}
|
|
|
|
// Check collection filter if specified
|
|
if ecConfig.CollectionFilter != "" {
|
|
// Parse comma-separated collections
|
|
allowedCollections := make(map[string]bool)
|
|
for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
|
|
allowedCollections[strings.TrimSpace(collection)] = true
|
|
}
|
|
// Skip if volume's collection is not in the allowed list
|
|
if !allowedCollections[metric.Collection] {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Check quiet duration and fullness criteria
|
|
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
|
|
result := &types.TaskDetectionResult{
|
|
TaskType: types.TaskTypeErasureCoding,
|
|
VolumeID: metric.VolumeID,
|
|
Server: metric.Server,
|
|
Collection: metric.Collection,
|
|
Priority: types.TaskPriorityLow, // EC is not urgent
|
|
Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
|
|
metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
|
|
float64(metric.Size)/(1024*1024)),
|
|
Parameters: map[string]interface{}{
|
|
"age_seconds": int(metric.Age.Seconds()),
|
|
"fullness_ratio": metric.FullnessRatio,
|
|
"size_mb": int(metric.Size / (1024 * 1024)),
|
|
},
|
|
ScheduleAt: now,
|
|
}
|
|
results = append(results, result)
|
|
}
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// Scheduling implements the scheduling logic for erasure coding tasks
|
|
func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
|
|
ecConfig := config.(*Config)
|
|
|
|
// Check if we have available workers
|
|
if len(availableWorkers) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Count running EC tasks
|
|
runningCount := 0
|
|
for _, runningTask := range runningTasks {
|
|
if runningTask.Type == types.TaskTypeErasureCoding {
|
|
runningCount++
|
|
}
|
|
}
|
|
|
|
// Check concurrency limit
|
|
if runningCount >= ecConfig.MaxConcurrent {
|
|
return false
|
|
}
|
|
|
|
// Check if any worker can handle EC tasks
|
|
for _, worker := range availableWorkers {
|
|
for _, capability := range worker.Capabilities {
|
|
if capability == types.TaskTypeErasureCoding {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// CreateTask creates a new erasure coding task instance
|
|
func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
|
|
// Extract configuration from params
|
|
var config *Config
|
|
if configData, ok := params.Parameters["config"]; ok {
|
|
if configMap, ok := configData.(map[string]interface{}); ok {
|
|
config = &Config{}
|
|
if err := config.FromMap(configMap); err != nil {
|
|
return nil, fmt.Errorf("failed to parse erasure coding config: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if config == nil {
|
|
config = NewDefaultConfig()
|
|
}
|
|
|
|
// Create and return the erasure coding task using existing Task type
|
|
return NewTask(params.Server, params.VolumeID), nil
|
|
}
|