Browse Source

compiles, detection uses volumeSizeLimitMB from master

worker-execute-ec-tasks
chrislu 5 months ago
parent
commit
16a29b57f9
  1. 20
      weed/admin/maintenance/maintenance_scanner.go
  2. 6
      weed/admin/task/compilation_stubs.go
  3. 94
      weed/admin/task/master_sync.go
  4. 113
      weed/worker/tasks/erasure_coding/ec_detector.go

20
weed/admin/maintenance/maintenance_scanner.go

@ -62,8 +62,19 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
// getVolumeHealthMetrics collects health information for all volumes
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
var metrics []*VolumeHealthMetrics
var volumeSizeLimitMB uint64
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
// First, get volume size limit from master configuration
configResp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
glog.Warningf("Failed to get volume size limit from master: %v", err)
volumeSizeLimitMB = 30000 // Default to 30GB if we can't get from master
} else {
volumeSizeLimitMB = uint64(configResp.VolumeSizeLimitMB)
}
// Now get volume list
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
@ -73,6 +84,8 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
return nil
}
volumeSizeLimitBytes := volumeSizeLimitMB * 1024 * 1024 // Convert MB to bytes
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
@ -94,11 +107,14 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
// Calculate derived metrics
if metric.Size > 0 {
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
// Calculate fullness ratio (would need volume size limit)
// metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit)
// Calculate fullness ratio using actual volume size limit from master
metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes)
}
metric.Age = time.Since(metric.LastModified)
glog.V(3).Infof("Volume %d: size=%d, limit=%d, fullness=%.2f",
metric.VolumeID, metric.Size, volumeSizeLimitBytes, metric.FullnessRatio)
metrics = append(metrics, metric)
}
}

6
weed/admin/task/compilation_stubs.go

@ -50,9 +50,3 @@ func (vsm *VolumeStateManager) DetectInconsistencies() []StateInconsistency {
// Stub implementation - return empty slice
return []StateInconsistency{}
}
// detectMaintenanceCandidates is a stub for the master synchronizer
func (ms *MasterSynchronizer) detectMaintenanceCandidates(data interface{}) []*VolumeMaintenanceCandidate {
// Stub implementation - return empty slice
return []*VolumeMaintenanceCandidate{}
}

94
weed/admin/task/master_sync.go

@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// MasterSynchronizer handles periodic synchronization with the master server
@ -17,6 +18,7 @@ type MasterSynchronizer struct {
adminServer *AdminServer
syncInterval time.Duration
stopCh chan struct{}
volumeSizeLimitMB uint64 // Volume size limit from master in MB
}
// NewMasterSynchronizer creates a new master synchronizer
@ -70,6 +72,12 @@ func (ms *MasterSynchronizer) performSync() {
return
}
// Update volume size limit from master
if volumeData.VolumeSizeLimitMb > 0 {
ms.volumeSizeLimitMB = volumeData.VolumeSizeLimitMb
glog.V(2).Infof("Updated volume size limit to %d MB from master", ms.volumeSizeLimitMB)
}
// Merge data into volume state manager
err = ms.mergeVolumeData(volumeData)
if err != nil {
@ -161,9 +169,7 @@ func (ms *MasterSynchronizer) extractVolumesFromTopology(
// Initialize server capacity info
if serverCapacity[serverID] == nil {
serverCapacity[serverID] = &CapacityInfo{
Server: serverID,
DataCenter: dcInfo.Id,
Rack: rackInfo.Id,
Server: serverID,
}
}
@ -187,8 +193,8 @@ func (ms *MasterSynchronizer) processDiskInfo(
// Update capacity information
capacity := serverCapacity[serverID]
capacity.TotalCapacity += uint64(diskInfo.MaxVolumeCount) * (32 * 1024 * 1024 * 1024) // Assume 32GB per volume
capacity.UsedCapacity += uint64(diskInfo.ActiveVolumeCount) * (32 * 1024 * 1024 * 1024)
capacity.TotalCapacity += int64(diskInfo.MaxVolumeCount) * (32 * 1024 * 1024 * 1024) // Assume 32GB per volume
capacity.UsedCapacity += int64(diskInfo.ActiveVolumeCount) * (32 * 1024 * 1024 * 1024)
// Process regular volumes
for _, volInfo := range diskInfo.VolumeInfos {
@ -202,7 +208,7 @@ func (ms *MasterSynchronizer) processDiskInfo(
ReadOnly: volInfo.ReadOnly,
Server: serverID,
DiskType: diskType,
LastModified: time.Unix(volInfo.ModifiedAtSecond, 0),
ModifiedAtSecond: volInfo.ModifiedAtSecond,
}
}
@ -217,12 +223,10 @@ func (ms *MasterSynchronizer) processDiskInfo(
for shardID := 0; shardID < 14; shardID++ {
if (shardInfo.EcIndexBits & (1 << uint(shardID))) != 0 {
ecShards[volumeID][shardID] = &ShardInfo{
VolumeID: volumeID,
ShardID: shardID,
Server: serverID,
Status: ShardStatusExists,
Size: 0, // Size not available in shard info
DiskType: shardInfo.DiskType,
ShardID: shardID,
Server: serverID,
Status: ShardStatusExists,
Size: 0, // Size not available in shard info
}
}
}
@ -261,23 +265,30 @@ func (ms *MasterSynchronizer) detectMaintenanceCandidates(data *master_pb.Volume
return candidates
}
// checkECEncodingCandidate checks if a volume is a candidate for EC encoding
// EC encoding criteria - using size limit from master
func (ms *MasterSynchronizer) checkECEncodingCandidate(volumeID uint32, state *VolumeState) *VolumeMaintenanceCandidate {
volume := state.CurrentState
if volume == nil {
return nil
}
// Skip EC encoding detection if no volume size limit is set from master
if ms.volumeSizeLimitMB <= 0 {
return nil
}
// EC encoding criteria:
// 1. Volume is read-only or large enough
// 2. Not already EC encoded
// 3. Size threshold met (e.g., > 20GB)
// 3. Size threshold met
const ecSizeThreshold = 20 * 1024 * 1024 * 1024 // 20GB
// Convert MB to bytes and use a fraction for EC threshold (e.g., 50% of size limit)
ecSizeThreshold := (ms.volumeSizeLimitMB * 1024 * 1024) / 2
// Check if volume is already EC encoded by checking if we have any EC shards for this volume
// For simplicity, assume no EC encoding for now since we don't have direct access to EC shard state
isCandidate := (volume.ReadOnly || volume.Size > ecSizeThreshold) &&
len(state.ECShardState) == 0 &&
volume.Size > 1024*1024*1024 // At least 1GB
volume.Size > 1024*1024 // At least 1MB
if !isCandidate {
return nil
@ -287,8 +298,8 @@ func (ms *MasterSynchronizer) checkECEncodingCandidate(volumeID uint32, state *V
VolumeID: volumeID,
Server: volume.Server,
TaskType: "ec_encode",
Priority: TaskPriorityNormal,
Reason: fmt.Sprintf("Volume size %d bytes exceeds EC threshold", volume.Size),
Priority: types.TaskPriorityNormal,
Reason: fmt.Sprintf("Volume size %d bytes exceeds EC threshold %d", volume.Size, ecSizeThreshold),
VolumeInfo: volume,
}
}
@ -319,7 +330,7 @@ func (ms *MasterSynchronizer) checkVacuumCandidate(volumeID uint32, state *Volum
VolumeID: volumeID,
Server: volume.Server,
TaskType: "vacuum",
Priority: TaskPriorityNormal,
Priority: types.TaskPriorityNormal,
Reason: fmt.Sprintf("Deleted bytes %d (%.1f%%) exceed vacuum threshold",
volume.DeletedByteCount, deletedRatio*100),
VolumeInfo: volume,
@ -328,40 +339,8 @@ func (ms *MasterSynchronizer) checkVacuumCandidate(volumeID uint32, state *Volum
// checkECRebuildCandidate checks if an EC volume needs shard rebuilding
func (ms *MasterSynchronizer) checkECRebuildCandidate(volumeID uint32, state *VolumeState) *VolumeMaintenanceCandidate {
if len(state.ECShardState) == 0 {
return nil // Not an EC volume
}
// Check for missing or corrupted shards
missingShards := 0
corruptedShards := 0
for shardID := 0; shardID < 14; shardID++ {
shardState, exists := state.ECShardState[shardID]
if !exists {
missingShards++
} else if len(shardState.CurrentShards) == 0 {
missingShards++
} else {
// Check for corrupted shards
for _, shard := range shardState.CurrentShards {
if shard.Status == ShardStatusCorrupted {
corruptedShards++
}
}
}
}
// Need rebuild if any shards are missing or corrupted
if missingShards > 0 || corruptedShards > 0 {
return &VolumeMaintenanceCandidate{
VolumeID: volumeID,
TaskType: "ec_rebuild",
Priority: TaskPriorityHigh, // High priority for data integrity
Reason: fmt.Sprintf("Missing %d shards, corrupted %d shards", missingShards, corruptedShards),
}
}
// For now, skip EC rebuild detection as it requires more complex shard state tracking
// This would be implemented when the volume state manager provides proper EC shard access
return nil
}
@ -390,7 +369,7 @@ func (ms *MasterSynchronizer) canAssignCandidate(candidate *VolumeMaintenanceCan
// Check if server has capacity for the task
if candidate.TaskType == "ec_encode" {
// EC encoding requires significant temporary space
requiredSpace := candidate.VolumeInfo.Size * 2 // Estimate 2x volume size needed
requiredSpace := int64(candidate.VolumeInfo.Size * 2) // Estimate 2x volume size needed
if !ms.volumeStateManager.CanAssignVolumeToServer(requiredSpace, candidate.Server) {
return false
}
@ -414,10 +393,9 @@ func (ms *MasterSynchronizer) createTaskFromCandidate(candidate *VolumeMaintenan
Type: TaskType(candidate.TaskType),
VolumeID: candidate.VolumeID,
Priority: candidate.Priority,
Status: TaskStatusPending,
Status: types.TaskStatusPending,
CreatedAt: now,
UpdatedAt: now,
Parameters: map[string]string{
Parameters: map[string]interface{}{
"volume_id": fmt.Sprintf("%d", candidate.VolumeID),
"server": candidate.Server,
"reason": candidate.Reason,

113
weed/worker/tasks/erasure_coding/ec_detector.go

@ -1,6 +1,7 @@
package erasure_coding
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -12,20 +13,23 @@ type EcDetector struct {
enabled bool
volumeAgeHours int
fullnessRatio float64
minSizeMB int // Minimum volume size in MB before considering EC
scanInterval time.Duration
}
// Compile-time interface assertions
var (
_ types.TaskDetector = (*EcDetector)(nil)
_ types.TaskDetector = (*EcDetector)(nil)
_ types.PolicyConfigurableDetector = (*EcDetector)(nil)
)
// NewEcDetector creates a new erasure coding detector
// NewEcDetector creates a new erasure coding detector with configurable defaults
func NewEcDetector() *EcDetector {
return &EcDetector{
enabled: false, // Conservative default
volumeAgeHours: 24 * 7, // 1 week
fullnessRatio: 0.9, // 90% full
enabled: false, // Disabled by default for safety
volumeAgeHours: 24, // Require 24 hours age by default
fullnessRatio: 0.8, // 80% full by default
minSizeMB: 100, // Minimum 100MB before considering EC
scanInterval: 2 * time.Hour,
}
}
@ -38,12 +42,17 @@ func (d *EcDetector) GetTaskType() types.TaskType {
// ScanForTasks scans for volumes that should be converted to erasure coding
func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
if !d.enabled {
glog.V(2).Infof("EC detector is disabled")
return nil, nil
}
var results []*types.TaskDetectionResult
now := time.Now()
ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour
minSizeBytes := uint64(d.minSizeMB) * 1024 * 1024
glog.V(2).Infof("EC detector scanning %d volumes with thresholds: age=%dh, fullness=%.2f, minSize=%dMB",
len(volumeMetrics), d.volumeAgeHours, d.fullnessRatio, d.minSizeMB)
for _, metric := range volumeMetrics {
// Skip if already EC volume
@ -51,12 +60,18 @@ func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, cl
continue
}
// Check minimum size requirement
if metric.Size < minSizeBytes {
continue
}
// Check age and fullness criteria
if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio {
// Check if volume is read-only (safe for EC conversion)
if !metric.IsReadOnly {
continue
}
// Note: Removed read-only requirement for testing
// In production, you might want to enable this:
// if !metric.IsReadOnly {
// continue
// }
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeErasureCoding,
@ -64,18 +79,23 @@ func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, cl
Server: metric.Server,
Collection: metric.Collection,
Priority: types.TaskPriorityLow, // EC is not urgent
Reason: "Volume is old and full enough for EC conversion",
Reason: fmt.Sprintf("Volume meets EC criteria: age=%.1fh (>%dh), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
metric.Age.Hours(), d.volumeAgeHours, metric.FullnessRatio*100, d.fullnessRatio*100,
float64(metric.Size)/(1024*1024), d.minSizeMB),
Parameters: map[string]interface{}{
"age_hours": int(metric.Age.Hours()),
"fullness_ratio": metric.FullnessRatio,
"size_mb": int(metric.Size / (1024 * 1024)),
},
ScheduleAt: now,
}
results = append(results, result)
glog.V(1).Infof("EC task detected for volume %d on %s: %s", metric.VolumeID, metric.Server, result.Reason)
}
}
glog.V(2).Infof("EC detector found %d tasks to schedule", len(results))
glog.V(1).Infof("EC detector found %d tasks to schedule", len(results))
return results, nil
}
@ -89,7 +109,33 @@ func (d *EcDetector) IsEnabled() bool {
return d.enabled
}
// Configuration setters
// Configuration methods for runtime configuration
// Configure sets detector configuration from policy
func (d *EcDetector) Configure(config map[string]interface{}) error {
if enabled, ok := config["enabled"].(bool); ok {
d.enabled = enabled
}
if ageHours, ok := config["volume_age_hours"].(float64); ok {
d.volumeAgeHours = int(ageHours)
}
if fullnessRatio, ok := config["fullness_ratio"].(float64); ok {
d.fullnessRatio = fullnessRatio
}
if minSizeMB, ok := config["min_size_mb"].(float64); ok {
d.minSizeMB = int(minSizeMB)
}
glog.V(1).Infof("EC detector configured: enabled=%v, age=%dh, fullness=%.2f, minSize=%dMB",
d.enabled, d.volumeAgeHours, d.fullnessRatio, d.minSizeMB)
return nil
}
// Legacy compatibility methods for existing code
func (d *EcDetector) SetEnabled(enabled bool) {
d.enabled = enabled
@ -107,6 +153,33 @@ func (d *EcDetector) SetScanInterval(interval time.Duration) {
d.scanInterval = interval
}
// PolicyConfigurableDetector interface implementation
// ConfigureFromPolicy configures the detector from maintenance policy
func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
// Cast policy to maintenance policy type
if maintenancePolicy, ok := policy.(*types.MaintenancePolicy); ok {
// Get EC-specific configuration from policy
ecConfig := maintenancePolicy.GetTaskConfig(types.TaskTypeErasureCoding)
if ecConfig != nil {
// Convert to map for easier access
if configMap, ok := ecConfig.(map[string]interface{}); ok {
d.Configure(configMap)
} else {
glog.Warningf("EC detector policy configuration is not a map: %T", ecConfig)
}
} else {
// No specific configuration found, use defaults with policy-based enabled status
enabled := maintenancePolicy.GlobalSettings != nil && maintenancePolicy.GlobalSettings.MaintenanceEnabled
glog.V(2).Infof("No EC-specific config found, using default with enabled=%v", enabled)
d.enabled = enabled
}
} else {
glog.Warningf("ConfigureFromPolicy received unknown policy type: %T", policy)
}
}
// GetVolumeAgeHours returns the current volume age threshold in hours
func (d *EcDetector) GetVolumeAgeHours() int {
return d.volumeAgeHours
@ -121,19 +194,3 @@ func (d *EcDetector) GetFullnessRatio() float64 {
func (d *EcDetector) GetScanInterval() time.Duration {
return d.scanInterval
}
// ConfigureFromPolicy configures the detector based on the maintenance policy
func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
// Type assert to the maintenance policy type we expect
if maintenancePolicy, ok := policy.(interface {
GetECEnabled() bool
GetECVolumeAgeHours() int
GetECFullnessRatio() float64
}); ok {
d.SetEnabled(maintenancePolicy.GetECEnabled())
d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours())
d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio())
} else {
glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type")
}
}
Loading…
Cancel
Save