Browse Source

checkECEncodingCandidate use ecDetector

worker-execute-ec-tasks
chrislu 4 months ago
parent
commit
e1036f1b37
  1. 63
      weed/admin/task/master_sync.go

63
weed/admin/task/master_sync.go

@ -4,11 +4,13 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@ -267,41 +269,70 @@ func (ms *MasterSynchronizer) detectMaintenanceCandidates(data *master_pb.Volume
return candidates
}
// EC encoding criteria - using size limit from master
// EC encoding criteria - using configuration from EC detector
func (ms *MasterSynchronizer) checkECEncodingCandidate(volumeID uint32, state *VolumeState) *VolumeMaintenanceCandidate {
volume := state.CurrentState
if volume == nil {
return nil
}
// Skip EC encoding detection if no volume size limit is set from master
if ms.volumeSizeLimitMB <= 0 {
// Get the current configuration from the EC detector
ecDetector, _ := erasure_coding.GetSharedInstances()
if ecDetector == nil || !ecDetector.IsEnabled() {
return nil
}
// EC encoding criteria:
// 1. Volume is read-only or large enough
// 2. Not already EC encoded
// 3. Size threshold met
// Get configuration values from the detector
fullnessThreshold := ecDetector.GetFullnessRatio()
quietForSeconds := ecDetector.GetQuietForSeconds()
collectionFilter := ecDetector.GetCollectionFilter()
// Convert MB to bytes and use a fraction for EC threshold (e.g., 50% of size limit)
ecSizeThreshold := (ms.volumeSizeLimitMB * 1024 * 1024) / 2
// EC encoding criteria:
// 1. Volume meets fullness ratio threshold
// 2. Volume has been quiet for required duration
// 3. Collection filter matches (if specified)
// 4. Not already EC encoded
// Check if volume is already EC encoded by checking if we have any EC shards for this volume
// For simplicity, assume no EC encoding for now since we don't have direct access to EC shard state
isCandidate := volume.Size > ecSizeThreshold &&
volume.Size > 1024*1024 // At least 1MB
// Check fullness ratio (if we have size info)
if volume.Size == 0 {
return nil
}
if !isCandidate {
// Calculate fullness ratio (assuming total capacity is close to actual size for near-full volumes)
// For a more accurate calculation, we'd need the volume's max capacity
fullnessRatio := float64(volume.Size-volume.DeletedByteCount) / float64(volume.Size)
if fullnessRatio < fullnessThreshold {
return nil
}
// Check collection filter if specified
if collectionFilter != "" {
// Parse comma-separated collections
allowedCollections := make(map[string]bool)
for _, collection := range strings.Split(collectionFilter, ",") {
allowedCollections[strings.TrimSpace(collection)] = true
}
// Skip if volume's collection is not in the allowed list
if !allowedCollections[volume.Collection] {
return nil
}
}
// Check quiet duration using volume's last modification time
now := time.Now()
lastModified := time.Unix(volume.ModifiedAtSecond, 0)
timeSinceModification := now.Sub(lastModified)
if timeSinceModification < time.Duration(quietForSeconds)*time.Second {
return nil // Volume hasn't been quiet long enough
}
return &VolumeMaintenanceCandidate{
VolumeID: volumeID,
Server: volume.Server,
TaskType: "ec_encode",
Priority: types.TaskPriorityNormal,
Reason: fmt.Sprintf("Volume size %d bytes exceeds EC threshold %d", volume.Size, ecSizeThreshold),
Priority: types.TaskPriorityLow, // EC is typically low priority
Reason: fmt.Sprintf("Volume meets EC criteria: fullness=%.1f%% (>%.1f%%), quiet for %s (>%ds), collection='%s'", fullnessRatio*100, fullnessThreshold*100, timeSinceModification.Truncate(time.Second), quietForSeconds, volume.Collection),
VolumeInfo: volume,
}
}

Loading…
Cancel
Save