Commit 72fb6f0156 by Konstantin Lebedev, committed by GitHub, 23 hours ago
6 changed files:
  1. weed/shell/command_volume_balance.go (182 changed lines)
  2. weed/shell/command_volume_balance_test.go (7 changed lines)
  3. weed/shell/command_volume_fix_replication.go (11 changed lines)
  4. weed/shell/command_volume_server_evacuate.go (7 changed lines)
  5. weed/shell/command_volume_tier_move.go (9 changed lines)
  6. weed/shell/commands.go (1 changed line)

weed/shell/command_volume_balance.go (182 changed lines)

@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@ -25,11 +26,14 @@ func init() {
Commands = append(Commands, &commandVolumeBalance{})
}
const thresholdVolumeSize = 1.01
type commandVolumeBalance struct {
volumeSizeLimitMb uint64
commandEnv *CommandEnv
writable bool
capacityByFunc CapacityByFunc
volumeByActive *bool
applyBalancing bool
volumeSizeLimitMb uint64
}
func (c *commandVolumeBalance) Name() string {
@ -84,22 +88,56 @@ func (c *commandVolumeBalance) HasTag(CommandTag) bool {
}
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
allowedCapacityBy := map[string]CapacityByFunc{
"MAX_VOLUME_COUNT": capacityByMaxVolumeCount,
"FREE_VOLUME_COUNT": capacityByFreeVolumeCount,
"MIN_VOLUME_DENSITY": capacityByMinVolumeDensity,
}
allowedVolumeBy := map[string]*bool{
"ALL": nil,
"ACTIVE": new(bool),
"FULL": new(bool),
}
*allowedVolumeBy["ACTIVE"] = true
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
verbose := balanceCommand.Bool("v", false, "verbose mode")
collection := balanceCommand.String("collection", "ALL_COLLECTIONS", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
racks := balanceCommand.String("racks", "", "only apply the balancing for this racks")
nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes")
writable := balanceCommand.Bool("writable", false, "only apply the balancing for writable volumes")
noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan.")
// TODO: remove this alias
applyBalancingAlias := balanceCommand.Bool("force", false, "apply the balancing plan (alias for -apply)")
balanceCommand.Func("volumeBy", "only apply the balancing for ALL volumes and ACTIVE or FULL", func(flagValue string) error {
if flagValue == "" {
return nil
}
for allowed, volumeBy := range allowedVolumeBy {
if flagValue == allowed {
c.volumeByActive = volumeBy
return nil
}
}
return fmt.Errorf("use \"ALL\", \"ACTIVE\" or \"FULL\"")
})
c.capacityByFunc = capacityByMaxVolumeCount
balanceCommand.Func("capacityBy", "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"", func(flagValue string) error {
if flagValue == "" {
return nil
}
for allowed, allowedCapacityByFunc := range allowedCapacityBy {
if flagValue == allowed {
c.capacityByFunc = allowedCapacityByFunc
return nil
}
}
return fmt.Errorf("use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" or \"MIN_VOLUME_DENSITY\"")
})
if err = balanceCommand.Parse(args); err != nil {
return nil
}
handleDeprecatedForceFlag(writer, balanceCommand, applyBalancingAlias, applyBalancing)
c.writable = *writable
c.applyBalancing = *applyBalancing
infoAboutSimulationMode(writer, c.applyBalancing, "-apply")
@ -111,6 +149,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return
}
}
commandEnv.verbose = *verbose
c.commandEnv = commandEnv
// collect topology information
@ -182,13 +221,10 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis
if v.DiskType != string(diskType) {
return false
}
if c.writable && v.Size > c.volumeSizeLimitMb {
return false
}
return true
return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb)
})
}
if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing); err != nil {
if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.capacityByFunc); err != nil {
return err
}
@ -245,42 +281,92 @@ type Node struct {
rack string
}
type CapacityFunc func(*master_pb.DataNodeInfo) float64
type CapacityFunc func(*master_pb.DataNodeInfo) (float64, uint64, bool)
type CapacityByFunc func(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc
func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc {
return func(info *master_pb.DataNodeInfo) float64 {
func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc {
return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) {
diskInfo, found := info.DiskInfos[string(diskType)]
if !found {
return 0
return 0, 0, false
}
var volumeSizes uint64
for _, volumeInfo := range diskInfo.VolumeInfos {
volumeSizes += volumeInfo.Size
}
var ecShardCount int
for _, ecShardInfo := range diskInfo.EcShardInfos {
ecShardCount += erasure_coding.GetShardCount(ecShardInfo)
}
return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount
if volumeSizeLimitMb == 0 {
volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false
}
}
func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc {
return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) {
diskInfo, found := info.DiskInfos[string(diskType)]
if !found {
return 0, 0, true
}
var volumeSizes uint64
for _, volumeInfo := range diskInfo.VolumeInfos {
volumeSizes += volumeInfo.Size
}
if volumeSizeLimitMb == 0 {
volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount, true
}
}
func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc {
return func(info *master_pb.DataNodeInfo) float64 {
func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc {
return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) {
diskInfo, found := info.DiskInfos[string(diskType)]
if !found {
return 0
return 0, 0, false
}
var volumeSizes uint64
for _, volumeInfo := range diskInfo.VolumeInfos {
volumeSizes += volumeInfo.Size
}
var ecShardCount int
for _, ecShardInfo := range diskInfo.EcShardInfos {
ecShardCount += erasure_coding.GetShardCount(ecShardInfo)
}
return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount
if volumeSizeLimitMb == 0 {
volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false
}
}
func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
return float64(len(n.selectedVolumes)) / capacityFunc(n.info)
capacity, used, isDensityBased := capacityFunc(n.info)
if capacity == 0 {
return 0
}
if isDensityBased {
return float64(used) / capacity
}
return float64(len(n.selectedVolumes)) / capacity
}
func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 {
return float64(len(n.selectedVolumes)+1) / capacityFunc(n.info)
capacity, used, isDensityBased := capacityFunc(n.info)
if capacity == 0 {
return 0
}
if isDensityBased {
return float64(used+1) / capacity
}
return float64(len(n.selectedVolumes)+1) / capacity
}
func (n *Node) isOneVolumeOnly() bool {
@ -312,24 +398,42 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
})
}
func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
selectedVolumeCount, volumeMaxCount := 0, float64(0)
func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLimitMb uint64) bool {
if volumeByActive == nil {
return true
}
if uint64(float64(volumeSize)*thresholdVolumeSize) < volumeSizeLimitMb*util.MiByte {
return *volumeByActive
} else {
return !(*volumeByActive)
}
}
func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, capacityByFunc CapacityByFunc) (err error) {
ratioVolumeCount, volumeCapacities, idealVolumeRatio := uint64(0), float64(0), float64(0)
var nodesWithCapacity []*Node
capacityFunc := capacityByMaxVolumeCount(diskType)
capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb)
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
capacity := capacityFunc(dn.info)
capacity, volumeUsed, isDensityBased := capacityFunc(dn.info)
if capacity > 0 {
nodesWithCapacity = append(nodesWithCapacity, dn)
}
volumeMaxCount += capacity
if isDensityBased {
ratioVolumeCount += volumeUsed
} else {
ratioVolumeCount += uint64(len(dn.selectedVolumes))
}
volumeCapacities += capacity
}
if volumeCapacities > 0 {
idealVolumeRatio = float64(ratioVolumeCount) / volumeCapacities
}
idealVolumeRatio := float64(selectedVolumeCount) / volumeMaxCount
hasMoved := true
// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
if commandEnv != nil && commandEnv.verbose {
fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, max:%d, idealVolumeRatio %f\n", len(nodesWithCapacity), ratioVolumeCount, int64(volumeCapacities), idealVolumeRatio)
}
for hasMoved {
hasMoved = false
@ -337,7 +441,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
return cmp.Compare(a.localVolumeRatio(capacityFunc), b.localVolumeRatio(capacityFunc))
})
if len(nodesWithCapacity) == 0 {
fmt.Printf("no volume server found with capacity for %s", diskType.ReadableString())
if commandEnv != nil && commandEnv.verbose {
fmt.Fprintf(os.Stdout, "no volume server found with capacity for %s", diskType.ReadableString())
}
return nil
}
@ -345,6 +451,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
var fullNodeIndex int
for fullNodeIndex = len(nodesWithCapacity) - 1; fullNodeIndex >= 0; fullNodeIndex-- {
fullNode = nodesWithCapacity[fullNodeIndex]
if len(fullNode.selectedVolumes) == 0 {
continue
}
if !fullNode.isOneVolumeOnly() {
break
}
@ -356,12 +465,19 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
sortCandidatesFn(candidateVolumes)
for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] {
if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
// no more volume servers with empty slots
if commandEnv != nil && commandEnv.verbose {
fmt.Printf("no more volume servers with empty slots %s, idealVolumeRatio %f\n", emptyNode.info.Id, idealVolumeRatio)
}
break
}
fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc))
if commandEnv != nil && commandEnv.verbose {
fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc))
}
hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
if err != nil {
if commandEnv != nil && commandEnv.verbose {
fmt.Fprintf(os.Stdout, "attempt to move one volume error %+v\n", err)
}
return
}
if hasMoved {
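Note on usage (illustrative, not part of the patch): the balance command now takes two extra selectors. Assuming the command name reported by Name() is volume.balance, a run could look like volume.balance -capacityBy=MIN_VOLUME_DENSITY -volumeBy=ACTIVE -apply. The ACTIVE/FULL split uses thresholdVolumeSize = 1.01: with a 1000 MiB volume size limit, a volume holding 980 MiB is ACTIVE (980 * 1.01 = 989.8 MiB, still under the limit) while one holding 995 MiB is FULL (995 * 1.01 = 1004.95 MiB, over the limit). A minimal sketch of how the two capacityBy modes score a node, mirroring localVolumeRatio above (the helper name is hypothetical, EC shards ignored for brevity):

// exampleRatio mirrors localVolumeRatio: the density-based mode divides used
// volume-equivalents by the remaining capacity, while the count-based modes
// divide the number of selected volumes by the node's capacity.
func exampleRatio(selectedVolumes int, maxVolumes, usedVolumeCount uint64, densityBased bool) float64 {
	if densityBased { // MIN_VOLUME_DENSITY: e.g. 70 used of 100 max -> 70 / (100-70) ≈ 2.33
		capacity := float64(maxVolumes - usedVolumeCount)
		if capacity == 0 {
			return 0
		}
		return float64(usedVolumeCount) / capacity
	}
	// MAX_VOLUME_COUNT: e.g. 40 selected volumes of 100 max -> 0.40
	return float64(selectedVolumes) / float64(maxVolumes)
}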

weed/shell/command_volume_balance_test.go (7 changed lines)

@ -255,11 +255,14 @@ func TestBalance(t *testing.T) {
volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "")
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
diskTypes := collectVolumeDiskTypes(topologyInfo)
c := &commandVolumeBalance{}
c := &commandVolumeBalance{capacityByFunc: capacityByMaxVolumeCount}
if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil {
t.Errorf("balance: %v", err)
}
c = &commandVolumeBalance{capacityByFunc: capacityByMinVolumeDensity, volumeByActive: new(bool)}
if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil {
t.Errorf("balance: %v", err)
}
}
func TestVolumeSelection(t *testing.T) {

weed/shell/command_volume_fix_replication.go (11 changed lines)

@ -357,10 +357,11 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
foundNewLocation := false
hasSkippedCollection := false
keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType), 0)
for _, dst := range allLocations {
freeVolumeCount, _, _ := fn(dst.dataNode)
// check whether data nodes satisfy the constraints
if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
if freeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// check collection name pattern
if *c.collectionPattern != "" {
var matched bool
@ -439,9 +440,11 @@ func addVolumeCount(info *master_pb.DiskInfo, count int) {
}
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
fn := capacityByFreeVolumeCount(diskType)
fn := capacityByFreeVolumeCount(diskType, 0)
slices.SortFunc(dataNodes, func(a, b location) int {
return int(fn(b.dataNode) - fn(a.dataNode))
freeVolumeCountA, _, _ := fn(a.dataNode)
freeVolumeCountB, _, _ := fn(b.dataNode)
return int(freeVolumeCountB - freeVolumeCountA)
})
}
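Side note (a recap of the pattern above, not new behavior): callers that only need a free-slot count now destructure the three-value CapacityFunc result and pass 0 for volumeSizeLimitMb, which the capacity functions treat as "use the util.VolumeSizeLimitGB-based default":

fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType), 0) // 0 -> default volume size limit
freeVolumeCount, _, _ := fn(dst.dataNode)                                   // only the capacity figure is needed here
if freeVolumeCount > 0 {
	// dst still has room for another replica
}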

weed/shell/command_volume_server_evacuate.go (7 changed lines)

@ -261,8 +261,8 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
}
func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
freeVolumeCountfn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
maxVolumeCountFn := capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType))
freeVolumeCountfn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType), 0)
maxVolumeCountFn := capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType), 0)
for _, n := range otherNodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
return v.DiskType == vol.DiskType
@ -274,7 +274,8 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][
})
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
if freeVolumeCountfn(emptyNode.info) <= 0 {
freeVolumeCount, _, _ := freeVolumeCountfn(emptyNode.info)
if freeVolumeCount <= 0 {
continue
}
hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)

weed/shell/command_volume_tier_move.go (9 changed lines)

@ -151,7 +151,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
for _, vid := range volumeIds {
collection := volumeIdToCollection[vid]
if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations); err != nil {
if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations, volumeSizeLimitMb); err != nil {
fmt.Printf("tier move volume %d: %v\n", vid, err)
}
allLocations = rotateDataNodes(allLocations)
@ -200,7 +200,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
return false
}
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location) (err error) {
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location, volumeSizeLimitMb uint64) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
if !found {
@ -209,9 +209,10 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
// find one server with the most empty volume slots with target disk type
hasFoundTarget := false
fn := capacityByFreeVolumeCount(toDiskType)
fn := capacityByFreeVolumeCount(toDiskType, volumeSizeLimitMb)
for _, dst := range allLocations {
if fn(dst.dataNode) > 0 && !hasFoundTarget {
freeVolumeCount, _, _ := fn(dst.dataNode)
if freeVolumeCount > 0 && !hasFoundTarget {
// ask the volume server to replicate the volume
if isOneOf(dst.dataNode.Id, locations) {
continue

weed/shell/commands.go (1 changed line)

@ -38,6 +38,7 @@ type CommandEnv struct {
option *ShellOptions
locker *exclusive_locks.ExclusiveLocker
noLock bool
verbose bool
}
func NewCommandEnv(options *ShellOptions) *CommandEnv {
