diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 1122d2721..b40298f81 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,15 +61,16 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 
 		// Check quiet duration and fullness criteria
 		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+			expectedShardSize := calculateExpectedShardSize(metric.Size)
 			result := &types.TaskDetectionResult{
 				TaskType:   types.TaskTypeErasureCoding,
 				VolumeID:   metric.VolumeID,
 				Server:     metric.Server,
 				Collection: metric.Collection,
 				Priority:   types.TaskPriorityLow, // EC is not urgent
-				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
+				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB), expected shard size=%.1fMB",
 					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
-					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
+					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB, float64(expectedShardSize)/(1024*1024)),
 				ScheduleAt: now,
 			}
 
@@ -97,8 +98,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 				},
 			}
 
-			glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
-				metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
+			glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs, expected shard size: %.2fMB",
+				metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs, float64(expectedShardSize)/(1024*1024))
 		} else {
 			glog.Warningf("No ActiveTopology available for destination planning in EC detection")
 			continue // Skip this volume if no topology available
@@ -143,6 +144,10 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 
 // planECDestinations plans the destinations for erasure coding operation
 // This function implements EC destination planning logic directly in the detection phase
 func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+	// Calculate expected shard size based on original volume size
+	// EC data is split across data shards, so each shard gets roughly volume_size / data_shards
+	expectedShardSize := calculateExpectedShardSize(metric.Size)
+
 	// Get source node information from topology
 	var sourceRack, sourceDC string
@@ -178,9 +183,9 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 	}
 
 	// Select best disks for EC placement with rack/DC diversity
-	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
-	if len(selectedDisks) < minTotalDisks {
-		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
+	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount, expectedShardSize)
+	if len(selectedDisks) < erasure_coding.MinTotalDisks {
+		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
 	}
 
 	var plans []*topology.DestinationPlan
@@ -193,8 +198,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 			TargetDisk:     disk.DiskID,
 			TargetRack:     disk.Rack,
 			TargetDC:       disk.DataCenter,
-			ExpectedSize:   0, // EC shards don't have predetermined size
-			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
+			ExpectedSize:   expectedShardSize, // Use calculated shard size
+			PlacementScore: calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize),
 			Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
 		}
 		plans = append(plans, plan)
@@ -205,6 +210,9 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 		dcCount[disk.DataCenter]++
 	}
 
+	glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs",
+		metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount))
+
 	return &topology.MultiDestinationPlan{
 		Plans:       plans,
 		TotalShards: len(plans),
@@ -243,7 +251,7 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
 }
 
 // selectBestECDestinations selects multiple disks for EC shard placement with diversity
-func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
+func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int, expectedShardSize uint64) []*topology.DiskInfo {
 	if len(disks) == 0 {
 		return nil
 	}
@@ -265,7 +273,7 @@ func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC s
 		}
 
 		// Select best disk from this rack
-		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
+		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC, expectedShardSize)
 		if bestDisk != nil {
 			selected = append(selected, bestDisk)
 			usedRacks[rackKey] = true
@@ -298,7 +306,7 @@ func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC s
 }
 
 // selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
+func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) *topology.DiskInfo {
 	if len(disks) == 0 {
 		return nil
 	}
@@ -311,7 +319,7 @@ func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string)
 			continue
 		}
 
-		score := calculateECScore(disk, sourceRack, sourceDC)
+		score := calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize)
 		if score > bestScore {
 			bestScore = score
 			bestDisk = disk
@@ -351,6 +359,24 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
 	return score
 }
 
+// calculateECScoreWithSize calculates placement score for EC operations with shard size consideration
+func calculateECScoreWithSize(disk *topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) float64 {
+	baseScore := calculateECScore(disk, sourceRack, sourceDC)
+
+	// Additional scoring based on available space vs expected shard size
+	if disk.DiskInfo != nil && expectedShardSize > 0 {
+		// Estimate available space (this is a rough estimate)
+		availableSlots := disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount
+		if availableSlots > 0 {
+			// Bonus for having plenty of space for the expected shard
+			// This is a heuristic - each volume slot can theoretically hold any size
+			baseScore += float64(availableSlots) * 2.0 // Up to 2 points per available slot
+		}
+	}
+
+	return baseScore
+}
+
 // isDiskSuitableForEC checks if a disk is suitable for EC placement
 func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
 	if disk.DiskInfo == nil {
@@ -414,3 +440,23 @@ func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32
 
 	return replicaServers
 }
+
+// calculateExpectedShardSize calculates the expected size of each EC shard based on the original volume size
+func calculateExpectedShardSize(originalVolumeSize uint64) uint64 {
+	if originalVolumeSize == 0 {
+		return 0
+	}
+
+	// In erasure coding, the original data is split across data shards
+	// Each data shard gets approximately originalSize / dataShards
+	// Parity shards are similar in size to data shards
+	// Add some overhead for padding and metadata (typically ~5-10%)
+	baseShardSize := originalVolumeSize / uint64(erasure_coding.DataShardsCount)
+	overhead := baseShardSize / 10 // 10% overhead for padding and metadata
+	expectedShardSize := baseShardSize + overhead
+
+	glog.V(2).Infof("Calculated expected shard size: original=%d bytes, base_shard=%d bytes, with_overhead=%d bytes",
+		originalVolumeSize, baseShardSize, expectedShardSize)
+
+	return expectedShardSize
+}