Browse Source

decrease complex: writables slice to map

https://github.com/seaweedfs/seaweedfs/issues/5135
pull/5163/head
Konstantin Lebedev 1 year ago
parent
commit
3c952f87c7
  1. 72
      weed/topology/volume_layout.go

72
weed/topology/volume_layout.go

@ -3,7 +3,6 @@ package topology
import (
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
@ -110,7 +109,7 @@ type VolumeLayout struct {
ttl *needle.TTL
diskType types.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id
writables map[needle.VolumeId]struct{} // transient array of writable volume id
crowded map[needle.VolumeId]struct{}
readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes *volumesBinaryState // oversized volumes
@ -132,7 +131,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
ttl: ttl,
diskType: diskType,
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId),
writables: make(map[needle.VolumeId]struct{}),
crowded: make(map[needle.VolumeId]struct{}),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
@ -291,30 +290,26 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
return 0, 0, nil, shouldGrow, errors.New("No more writable volumes!")
}
if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
vid := vl.writables[rand.Intn(lenWriters)]
locationList = vl.vid2location[vid]
var randVid needle.VolumeId
for writableVid := range vl.writables {
randVid = writableVid
break
}
locationList = vl.vid2location[randVid]
if locationList != nil && locationList.Length() > 0 {
// check whether picked file is close to full
dn := locationList.Head()
info, _ := dn.GetVolumesById(vid)
info, _ := dn.GetVolumesById(randVid)
if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
shouldGrow = true
}
return vid, count, locationList, shouldGrow, nil
return randVid, count, locationList, shouldGrow, nil
}
return 0, 0, nil, shouldGrow, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
// clone vl.writables
writables := make([]needle.VolumeId, len(vl.writables))
copy(writables, vl.writables)
// randomize the writables
rand.Shuffle(len(writables), func(i, j int) {
writables[i], writables[j] = writables[j], writables[i]
})
for _, writableVolumeId := range writables {
volumeLocationList := vl.vid2location[writableVolumeId]
for randWritableVolumeId := range vl.writables {
volumeLocationList := vl.vid2location[randWritableVolumeId]
for _, dn := range volumeLocationList.list {
if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) {
continue
@ -325,9 +320,9 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
continue
}
vid, locationList = writableVolumeId, volumeLocationList.Copy()
vid, locationList = randWritableVolumeId, volumeLocationList.Copy()
// check whether picked file is close to full
info, _ := dn.GetVolumesById(writableVolumeId)
info, _ := dn.GetVolumesById(randWritableVolumeId)
if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
shouldGrow = true
}
@ -360,7 +355,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active,
if option.DataCenter == "" {
return len(vl.writables), len(vl.crowded)
}
for _, v := range vl.writables {
for v := range vl.writables {
for _, dn := range vl.vid2location[v].list {
if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
@ -381,30 +376,21 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active,
}
func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {
if id == vid {
toDeleteIndex = k
break
}
}
if toDeleteIndex >= 0 {
if _, ok := vl.writables[vid]; !ok {
glog.V(0).Infoln("Volume", vid, "becomes unwritable")
vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
vl.removeFromCrowded(vid)
delete(vl.writables, vid)
return true
}
return false
}
func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
return false
}
if _, ok := vl.writables[vid]; ok {
glog.V(0).Infoln("Volume", vid, "becomes writable")
vl.crowded[vid] = struct{}{}
return true
}
glog.V(0).Infoln("Volume", vid, "becomes writable")
vl.writables = append(vl.writables, vid)
return true
return false
}
func (vl *VolumeLayout) SetVolumeReadOnly(dn *DataNode, vid needle.VolumeId) bool {
@ -504,11 +490,8 @@ func (vl *VolumeLayout) SetVolumeCrowded(vid needle.VolumeId) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
for _, v := range vl.writables {
if v == vid {
vl.setVolumeCrowded(vid)
break
}
if _, ok := vl.writables[vid]; ok {
vl.setVolumeCrowded(vid)
}
}
@ -523,9 +506,10 @@ type VolumeLayoutInfo struct {
func (vl *VolumeLayout) ToInfo() (info VolumeLayoutInfo) {
info.Replication = vl.rp.String()
info.TTL = vl.ttl.String()
info.Writables = vl.writables
for v := range vl.writables {
info.Writables = append(info.Writables, v)
}
info.DiskType = vl.diskType.ReadableString()
//m["locations"] = vl.vid2location
return
}

Loading…
Cancel
Save