Browse Source

address comments

codex/ec-repair-worker
Chris Lu 2 weeks ago
parent
commit
5400fdca3a
  1. 20
      weed/worker/tasks/ec_repair/plan.go
  2. 142
      weed/worker/tasks/ec_repair/plan_test.go

20
weed/worker/tasks/ec_repair/plan.go

@ -6,6 +6,7 @@ import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
@ -101,8 +102,22 @@ func BuildRepairPlan(
}
states := collectShardStates(topoInfo, "")
keys := make([]VolumeKey, 0, len(states))
for key := range states {
keys = append(keys, key)
}
sort.Slice(keys, func(i, j int) bool {
if keys[i].VolumeID != keys[j].VolumeID {
return keys[i].VolumeID < keys[j].VolumeID
}
if keys[i].Collection != keys[j].Collection {
return keys[i].Collection < keys[j].Collection
}
return keys[i].DiskType < keys[j].DiskType
})
var state *volumeShardState
for key, candidate := range states {
for _, key := range keys {
if key.VolumeID != volumeID {
continue
}
@ -112,7 +127,7 @@ func BuildRepairPlan(
if diskType != "" && key.DiskType != diskType {
continue
}
state = candidate
state = states[key]
break
}
if state == nil {
@ -437,6 +452,7 @@ func groupDeleteByNode(locations []ShardLocation, activeTopology *topology.Activ
}
}
if nodeAddress == "" {
glog.Warningf("EC Repair plan: unable to resolve node address for shard %d on node %s diskType=%s diskID=%d", loc.ShardID, loc.NodeID, loc.DiskType, loc.DiskID)
continue
}
if result[nodeAddress] == nil {

142
weed/worker/tasks/ec_repair/plan_test.go

@ -11,6 +11,12 @@ import (
"github.com/stretchr/testify/require"
)
// Shared fixture values for the EC repair plan tests in this file.
const (
// planTestVolumeID is the volume ID the single-volume tests hand to
// buildPlanTopology and BuildRepairPlan.
planTestVolumeID = uint32(999)
// planTestCollection is the collection name used when building test
// topologies and when calling BuildRepairPlan.
planTestCollection = "test-collection"
// NOTE(review): recentTaskWindowSize is not referenced in the visible part
// of this file — confirm where it is used before relying on its meaning.
recentTaskWindowSize = 10
)
type planTestNode struct {
id string
address string
@ -19,6 +25,16 @@ type planTestNode struct {
shards map[uint32]int64
}
// multiVolumeSpec describes one data node carrying a single disk, as consumed
// by buildMultiVolumeTopology. Unlike planTestNode it carries its own volume ID
// and collection, so one topology can contain several EC volumes.
type multiVolumeSpec struct {
// nodeID becomes the DataNodeInfo.Id of the generated node.
nodeID string
// address becomes the DataNodeInfo.Address of the generated node.
address string
// diskType is the key of the node's DiskInfos map and is also forwarded to
// buildEcShardInfo.
diskType string
// diskID becomes DiskInfo.DiskId and is also forwarded to buildEcShardInfo.
diskID uint32
// volumeID is the EC volume the shards belong to (forwarded to buildEcShardInfo).
volumeID uint32
// collection is forwarded to buildEcShardInfo.
collection string
// shards is forwarded to buildEcShardInfo; presumably shard ID -> shard size
// (TODO confirm against buildEcShardInfo). When empty, the disk gets no EC
// shard info at all.
shards map[uint32]int64
}
func TestDetectReturnsExtraShardCandidates(t *testing.T) {
nodes := []planTestNode{
{
@ -34,7 +50,7 @@ func TestDetectReturnsExtraShardCandidates(t *testing.T) {
diskType: "hdd",
diskID: 0,
shards: map[uint32]int64{
0: 90, // mismatched size
0: 90, // mismatched size
2: 100,
3: 100,
4: 100,
@ -44,7 +60,7 @@ func TestDetectReturnsExtraShardCandidates(t *testing.T) {
},
}
topo := buildPlanTopology(nodes)
topo := buildPlanTopology(nodes, planTestVolumeID, planTestCollection)
candidates, hasMore, err := Detect(topo, "", 0)
require.NoError(t, err)
require.False(t, hasMore)
@ -55,13 +71,85 @@ func TestDetectReturnsExtraShardCandidates(t *testing.T) {
require.Greater(t, candidate.MismatchedShards, 0)
}
// TestDetectHonorsMaxResults builds a topology holding three EC volumes (100,
// 101 and 102) and calls Detect with a third argument of 2 (presumably the
// max-results limit, per the test name). It expects no error, exactly two
// candidates, and hasMore reported as true.
func TestDetectHonorsMaxResults(t *testing.T) {
	specs := make([]multiVolumeSpec, 0, 6)

	// addVolume appends the two nodes that make up one volume's fixture:
	// "<name>-node1" on disk 0 with makeShardMap(0, 9, 100), and
	// "<name>-node2" on disk 1 with a single shard entry.
	addVolume := func(volumeID uint32, name, primaryAddr, extraAddr string, extraShard uint32, extraSize int64) {
		specs = append(specs,
			multiVolumeSpec{
				nodeID:     name + "-node1",
				address:    primaryAddr,
				diskType:   "hdd",
				diskID:     0,
				volumeID:   volumeID,
				collection: planTestCollection,
				shards:     makeShardMap(0, 9, 100),
			},
			multiVolumeSpec{
				nodeID:     name + "-node2",
				address:    extraAddr,
				diskType:   "hdd",
				diskID:     1,
				volumeID:   volumeID,
				collection: planTestCollection,
				shards:     map[uint32]int64{extraShard: extraSize},
			},
		)
	}

	addVolume(100, "volume100", "127.0.0.1:9200", "127.0.0.1:9201", 0, 50)
	addVolume(101, "volume101", "127.0.0.1:9300", "127.0.0.1:9301", 1, 90)
	addVolume(102, "volume102", "127.0.0.1:9400", "127.0.0.1:9401", 2, 50)

	topo := buildMultiVolumeTopology(specs)

	// Three volumes are present but only two results are requested.
	candidates, hasMore, err := Detect(topo, "", 2)
	require.NoError(t, err)
	require.True(t, hasMore)
	require.Len(t, candidates, 2)
}
func TestBuildRepairPlanRequiresEnoughShards(t *testing.T) {
nodes := []planTestNode{
{id: "nodeA", address: "n1", diskType: "hdd", diskID: 0, shards: makeShardMap(0, 4, 100)},
}
topo := buildPlanTopology(nodes)
topo := buildPlanTopology(nodes, planTestVolumeID, planTestCollection)
_, err := BuildRepairPlan(topo, nil, 300, "", "hdd")
_, err := BuildRepairPlan(topo, nil, planTestVolumeID, planTestCollection, "hdd")
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("need at least %d", erasure_coding.DataShardsCount))
}
@ -69,12 +157,12 @@ func TestBuildRepairPlanRequiresEnoughShards(t *testing.T) {
func TestBuildRepairPlanIncludesTargetsAndDeletes(t *testing.T) {
nodes := []planTestNode{
{id: "nodeA", address: "n1", diskType: "hdd", diskID: 0, shards: makeShardMap(0, 9, 100)},
{id: "nodeB", address: "n2", diskType: "hdd", diskID: 0, shards: map[uint32]int64{11: 100}},
{id: "nodeB", address: "n2", diskType: "hdd", diskID: 0, shards: map[uint32]int64{0: 50}},
}
topo := buildPlanTopology(nodes)
topo := buildPlanTopology(nodes, planTestVolumeID, planTestCollection)
activeTopo := buildActiveTopology(t, []string{"n3", "n4", "n5"})
plan, err := BuildRepairPlan(topo, activeTopo, 500, "collection", "hdd")
activeTopo := buildActiveTopology(t, []string{"n3", "n4", "n5", "n6"})
plan, err := BuildRepairPlan(topo, activeTopo, planTestVolumeID, planTestCollection, "hdd")
require.NoError(t, err)
require.NotEmpty(t, plan.MissingShards)
require.Contains(t, plan.MissingShards, uint32(10))
@ -90,7 +178,7 @@ func makeShardMap(from, to int, size int64) map[uint32]int64 {
return shards
}
func buildPlanTopology(nodes []planTestNode) *master_pb.TopologyInfo {
func buildPlanTopology(nodes []planTestNode, volumeID uint32, collection string) *master_pb.TopologyInfo {
dataNodes := make([]*master_pb.DataNodeInfo, 0, len(nodes))
for _, node := range nodes {
diskInfo := &master_pb.DiskInfo{
@ -100,7 +188,7 @@ func buildPlanTopology(nodes []planTestNode) *master_pb.TopologyInfo {
}
if len(node.shards) > 0 {
diskInfo.EcShardInfos = []*master_pb.VolumeEcShardInformationMessage{
buildEcShardInfo(uint32(999), fmt.Sprintf("%s-coll", node.id), node.diskType, node.diskID, node.shards),
buildEcShardInfo(volumeID, collection, node.diskType, node.diskID, node.shards),
}
}
dataNodes = append(dataNodes, &master_pb.DataNodeInfo{
@ -124,6 +212,40 @@ func buildPlanTopology(nodes []planTestNode) *master_pb.TopologyInfo {
}
}
// buildMultiVolumeTopology turns each spec into its own data node with a single
// disk and returns a topology that places all of those nodes in one rack
// ("rack1") inside one data center ("dc1"). A spec with an empty shard map
// yields a disk without any EC shard info.
func buildMultiVolumeTopology(specs []multiVolumeSpec) *master_pb.TopologyInfo {
	nodes := make([]*master_pb.DataNodeInfo, 0, len(specs))
	for _, s := range specs {
		disk := &master_pb.DiskInfo{
			DiskId:         s.diskID,
			MaxVolumeCount: 100,
			VolumeCount:    10,
		}
		// Only attach shard info when the spec actually lists shards.
		if len(s.shards) > 0 {
			shardInfo := buildEcShardInfo(s.volumeID, s.collection, s.diskType, s.diskID, s.shards)
			disk.EcShardInfos = []*master_pb.VolumeEcShardInformationMessage{shardInfo}
		}
		node := &master_pb.DataNodeInfo{
			Id:        s.nodeID,
			Address:   s.address,
			DiskInfos: map[string]*master_pb.DiskInfo{s.diskType: disk},
		}
		nodes = append(nodes, node)
	}

	rack := &master_pb.RackInfo{
		Id:            "rack1",
		DataNodeInfos: nodes,
	}
	dataCenter := &master_pb.DataCenterInfo{
		Id:        "dc1",
		RackInfos: []*master_pb.RackInfo{rack},
	}
	return &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{dataCenter},
	}
}
func buildActiveTopology(t *testing.T, nodeIDs []string) *topology.ActiveTopology {
t.Helper()
info := &master_pb.TopologyInfo{

Loading…
Cancel
Save