
ec: update helper functions to use configurable diskType

Update the following functions to accept and use a diskType parameter:
- findEcVolumeShards()
- addEcVolumeShards()
- deleteEcVolumeShards()
- moveMountedShardToEcNode()
- countShardsByRack()
- pickNEcShardsToMoveFrom()

All ecBalancer methods now use ecb.diskType instead of hardcoded
types.HardDriveType. Non-ecBalancer callers (like volumeServer.evacuate
and ec.rebuild) use types.HardDriveType as the default.

Update all test files to pass diskType where needed.
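
For illustration, a minimal sketch of the calling convention after this change. The two wrapper functions below are hypothetical and exist only to show the pattern; findEcVolumeShards, addEcVolumeShards, the ecb.diskType field, and types.HardDriveType are the identifiers touched in the diff that follows.

package shell // illustrative: the helpers shown in the diff live in this package

import (
    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
    "github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// Inside ecBalancer methods, the balancer's configured disk type is threaded
// through instead of the previously hardcoded types.HardDriveType.
func (ecb *ecBalancer) exampleBalancerUsage(node *EcNode, vid needle.VolumeId) int {
    shardBits := findEcVolumeShards(node, vid, ecb.diskType)
    return shardBits.ShardIdCount()
}

// Callers outside the balancer (e.g. volumeServer.evacuate, ec.rebuild) keep the
// old behavior by passing the hard-drive default explicitly.
func exampleDefaultCaller(node *EcNode, vid needle.VolumeId, collection string, shardIds []uint32) {
    node.addEcVolumeShards(vid, collection, shardIds, types.HardDriveType)
}

Passing the disk type explicitly at every call site keeps the non-balancer code paths identical to the previous hard-drive-only behavior, while the balancer can operate on whichever disk tier it was configured for.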
ec-disk-type-support
chrislusf
commit 90b134f830
Changed files:
  1. weed/shell/command_ec_common.go (56 lines changed)
  2. weed/shell/command_ec_common_test.go (3 lines changed)
  3. weed/shell/command_ec_rebuild.go (2 lines changed)
  4. weed/shell/command_ec_test.go (8 lines changed)
  5. weed/shell/command_volume_server_evacuate.go (2 lines changed)

weed/shell/command_ec_common.go (56 lines changed)

@@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
}
-destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
-existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
+destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
+existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)
return nil
@@ -551,9 +551,9 @@ func ceilDivide(a, b int) int {
return (a / b) + r
}
-func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits {
-if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
return 0
}
-func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode {
foundVolume := false
-diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
@@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
}
} else {
diskInfo = &master_pb.DiskInfo{
-Type: string(types.HardDriveType),
+Type: string(diskType),
}
-ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
+ecNode.info.DiskInfos[string(diskType)] = diskInfo
}
if !foundVolume {
@@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
-DiskType: string(types.HardDriveType),
+DiskType: string(diskType),
})
ecNode.freeEcSlot -= len(shardIds)
}
@@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
return ecNode
}
-func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
-if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -706,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
// Use MaxShardCount (32) to support custom EC ratios
shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
-shardBits := findEcVolumeShards(ecNode, vid)
+shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
for _, shardId := range shardBits.ShardIds() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
@@ -729,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
-ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
+ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType)
}
}
return nil
@@ -749,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
return ewg.Wait()
}
-func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-shardBits := findEcVolumeShards(ecNode, vid)
+shardBits := findEcVolumeShards(ecNode, vid, diskType)
return string(ecNode.rack), shardBits.ShardIdCount()
})
}
@@ -760,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
racks := ecb.racks()
// see the volume's shards are in how many racks, and how many in each rack
-rackToShardCount := countShardsByRack(vid, locations)
+rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
// Calculate actual total shards for this volume (not hardcoded default)
var totalShardsForVolume int
@@ -780,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
continue
}
possibleEcNodes := rackEcNodesWithVid[rackId]
-for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
ecShardsToMove[shardId] = ecNode
}
}
@@ -857,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
-rackToShardCount := countShardsByRack(vid, locations)
+rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
@@ -866,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[RackId(rackId)].ecNodes {
-if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
+if _, found := n.info.DiskInfos[string(ecb.diskType)]; found {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
}
@@ -883,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
for _, ecNode := range existingLocations {
-shardBits := findEcVolumeShards(ecNode, vid)
+shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
for _, shardId := range shardBits.ShardIds() {
@@ -974,7 +974,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
}
-err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
+err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
if err != nil {
return err
}
@@ -1004,7 +1004,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
nodeShards := map[*EcNode]int{}
for _, node := range possibleDestinations {
-nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
}
targets := []*EcNode{}
@@ -1135,14 +1135,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co
} else {
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
}
-return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
+return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType)
}
-func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
+func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode {
picked := make(map[erasure_coding.ShardId]*EcNode)
var candidateEcNodes []*CandidateEcNode
for _, ecNode := range ecNodes {
-shardBits := findEcVolumeShards(ecNode, vid)
+shardBits := findEcVolumeShards(ecNode, vid, diskType)
if shardBits.ShardIdCount() > 0 {
candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
ecNode: ecNode,
@@ -1156,13 +1156,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
for i, candidateEcNode := range candidateEcNodes {
-shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
+shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
if shardBits > 0 {
selectedEcNodeIndex = i
for _, shardId := range shardBits.ShardIds() {
candidateEcNode.shardCount--
picked[shardId] = candidateEcNode.ecNode
-candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
+candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
break
}
break

weed/shell/command_ec_common_test.go (3 lines changed)

@@ -155,10 +155,11 @@ func TestPickRackToBalanceShardsInto(t *testing.T) {
ecb := &ecBalancer{
ecNodes: ecNodes,
replicaPlacement: rp,
+diskType: types.HardDriveType,
}
racks := ecb.racks()
-rackToShardCount := countShardsByRack(vid, ecNodes)
+rackToShardCount := countShardsByRack(vid, ecNodes, types.HardDriveType)
got, gotErr := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
if err := errorCheck(gotErr, tc.wantErr); err != nil {

weed/shell/command_ec_rebuild.go (2 lines changed)

@@ -295,7 +295,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
// ensure ECNode updates are atomic
erb.ecNodesMu.Lock()
defer erb.ecNodesMu.Unlock()
-rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
+rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, types.HardDriveType)
return nil
}

weed/shell/command_ec_test.go (8 lines changed)

@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestCommandEcBalanceSmall(t *testing.T) {
@@ -14,6 +15,7 @@ func TestCommandEcBalanceSmall(t *testing.T) {
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
},
applyBalancing: false,
+diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@@ -30,6 +32,7 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) {
addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
},
applyBalancing: false,
+diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@@ -48,6 +51,7 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) {
newEcNode("dc1", "rack1", "dn4", 100),
},
applyBalancing: false,
+diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@@ -66,6 +70,7 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) {
newEcNode("dc1", "rack2", "dn4", 100),
},
applyBalancing: false,
+diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@@ -109,6 +114,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
newEcNode("dc1", "rack1", "dn3", 100),
},
applyBalancing: false,
+diskType: types.HardDriveType,
}
ecb.balanceEcVolumes("c1")
@@ -128,5 +134,5 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
}
func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
-return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds)
+return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType)
}

weed/shell/command_volume_server_evacuate.go (2 lines changed)

@@ -204,7 +204,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
} else {
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
}
-err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
+err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, types.HardDriveType)
if err != nil {
return
} else {
