diff --git a/weed/worker/tasks/ec_repair/plan.go b/weed/worker/tasks/ec_repair/plan.go index 8e648d7fb..c59e44cd6 100644 --- a/weed/worker/tasks/ec_repair/plan.go +++ b/weed/worker/tasks/ec_repair/plan.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" @@ -101,8 +102,22 @@ func BuildRepairPlan( } states := collectShardStates(topoInfo, "") + keys := make([]VolumeKey, 0, len(states)) + for key := range states { + keys = append(keys, key) + } + sort.Slice(keys, func(i, j int) bool { + if keys[i].VolumeID != keys[j].VolumeID { + return keys[i].VolumeID < keys[j].VolumeID + } + if keys[i].Collection != keys[j].Collection { + return keys[i].Collection < keys[j].Collection + } + return keys[i].DiskType < keys[j].DiskType + }) + var state *volumeShardState - for key, candidate := range states { + for _, key := range keys { if key.VolumeID != volumeID { continue } @@ -112,7 +127,7 @@ func BuildRepairPlan( if diskType != "" && key.DiskType != diskType { continue } - state = candidate + state = states[key] break } if state == nil { @@ -437,6 +452,7 @@ func groupDeleteByNode(locations []ShardLocation, activeTopology *topology.Activ } } if nodeAddress == "" { + glog.Warningf("EC Repair plan: unable to resolve node address for shard %d on node %s diskType=%s diskID=%d", loc.ShardID, loc.NodeID, loc.DiskType, loc.DiskID) continue } if result[nodeAddress] == nil { diff --git a/weed/worker/tasks/ec_repair/plan_test.go b/weed/worker/tasks/ec_repair/plan_test.go index 508ddec88..f759f313a 100644 --- a/weed/worker/tasks/ec_repair/plan_test.go +++ b/weed/worker/tasks/ec_repair/plan_test.go @@ -11,6 +11,12 @@ import ( "github.com/stretchr/testify/require" ) +const ( + planTestVolumeID = uint32(999) + planTestCollection = "test-collection" + recentTaskWindowSize = 10 +) + type planTestNode struct { id string address string @@ -19,6 +25,16 @@ type planTestNode struct { shards map[uint32]int64 } +type multiVolumeSpec struct { + nodeID string + address string + diskType string + diskID uint32 + volumeID uint32 + collection string + shards map[uint32]int64 +} + func TestDetectReturnsExtraShardCandidates(t *testing.T) { nodes := []planTestNode{ { @@ -34,7 +50,7 @@ func TestDetectReturnsExtraShardCandidates(t *testing.T) { diskType: "hdd", diskID: 0, shards: map[uint32]int64{ - 0: 90, // mismatched size + 0: 90, // mismatched size 2: 100, 3: 100, 4: 100, @@ -44,7 +60,7 @@ func TestDetectReturnsExtraShardCandidates(t *testing.T) { }, } - topo := buildPlanTopology(nodes) + topo := buildPlanTopology(nodes, planTestVolumeID, planTestCollection) candidates, hasMore, err := Detect(topo, "", 0) require.NoError(t, err) require.False(t, hasMore) @@ -55,13 +71,85 @@ func TestDetectReturnsExtraShardCandidates(t *testing.T) { require.Greater(t, candidate.MismatchedShards, 0) } +func TestDetectHonorsMaxResults(t *testing.T) { + specs := []multiVolumeSpec{ + { + nodeID: "volume100-node1", + address: "127.0.0.1:9200", + diskType: "hdd", + diskID: 0, + volumeID: 100, + collection: planTestCollection, + shards: makeShardMap(0, 9, 100), + }, + { + nodeID: "volume100-node2", + address: "127.0.0.1:9201", + diskType: "hdd", + diskID: 1, + volumeID: 100, + collection: planTestCollection, + shards: map[uint32]int64{ + 0: 50, + }, + }, + { + nodeID: "volume101-node1", + address: "127.0.0.1:9300", + diskType: "hdd", + diskID: 0, + volumeID: 101, + collection: planTestCollection, + shards: makeShardMap(0, 9, 100), + }, + { + nodeID: "volume101-node2", + address: "127.0.0.1:9301", + diskType: "hdd", + diskID: 1, + volumeID: 101, + collection: planTestCollection, + shards: map[uint32]int64{ + 1: 90, + }, + }, + { + nodeID: "volume102-node1", + address: "127.0.0.1:9400", + diskType: "hdd", + diskID: 0, + volumeID: 102, + collection: planTestCollection, + shards: makeShardMap(0, 9, 100), + }, + { + nodeID: "volume102-node2", + address: "127.0.0.1:9401", + diskType: "hdd", + diskID: 1, + volumeID: 102, + collection: planTestCollection, + shards: map[uint32]int64{ + 2: 50, + }, + }, + } + + topo := buildMultiVolumeTopology(specs) + + candidates, hasMore, err := Detect(topo, "", 2) + require.NoError(t, err) + require.True(t, hasMore) + require.Len(t, candidates, 2) +} + func TestBuildRepairPlanRequiresEnoughShards(t *testing.T) { nodes := []planTestNode{ {id: "nodeA", address: "n1", diskType: "hdd", diskID: 0, shards: makeShardMap(0, 4, 100)}, } - topo := buildPlanTopology(nodes) + topo := buildPlanTopology(nodes, planTestVolumeID, planTestCollection) - _, err := BuildRepairPlan(topo, nil, 300, "", "hdd") + _, err := BuildRepairPlan(topo, nil, planTestVolumeID, planTestCollection, "hdd") require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf("need at least %d", erasure_coding.DataShardsCount)) } @@ -69,12 +157,12 @@ func TestBuildRepairPlanRequiresEnoughShards(t *testing.T) { func TestBuildRepairPlanIncludesTargetsAndDeletes(t *testing.T) { nodes := []planTestNode{ {id: "nodeA", address: "n1", diskType: "hdd", diskID: 0, shards: makeShardMap(0, 9, 100)}, - {id: "nodeB", address: "n2", diskType: "hdd", diskID: 0, shards: map[uint32]int64{11: 100}}, + {id: "nodeB", address: "n2", diskType: "hdd", diskID: 0, shards: map[uint32]int64{0: 50}}, } - topo := buildPlanTopology(nodes) + topo := buildPlanTopology(nodes, planTestVolumeID, planTestCollection) - activeTopo := buildActiveTopology(t, []string{"n3", "n4", "n5"}) - plan, err := BuildRepairPlan(topo, activeTopo, 500, "collection", "hdd") + activeTopo := buildActiveTopology(t, []string{"n3", "n4", "n5", "n6"}) + plan, err := BuildRepairPlan(topo, activeTopo, planTestVolumeID, planTestCollection, "hdd") require.NoError(t, err) require.NotEmpty(t, plan.MissingShards) require.Contains(t, plan.MissingShards, uint32(10)) @@ -90,7 +178,7 @@ func makeShardMap(from, to int, size int64) map[uint32]int64 { return shards } -func buildPlanTopology(nodes []planTestNode) *master_pb.TopologyInfo { +func buildPlanTopology(nodes []planTestNode, volumeID uint32, collection string) *master_pb.TopologyInfo { dataNodes := make([]*master_pb.DataNodeInfo, 0, len(nodes)) for _, node := range nodes { diskInfo := &master_pb.DiskInfo{ @@ -100,7 +188,7 @@ func buildPlanTopology(nodes []planTestNode) *master_pb.TopologyInfo { } if len(node.shards) > 0 { diskInfo.EcShardInfos = []*master_pb.VolumeEcShardInformationMessage{ - buildEcShardInfo(uint32(999), fmt.Sprintf("%s-coll", node.id), node.diskType, node.diskID, node.shards), + buildEcShardInfo(volumeID, collection, node.diskType, node.diskID, node.shards), } } dataNodes = append(dataNodes, &master_pb.DataNodeInfo{ @@ -124,6 +212,40 @@ func buildPlanTopology(nodes []planTestNode) *master_pb.TopologyInfo { } } +func buildMultiVolumeTopology(specs []multiVolumeSpec) *master_pb.TopologyInfo { + dataNodes := make([]*master_pb.DataNodeInfo, 0, len(specs)) + for _, spec := range specs { + diskInfo := &master_pb.DiskInfo{ + DiskId: spec.diskID, + MaxVolumeCount: 100, + VolumeCount: 10, + } + if len(spec.shards) > 0 { + diskInfo.EcShardInfos = []*master_pb.VolumeEcShardInformationMessage{ + buildEcShardInfo(spec.volumeID, spec.collection, spec.diskType, spec.diskID, spec.shards), + } + } + dataNodes = append(dataNodes, &master_pb.DataNodeInfo{ + Id: spec.nodeID, + Address: spec.address, + DiskInfos: map[string]*master_pb.DiskInfo{spec.diskType: diskInfo}, + }) + } + return &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: dataNodes, + }, + }, + }, + }, + } +} + func buildActiveTopology(t *testing.T, nodeIDs []string) *topology.ActiveTopology { t.Helper() info := &master_pb.TopologyInfo{