Browse Source

volume.fix.replication: fix misplaced volumes

fix https://github.com/chrislusf/seaweedfs/issues/2416
pull/2500/head
chrislu 3 years ago
parent
commit
e6c026db65
  1. 60
      weed/shell/command_volume_fix_replication.go
  2. 138
      weed/shell/command_volume_fix_replication_test.go

60
weed/shell/command_volume_fix_replication.go

@ -89,7 +89,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
} }
// find all under replicated volumes // find all under replicated volumes
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
for vid, replicas := range volumeReplicas { for vid, replicas := range volumeReplicas {
replica := replicas[0] replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
@ -98,11 +98,20 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
} else if replicaPlacement.GetCopyCount() < len(replicas) { } else if replicaPlacement.GetCopyCount() < len(replicas) {
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
} else if isMisplaced(replicas, replicaPlacement) {
misplacedVolumeIds = append(misplacedVolumeIds, vid)
fmt.Fprintf(writer, "volume %d replication %s is not well placed %+v\n", replica.info.Id, replicaPlacement, replicas)
} }
} }
if len(overReplicatedVolumeIds) > 0 { if len(overReplicatedVolumeIds) > 0 {
if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil {
if err := c.deleteOneVolume(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil {
return err
}
}
if len(misplacedVolumeIds) > 0 {
if err := c.deleteOneVolume(commandEnv, writer, takeAction, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil {
return err return err
} }
} }
@ -171,12 +180,14 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui
return volumeReplicas, allLocations return volumeReplicas, allLocations
} }
func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
for _, vid := range overReplicatedVolumeIds { for _, vid := range overReplicatedVolumeIds {
replicas := volumeReplicas[vid] replicas := volumeReplicas[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
replica := pickOneReplicaToDelete(replicas, replicaPlacement)
replica := selectOneVolumeFn(replicas, replicaPlacement)
// check collection name pattern // check collection name pattern
if *c.collectionPattern != "" { if *c.collectionPattern != "" {
@ -495,3 +506,44 @@ func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_b
return replicas[0] return replicas[0]
} }
// check and fix misplaced volumes
func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
for i := 0; i < len(replicas); i++ {
others := otherThan(replicas, i)
if satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
return false
}
}
return true
}
func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
for i := 0; i < len(replicas); i++ {
if index != i {
others = append(others, replicas[i])
}
}
return
}
func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
var deletionCandidates []*VolumeReplica
for i := 0; i < len(replicas); i++ {
others := otherThan(replicas, i)
if !isMisplaced(others, replicaPlacement) {
deletionCandidates = append(deletionCandidates, replicas[i])
}
}
if len(deletionCandidates) > 0 {
return pickOneReplicaToDelete(deletionCandidates, replicaPlacement)
}
return pickOneReplicaToDelete(replicas, replicaPlacement)
}

138
weed/shell/command_volume_fix_replication_test.go

@ -294,3 +294,141 @@ func runTests(tests []testcase, t *testing.T) {
} }
} }
} }
func TestMisplacedChecking(t *testing.T) {
var tests = []testcase{
{
name: "test 001",
replication: "001",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: true,
},
{
name: "test 010",
replication: "010",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: false,
},
{
name: "test 011",
replication: "011",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: false,
},
{
name: "test 110",
replication: "110",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: false,
},
{
name: "test 100",
replication: "100",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: true,
},
}
for _, tt := range tests {
replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
if isMisplaced(tt.replicas, replicaPlacement) != tt.expected {
t.Errorf("%s: expect %v %v %+v",
tt.name, tt.expected, tt.replication, tt.replicas)
}
}
}
func TestPickingMisplacedVolumeToDelete(t *testing.T) {
var tests = []testcase{
{
name: "test 001",
replication: "001",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
info: &master_pb.VolumeInformationMessage{
Size: 100,
},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
info: &master_pb.VolumeInformationMessage{
Size: 99,
},
},
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
{
name: "test 100",
replication: "100",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
info: &master_pb.VolumeInformationMessage{
Size: 100,
},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
info: &master_pb.VolumeInformationMessage{
Size: 99,
},
},
},
possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
}
for _, tt := range tests {
replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
println("replication:", tt.replication, "name:", tt.name)
if x := pickOneMisplacedVolume(tt.replicas, replicaPlacement); x.location.dataNode.Id != tt.possibleLocation.dataNode.Id {
t.Errorf("%s: picked %+v for replication %v",
tt.name, x.location.dataNode.Id, tt.replication)
} else {
t.Logf("%s: picked %+v %v",
tt.name, x.location.dataNode.Id, tt.replication)
}
}
}
Loading…
Cancel
Save