Browse Source

Resolve replica placement for EC volumes from master server defaults.

A stopgap solution to balance EC shards based on system topology,
as Seaweed currently has no support for shard placement strategy
options.
pull/6303/head
Lisandro Pin 2 months ago
parent
commit
29a83aa9d2
  1. 26
      weed/shell/command_ec_common.go
  2. 25
      weed/shell/command_ec_common_test.go

26
weed/shell/command_ec_common.go

@ -39,6 +39,11 @@ type EcRack struct {
freeEcSlot int freeEcSlot int
} }
var (
// Overridable functions for testing.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() { if !commandEnv.isLocked() {
@ -840,15 +845,24 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
return vidLocations return vidLocations
} }
// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default?
func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
// TODO: EC volumes have no replica placement info :( We need a better solution to resolve topology, and balancing, for those.
func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
defaultReplicaPlacement, err := getDefaultReplicaPlacement(commandEnv)
if err != nil {
return nil, err
}
for _, ecNode := range nodes { for _, ecNode := range nodes {
for _, diskInfo := range ecNode.info.DiskInfos { for _, diskInfo := range ecNode.info.DiskInfos {
for _, volumeInfo := range diskInfo.VolumeInfos { for _, volumeInfo := range diskInfo.VolumeInfos {
if needle.VolumeId(volumeInfo.Id) != vid {
continue
if needle.VolumeId(volumeInfo.Id) == vid {
return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
}
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(ecShardInfo.Id) == vid {
return defaultReplicaPlacement, nil
} }
return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
} }
} }
} }
@ -856,7 +870,7 @@ func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_bl
return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid) return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
} }
func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
var resp *master_pb.GetMasterConfigurationResponse var resp *master_pb.GetMasterConfigurationResponse
var err error var err error

25
weed/shell/command_ec_common_test.go

@ -55,26 +55,41 @@ func TestEcDistribution(t *testing.T) {
} }
func TestVolumeIdToReplicaPlacement(t *testing.T) { func TestVolumeIdToReplicaPlacement(t *testing.T) {
getDefaultReplicaPlacementOrig := getDefaultReplicaPlacement
getDefaultReplicaPlacement = func(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
return super_block.NewReplicaPlacementFromString("123")
}
defer func() {
getDefaultReplicaPlacement = getDefaultReplicaPlacementOrig
}()
testCases := []struct { testCases := []struct {
topology *master_pb.TopologyInfo topology *master_pb.TopologyInfo
vid string vid string
want string want string
wantErr string wantErr string
}{ }{
{topology1, "", "", "failed to resolve replica placement for volume ID 0"},
{topology1, "0", "", "failed to resolve replica placement for volume ID 0"},
{topology1, "", "", "failed to resolve replica placement"},
{topology1, "0", "", "failed to resolve replica placement"},
{topology1, "1", "100", ""}, {topology1, "1", "100", ""},
{topology1, "296", "100", ""}, {topology1, "296", "100", ""},
{topology2, "", "", "failed to resolve replica placement for volume ID 0"},
{topology2, "19012", "", "failed to resolve replica placement for volume ID 19012"},
{topology2, "", "", "failed to resolve replica placement"},
{topology2, "19012", "", "failed to resolve replica placement"},
{topology2, "6271", "002", ""}, {topology2, "6271", "002", ""},
{topology2, "17932", "002", ""}, {topology2, "17932", "002", ""},
{topologyEc, "", "", "failed to resolve replica placement"},
{topologyEc, "0", "", "failed to resolve replica placement"},
{topologyEc, "6225", "002", ""},
{topologyEc, "6241", "002", ""},
{topologyEc, "9577", "123", ""}, // EC volume
{topologyEc, "12737", "123", ""}, // EC volume
} }
for _, tc := range testCases { for _, tc := range testCases {
commandEnv := &CommandEnv{}
vid, _ := needle.NewVolumeId(tc.vid) vid, _ := needle.NewVolumeId(tc.vid)
ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "") ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes)
got, gotErr := volumeIdToReplicaPlacement(commandEnv, vid, ecNodes)
if err := errorCheck(gotErr, tc.wantErr); err != nil { if err := errorCheck(gotErr, tc.wantErr); err != nil {
t.Errorf("volume %q: %s", tc.vid, err.Error()) t.Errorf("volume %q: %s", tc.vid, err.Error())

Loading…
Cancel
Save