package erasure_coding

import (
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection implements the detection logic for erasure coding tasks
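//
// A volume is selected only when it is not already erasure coded, meets the
// MinSizeMB minimum, passes the optional CollectionFilter, has been quiet for
// at least QuietForSeconds, and has reached FullnessRatio.
//
// Illustrative example (assumed config values, not necessarily the defaults):
// with QuietForSeconds=3600, FullnessRatio=0.90 and MinSizeMB=1024, a 20 GB
// volume that has been idle for two hours at 95% fullness is selected, while
// the same volume at 80% fullness is skipped and counted under skippedFullness.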
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}

	ecConfig := config.(*Config)
	var results []*types.TaskDetectionResult
	now := time.Now()
	quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum

	debugCount := 0
	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0

	for _, metric := range metrics {
		// Skip if already EC volume
		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}

		// Check minimum size requirement
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}

		// Check collection filter if specified
		if ecConfig.CollectionFilter != "" {
			// Parse comma-separated collections
			allowedCollections := make(map[string]bool)
			for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
				allowedCollections[strings.TrimSpace(collection)] = true
			}
			// Skip if volume's collection is not in the allowed list
			if !allowedCollections[metric.Collection] {
				skippedCollectionFilter++
				continue
			}
		}

		// Check quiet duration and fullness criteria
		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
			expectedShardSize := calculateExpectedShardSize(metric.Size)
			result := &types.TaskDetectionResult{
				TaskType:   types.TaskTypeErasureCoding,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   types.TaskPriorityLow, // EC is not urgent
				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB), expected shard size=%.1fMB",
					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB, float64(expectedShardSize)/(1024*1024)),
				ScheduleAt: now,
			}

			// Plan EC destinations if ActiveTopology is available
			if clusterInfo.ActiveTopology != nil {
				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
				if err != nil {
					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if destination planning fails
				}

				// Find all volume replicas from topology
				replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
				glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)

				// Create typed parameters with EC destination information and replicas
				result.TypedParams = &worker_pb.TaskParams{
					VolumeId:   metric.VolumeID,
					Server:     metric.Server,
					Collection: metric.Collection,
					VolumeSize: metric.Size, // Store original volume size for tracking changes
					Replicas:   replicas,    // Include all volume replicas for deletion
					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
						ErasureCodingParams: createECTaskParams(multiPlan),
					},
				}

				glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs, expected shard size: %.2fMB",
					metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs, float64(expectedShardSize)/(1024*1024))
			} else {
				glog.Warningf("No ActiveTopology available for destination planning in EC detection")
				continue // Skip this volume if no topology available
			}

			results = append(results, result)
		} else {
			// Count debug reasons
			if debugCount < 5 { // Limit to avoid spam
				if metric.Age < quietThreshold {
					skippedQuietTime++
				}
				if metric.FullnessRatio < ecConfig.FullnessRatio {
					skippedFullness++
				}
			}
			debugCount++
		}
	}

	// Log debug summary if no tasks were created
	if len(results) == 0 && len(metrics) > 0 {
		totalVolumes := len(metrics)
		glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)

		// Show details for first few volumes
		for i, metric := range metrics {
			if i >= 3 || metric.IsECVolume { // Limit to first 3 non-EC volumes
				continue
			}
			sizeMB := float64(metric.Size) / (1024 * 1024)
			glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
				metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
				metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
		}
	}

	return results, nil
}

// planECDestinations plans the destinations for an erasure coding operation.
// This function implements EC destination planning logic directly in the detection phase.
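//
// Planning succeeds only if at least erasure_coding.MinTotalDisks suitable
// disks are available; it produces one destination per selected disk and
// prefers spreading shards across as many racks and data centers as possible
// (with the usual SeaweedFS 10+4 layout assumed, that is 14 shards in total).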
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
	// Calculate expected shard size based on original volume size
	// EC data is split across data shards, so each shard gets roughly volume_size / data_shards
	expectedShardSize := calculateExpectedShardSize(metric.Size)

	// Get source node information from topology
	var sourceRack, sourceDC string

	// Extract rack and DC from topology info
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo != nil {
		for _, dc := range topologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, dataNodeInfo := range rack.DataNodeInfos {
					if dataNodeInfo.Id == metric.Server {
						sourceDC = dc.Id
						sourceRack = rack.Id
						break
					}
				}
				if sourceRack != "" {
					break
				}
			}
			if sourceDC != "" {
				break
			}
		}
	}

	// Get available disks for EC placement (include source node for EC)
	availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
	if len(availableDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", erasure_coding.MinTotalDisks, len(availableDisks))
	}

	// Select best disks for EC placement with rack/DC diversity
	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount, expectedShardSize)
	if len(selectedDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
	}

	var plans []*topology.DestinationPlan
	rackCount := make(map[string]int)
	dcCount := make(map[string]int)

	for _, disk := range selectedDisks {
		plan := &topology.DestinationPlan{
			TargetNode:     disk.NodeID,
			TargetDisk:     disk.DiskID,
			TargetRack:     disk.Rack,
			TargetDC:       disk.DataCenter,
			ExpectedSize:   expectedShardSize, // Use calculated shard size
			PlacementScore: calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize),
			Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
		}
		plans = append(plans, plan)

		// Count rack and DC diversity
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackCount[rackKey]++
		dcCount[disk.DataCenter]++
	}

	glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs",
		metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount))

	return &topology.MultiDestinationPlan{
		Plans:          plans,
		TotalShards:    len(plans),
		SuccessfulRack: len(rackCount),
		SuccessfulDCs:  len(dcCount),
	}, nil
}

// createECTaskParams creates EC task parameters from the multi-destination plan
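// DataShards and ParityShards are taken from the erasure_coding package
// constants (assumed here to be the standard SeaweedFS 10+4 layout), and the
// destination list carries one ECDestination per selected target disk from
// the plan.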
func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
	var destinations []*worker_pb.ECDestination

	for _, plan := range multiPlan.Plans {
		destination := &worker_pb.ECDestination{
			Node:           plan.TargetNode,
			DiskId:         plan.TargetDisk,
			Rack:           plan.TargetRack,
			DataCenter:     plan.TargetDC,
			PlacementScore: plan.PlacementScore,
		}
		destinations = append(destinations, destination)
	}

	// Collect placement conflicts from all destinations
	var placementConflicts []string
	for _, plan := range multiPlan.Plans {
		placementConflicts = append(placementConflicts, plan.Conflicts...)
	}

	return &worker_pb.ErasureCodingTaskParams{
		Destinations:       destinations,
		DataShards:         erasure_coding.DataShardsCount,   // Standard data shards
		ParityShards:       erasure_coding.ParityShardsCount, // Standard parity shards
		PlacementConflicts: placementConflicts,
	}
}

// selectBestECDestinations selects multiple disks for EC shard placement with diversity
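//
// Illustrative example (assumed cluster shape, with the usual 14 total
// shards): if only 5 racks have suitable disks, the first pass picks the best
// disk in each of the 5 racks, and the second pass places the remaining 9
// shards on other suitable disks, reusing racks as necessary.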
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int, expectedShardSize uint64) []*topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	// Group disks by rack and DC for diversity
	rackGroups := make(map[string][]*topology.DiskInfo)
	for _, disk := range disks {
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
	}

	var selected []*topology.DiskInfo
	usedRacks := make(map[string]bool)

	// First pass: select one disk from each rack for maximum diversity
	for rackKey, rackDisks := range rackGroups {
		if len(selected) >= shardsNeeded {
			break
		}

		// Select best disk from this rack
		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC, expectedShardSize)
		if bestDisk != nil {
			selected = append(selected, bestDisk)
			usedRacks[rackKey] = true
		}
	}

	// Second pass: if we need more disks, select from racks we've already used
	if len(selected) < shardsNeeded {
		for _, disk := range disks {
			if len(selected) >= shardsNeeded {
				break
			}

			// Skip if already selected
			alreadySelected := false
			for _, sel := range selected {
				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
					alreadySelected = true
					break
				}
			}

			if !alreadySelected && isDiskSuitableForEC(disk) {
				selected = append(selected, disk)
			}
		}
	}

	return selected
}

// selectBestFromRack selects the best disk from a rack for EC placement
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) *topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	var bestDisk *topology.DiskInfo
	bestScore := -1.0

	for _, disk := range disks {
		if !isDiskSuitableForEC(disk) {
			continue
		}

		score := calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize)
		if score > bestScore {
			bestScore = score
			bestDisk = disk
		}
	}

	return bestDisk
}

// calculateECScore calculates placement score for EC operations
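//
// Illustrative scoring example (hypothetical disk): with VolumeCount=20 and
// MaxVolumeCount=100, utilization is 0.2 and available capacity contributes
// (1-0.2)*50 = 40 points; a different rack adds 30, the same data center adds
// nothing, and LoadCount=3 adds 10-3 = 7, for a total score of 77.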
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
	if disk.DiskInfo == nil {
		return 0.0
	}

	score := 0.0

	// Prefer disks with available capacity
	if disk.DiskInfo.MaxVolumeCount > 0 {
		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
		score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
	}

	// Prefer different racks for better distribution
	if disk.Rack != sourceRack {
		score += 30.0
	}

	// Prefer different data centers for better distribution
	if disk.DataCenter != sourceDC {
		score += 20.0
	}

	// Consider current load
	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load

	return score
}

// calculateECScoreWithSize calculates placement score for EC operations with shard size consideration
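//
// Continuing the hypothetical disk above (and assuming a non-zero expected
// shard size): 100-20 = 80 free volume slots add 80*2 = 160 points on top of
// the base score of 77, for a combined score of 237.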
func calculateECScoreWithSize(disk *topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) float64 {
	baseScore := calculateECScore(disk, sourceRack, sourceDC)

	// Additional scoring based on available space vs expected shard size
	if disk.DiskInfo != nil && expectedShardSize > 0 {
		// Estimate available space (this is a rough estimate)
		availableSlots := disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount
		if availableSlots > 0 {
			// Bonus for having plenty of space for the expected shard
			// This is a heuristic - each volume slot can theoretically hold any size
			baseScore += float64(availableSlots) * 2.0 // 2 points per available slot
		}
	}

	return baseScore
}

// isDiskSuitableForEC checks if a disk is suitable for EC placement
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
	if disk.DiskInfo == nil {
		return false
	}

	// Check if disk has capacity
	if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
		return false
	}

	// Check if disk is not overloaded
	if disk.LoadCount > 10 { // Arbitrary threshold
		return false
	}

	return true
}

// checkECPlacementConflicts checks for placement rule conflicts in EC operations
func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
	var conflicts []string

	// For EC, being on the same rack as the source is often acceptable,
	// but we note it as a potential conflict for monitoring
	if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
		conflicts = append(conflicts, "same_rack_as_source")
	}

	return conflicts
}

// findVolumeReplicas finds all servers that have replicas of the specified volume
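//
// The returned slice contains the IDs of the volume servers holding the
// volume, e.g. []string{"10.0.0.1:8080", "10.0.0.2:8080"} for a doubly
// replicated volume (hypothetical addresses); it is empty when the topology
// is unavailable.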
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
	if activeTopology == nil {
		return []string{}
	}

	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo == nil {
		return []string{}
	}

	var replicaServers []string

	// Iterate through all nodes to find volume replicas
	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				for _, diskInfo := range nodeInfo.DiskInfos {
					for _, volumeInfo := range diskInfo.VolumeInfos {
						if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
							replicaServers = append(replicaServers, nodeInfo.Id)
							break // Found the volume on this disk, move to the next disk
						}
					}
				}
			}
		}
	}

	return replicaServers
}

// calculateExpectedShardSize calculates the expected size of each EC shard based on the original volume size
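//
// Worked example (assuming the default 10 data shards): a 10 GiB volume gives
// a base shard size of 1 GiB, a 10% overhead of roughly 102 MiB, and an
// expected shard size of about 1.1 GiB.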
func calculateExpectedShardSize(originalVolumeSize uint64) uint64 {
	if originalVolumeSize == 0 {
		return 0
	}

	// In erasure coding, the original data is split across data shards
	// Each data shard gets approximately originalSize / dataShards
	// Parity shards are similar in size to data shards
	// Add some overhead for padding and metadata (typically ~5-10%)
	baseShardSize := originalVolumeSize / uint64(erasure_coding.DataShardsCount)
	overhead := baseShardSize / 10 // 10% overhead for padding and metadata
	expectedShardSize := baseShardSize + overhead

	glog.V(2).Infof("Calculated expected shard size: original=%d bytes, base_shard=%d bytes, with_overhead=%d bytes",
		originalVolumeSize, baseShardSize, expectedShardSize)

	return expectedShardSize
}