package erasure_coding

import (
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection implements the detection logic for erasure coding tasks.
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}

	ecConfig := config.(*Config)
	var results []*types.TaskDetectionResult
	now := time.Now()
	quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // configurable minimum volume size in bytes

	debugCount := 0
	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0

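	// Candidate selection below applies, in order: not already EC, minimum size,
	// optional collection filter, then quiet-time and fullness thresholds.
	// Illustrative example (assumed config values, not defaults defined in this
	// file): with MinSizeMB=1024, QuietForSeconds=3600 and FullnessRatio=0.9, a
	// 30 GiB volume that has been idle for two hours at 95% fullness qualifies.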
	for _, metric := range metrics {
		// Skip volumes that are already erasure coded
		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}

		// Check the minimum size requirement
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}

		// Check the collection filter, if specified
		if ecConfig.CollectionFilter != "" {
			// Parse the comma-separated list of allowed collections
			allowedCollections := make(map[string]bool)
			for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
				allowedCollections[strings.TrimSpace(collection)] = true
			}
			// Skip if the volume's collection is not in the allowed list
			if !allowedCollections[metric.Collection] {
				skippedCollectionFilter++
				continue
			}
		}

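		// The filter above treats CollectionFilter as a comma-separated allow
		// list; for example, a hypothetical value of "pictures, logs" admits
		// only volumes in the "pictures" or "logs" collections.
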
		// Check quiet duration and fullness criteria
		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
			// Generate task ID for ActiveTopology integration
			taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())

			result := &types.TaskDetectionResult{
				TaskID:     taskID, // Link to ActiveTopology pending task
				TaskType:   types.TaskTypeErasureCoding,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   types.TaskPriorityLow, // EC is not urgent
				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
				ScheduleAt: now,
			}

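			// The task ID ties this detection result to the pending task that is
			// registered with ActiveTopology below; for example (illustrative),
			// volume 42 detected at Unix time 1700000000 yields "ec_vol_42_1700000000".
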
			// Plan EC destinations if ActiveTopology is available
			if clusterInfo.ActiveTopology != nil {
				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
				if err != nil {
					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if destination planning fails
				}

				// Calculate the expected shard size for the EC operation:
				// each data shard holds approximately volumeSize / dataShards bytes.
				expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

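				// Worked example (assuming the common 10+4 EC layout, i.e. a
				// DataShardsCount of 10): a 30 GiB volume yields an expected
				// shard size of roughly 3 GiB per data shard.
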
				// Add pending EC shard task to ActiveTopology for capacity management

				// Extract shard destinations from multiPlan
				var shardDestinations []string
				var shardDiskIDs []uint32
				for _, plan := range multiPlan.Plans {
					shardDestinations = append(shardDestinations, plan.TargetNode)
					shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
				}

				// Find all volume replica locations (server + disk) from topology
				replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
				if len(replicaLocations) == 0 {
					glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
					continue
				}

				// Find existing EC shards from previous failed attempts
				existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)

				// Combine volume replicas and existing EC shards for cleanup
				var allSourceLocations []topology.TaskSourceLocation

				// Add volume replicas (will free volume slots)
				for _, replica := range replicaLocations {
					allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
						ServerID:    replica.ServerID,
						DiskID:      replica.DiskID,
						CleanupType: topology.CleanupVolumeReplica,
					})
				}

				// Add existing EC shards (will free shard slots)
				duplicateCheck := make(map[string]bool)
				for _, replica := range replicaLocations {
					key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
					duplicateCheck[key] = true
				}

				for _, shard := range existingECShards {
					key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
					if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
						allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
							ServerID:    shard.ServerID,
							DiskID:      shard.DiskID,
							CleanupType: topology.CleanupECShards,
						})
						duplicateCheck[key] = true
					}
				}

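				// Cleanup sources are deduplicated by the "server:diskID" key, so a disk
				// that holds both a volume replica and leftover EC shards appears only once.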
				glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
					len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))

				// Convert TaskSourceLocation to TaskSourceSpec
				sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
				for i, srcLoc := range allSourceLocations {
					sources[i] = topology.TaskSourceSpec{
						ServerID:    srcLoc.ServerID,
						DiskID:      srcLoc.DiskID,
						CleanupType: srcLoc.CleanupType,
					}
				}

				// Convert shard destinations to TaskDestinationSpec
				destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
				shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
				shardSize := int64(expectedShardSize)
				for i, dest := range shardDestinations {
					destinations[i] = topology.TaskDestinationSpec{
						ServerID:      dest,
						DiskID:        shardDiskIDs[i],
						StorageImpact: &shardImpact,
						EstimatedSize: &shardSize,
					}
				}

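				// Each destination receives exactly one shard, so all destination specs
				// share the same per-shard storage impact and estimated size; they are
				// registered together with the pending task below.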
				err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
					TaskID:       taskID,
					TaskType:     topology.TaskTypeErasureCoding,
					VolumeID:     metric.VolumeID,
					VolumeSize:   int64(metric.Size),
					Sources:      sources,
					Destinations: destinations,
				})
				if err != nil {
					glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if topology task addition fails
				}

				glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
					taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))

				// Collect the distinct replica servers from the locations found above
				// (kept for legacy worker compatibility)
				var replicas []string
				serverSet := make(map[string]struct{})
				for _, loc := range replicaLocations {
					if _, found := serverSet[loc.ServerID]; !found {
						replicas = append(replicas, loc.ServerID)
						serverSet[loc.ServerID] = struct{}{}
					}
				}
				glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)

				// Create typed parameters with EC destination information and replicas
				result.TypedParams = &worker_pb.TaskParams{
					TaskId:     taskID, // Link to ActiveTopology pending task
					VolumeId:   metric.VolumeID,
					Server:     metric.Server,
					Collection: metric.Collection,
					VolumeSize: metric.Size, // Store original volume size for tracking changes
					Replicas:   replicas,    // Include all volume replicas for deletion
					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
						ErasureCodingParams: createECTaskParams(multiPlan),
					},
				}

				glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
					metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
			} else {
				glog.Warningf("No ActiveTopology available for destination planning in EC detection")
				continue // Skip this volume if no topology available
			}

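			// At this point the pending task is registered with ActiveTopology and the
			// result carries the typed EC parameters a worker needs to execute it.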
			results = append(results, result)
		} else {
			// Count the reasons volumes were skipped (feeds the debug summary below)
			if debugCount < 5 { // Limit to avoid spam
				if metric.Age < quietThreshold {
					skippedQuietTime++
				}
				if metric.FullnessRatio < ecConfig.FullnessRatio {
					skippedFullness++
				}
			}
			debugCount++
		}
	}

	// Log a debug summary if no tasks were created
	if len(results) == 0 && len(metrics) > 0 {
		totalVolumes := len(metrics)
		glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)

		// Show details for the first few volumes
		for i, metric := range metrics {
			if i >= 3 || metric.IsECVolume { // Only report on the first 3 metrics, skipping EC volumes among them
				continue
			}
			sizeMB := float64(metric.Size) / (1024 * 1024)
			glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
				metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
				metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
		}
	}

	return results, nil
}

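// Usage sketch (illustrative, not taken from this repository): a maintenance
// scan loop is expected to call Detection roughly like
//
//	results, err := Detection(metrics, clusterInfo, ecConfig)
//	if err != nil {
//		return err
//	}
//	for _, r := range results {
//		queue.Enqueue(r) // hypothetical queue; workers later execute the EC task
//	}
//
// where metrics come from volume health collection and ecConfig satisfies
// base.TaskConfig.
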
// planECDestinations plans the destinations for an erasure coding operation.
// This function implements the EC destination planning logic directly in the detection phase.
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
	// Calculate the expected shard size for the EC operation
	expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

	// Get source node information from topology
	var sourceRack, sourceDC string

	// Extract the source rack and DC from the topology info
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo != nil {
		for _, dc := range topologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, dataNodeInfo := range rack.DataNodeInfos {
					if dataNodeInfo.Id == metric.Server {
						sourceDC = dc.Id
						sourceRack = rack.Id
						break
					}
				}
				if sourceRack != "" {
					break
				}
			}
			if sourceDC != "" {
				break
			}
		}
	}

	// Get available disks for EC placement, using effective capacity (which accounts
	// for pending and active tasks). A disk needs at least 1 available volume slot
	// to be considered for shard placement.
	availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
	if len(availableDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
	}

	// Select the best disks for EC placement with rack/DC diversity
	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
	if len(selectedDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
	}

	var plans []*topology.DestinationPlan
	rackCount := make(map[string]int)
	dcCount := make(map[string]int)

	for _, disk := range selectedDisks {
		plan := &topology.DestinationPlan{
			TargetNode:     disk.NodeID,
			TargetDisk:     disk.DiskID,
			TargetRack:     disk.Rack,
			TargetDC:       disk.DataCenter,
			ExpectedSize:   expectedShardSize, // Set the calculated EC shard size
			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
			Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
		}
		plans = append(plans, plan)

		// Count rack and DC diversity
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackCount[rackKey]++
		dcCount[disk.DataCenter]++
	}

	// Log capacity utilization information using ActiveTopology's encapsulated logic
	totalEffectiveCapacity := int64(0)
	for _, plan := range plans {
		effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
		totalEffectiveCapacity += effectiveCapacity
	}

	glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
		metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)

	// Log the storage impact for the EC task (source only; the multiple EC targets are handled individually)
	sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
	glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
		sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
	glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")

	return &topology.MultiDestinationPlan{
		Plans:          plans,
		TotalShards:    len(plans),
		SuccessfulRack: len(rackCount),
		SuccessfulDCs:  len(dcCount),
	}, nil
}

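// Illustrative outcome (assuming a 10+4 layout, so TotalShardsCount=14): if the
// 14 selected disks span 6 racks in 2 data centers, the returned plan reports
// TotalShards=14, SuccessfulRack=6 and SuccessfulDCs=2.
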
// createECTaskParams creates EC task parameters from the multi-destination plan
func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
	var destinations []*worker_pb.ECDestination

	for _, plan := range multiPlan.Plans {
		destination := &worker_pb.ECDestination{
			Node:           plan.TargetNode,
			DiskId:         plan.TargetDisk,
			Rack:           plan.TargetRack,
			DataCenter:     plan.TargetDC,
			PlacementScore: plan.PlacementScore,
		}
		destinations = append(destinations, destination)
	}

	// Collect placement conflicts from all destinations
	var placementConflicts []string
	for _, plan := range multiPlan.Plans {
		placementConflicts = append(placementConflicts, plan.Conflicts...)
	}

	return &worker_pb.ErasureCodingTaskParams{
		Destinations:       destinations,
		DataShards:         erasure_coding.DataShardsCount,   // Standard data shards
		ParityShards:       erasure_coding.ParityShardsCount, // Standard parity shards
		PlacementConflicts: placementConflicts,
	}
}

// selectBestECDestinations selects multiple disks for EC shard placement with diversity
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	// Group disks by rack and DC for diversity
	rackGroups := make(map[string][]*topology.DiskInfo)
	for _, disk := range disks {
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
	}

	var selected []*topology.DiskInfo
	usedRacks := make(map[string]bool)

	// First pass: select one disk from each rack for maximum diversity
	for rackKey, rackDisks := range rackGroups {
		if len(selected) >= shardsNeeded {
			break
		}

		// Select the best disk from this rack
		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
		if bestDisk != nil {
			selected = append(selected, bestDisk)
			usedRacks[rackKey] = true
		}
	}

	// Second pass: if we need more disks, select from racks we've already used
	if len(selected) < shardsNeeded {
		for _, disk := range disks {
			if len(selected) >= shardsNeeded {
				break
			}

			// Skip if already selected
			alreadySelected := false
			for _, sel := range selected {
				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
					alreadySelected = true
					break
				}
			}

			if !alreadySelected && isDiskSuitableForEC(disk) {
				selected = append(selected, disk)
			}
		}
	}

	return selected
}

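// Illustrative example: if 14 shards are needed but only 5 racks have suitable
// disks, the first pass places one shard per rack (5 disks) and the second pass
// fills the remaining 9 slots from already-used racks, still skipping disks that
// fail isDiskSuitableForEC.
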
// selectBestFromRack selects the best disk from a rack for EC placement
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	var bestDisk *topology.DiskInfo
	bestScore := -1.0

	for _, disk := range disks {
		if !isDiskSuitableForEC(disk) {
			continue
		}

		score := calculateECScore(disk, sourceRack, sourceDC)
		if score > bestScore {
			bestScore = score
			bestDisk = disk
		}
	}

	return bestDisk
}

// calculateECScore calculates placement score for EC operations
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
	if disk.DiskInfo == nil {
		return 0.0
	}

	score := 0.0

	// Prefer disks with available capacity
	if disk.DiskInfo.MaxVolumeCount > 0 {
		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
		score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
	}

	// Prefer different racks for better distribution
	if disk.Rack != sourceRack {
		score += 30.0
	}

	// Prefer different data centers for better distribution
	if disk.DataCenter != sourceDC {
		score += 20.0
	}

	// Consider current load
	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load

	return score
}

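// Worked example (hypothetical disk): with 40% of its volume slots used the disk
// earns (1.0-0.4)*50 = 30 points, a different rack adds 30, a different data
// center adds 20, and a LoadCount of 2 adds 10-2 = 8, for a total score of 88.
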
// isDiskSuitableForEC checks whether a disk is suitable for EC placement
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
	if disk.DiskInfo == nil {
		return false
	}

	// Reject disks that are already overloaded with tasks
	if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
		return false
	}

	return true
}

// checkECPlacementConflicts checks for placement rule conflicts in EC operations
func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
	var conflicts []string

	// For EC, being on the same rack as the source is often acceptable,
	// but we note it as a potential conflict for monitoring
	if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
		conflicts = append(conflicts, "same_rack_as_source")
	}

	return conflicts
}

// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume.
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
	if activeTopology == nil {
		return nil
	}
	return activeTopology.GetVolumeLocations(volumeID, collection)
}

// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts).
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
	if activeTopology == nil {
		return nil
	}
	return activeTopology.GetECShardLocations(volumeID, collection)
}

// findVolumeReplicas finds all servers that have replicas of the specified volume
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
	if activeTopology == nil {
		return []string{}
	}

	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo == nil {
		return []string{}
	}

	var replicaServers []string

	// Iterate through all nodes to find volume replicas, recording each node at most once
	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				found := false
				for _, diskInfo := range nodeInfo.DiskInfos {
					for _, volumeInfo := range diskInfo.VolumeInfos {
						if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
							replicaServers = append(replicaServers, nodeInfo.Id)
							found = true
							break // Found the volume on this disk
						}
					}
					if found {
						break // Move on to the next node so a server is not listed twice
					}
				}
			}
		}
	}

	return replicaServers
}