Browse Source

master: avoid creating too many volumes

fix https://github.com/chrislusf/seaweedfs/issues/2062
pull/2074/head
Chris Lu 4 years ago
parent
commit
d2d36a3f9d
  1. 4
      weed/topology/node.go
  2. 8
      weed/topology/topology.go
  3. 4
      weed/topology/topology_event_handling.go
  4. 6
      weed/topology/volume_layout.go

4
weed/topology/node.go

@ -242,9 +242,9 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
for _, v := range dn.GetVolumes() { for _, v := range dn.GetVolumes() {
if v.Size >= volumeSizeLimit { if v.Size >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- &v
n.GetTopology().chanFullVolumes <- v
}else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold { }else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold {
n.GetTopology().chanCrowdedVolumes <- &v
n.GetTopology().chanCrowdedVolumes <- v
} }
} }
} }

8
weed/topology/topology.go

@ -34,8 +34,8 @@ type Topology struct {
Sequence sequence.Sequencer Sequence sequence.Sequencer
chanFullVolumes chan *storage.VolumeInfo
chanCrowdedVolumes chan *storage.VolumeInfo
chanFullVolumes chan storage.VolumeInfo
chanCrowdedVolumes chan storage.VolumeInfo
Configuration *Configuration Configuration *Configuration
@ -57,8 +57,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.Sequence = seq t.Sequence = seq
t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.chanCrowdedVolumes = make(chan *storage.VolumeInfo)
t.chanFullVolumes = make(chan storage.VolumeInfo)
t.chanCrowdedVolumes = make(chan storage.VolumeInfo)
t.Configuration = &Configuration{} t.Configuration = &Configuration{}

4
weed/topology/topology_event_handling.go

@ -39,7 +39,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
} }
}() }()
} }
func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
diskType := types.ToDiskType(volumeInfo.DiskType) diskType := types.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType) vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) { if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
@ -68,7 +68,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
return true return true
} }
func (t *Topology) SetVolumeCrowded(volumeInfo *storage.VolumeInfo) {
func (t *Topology) SetVolumeCrowded(volumeInfo storage.VolumeInfo) {
diskType := types.ToDiskType(volumeInfo.DiskType) diskType := types.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType) vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
vl.SetVolumeCrowded(volumeInfo.Id) vl.SetVolumeCrowded(volumeInfo.Id)

6
weed/topology/volume_layout.go

@ -108,7 +108,7 @@ type VolumeLayout struct {
diskType types.DiskType diskType types.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id writables []needle.VolumeId // transient array of writable volume id
crowded map[needle.VolumeId]interface{}
crowded map[needle.VolumeId]struct{}
readonlyVolumes *volumesBinaryState // readonly volumes readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes *volumesBinaryState // oversized volumes oversizedVolumes *volumesBinaryState // oversized volumes
volumeSizeLimit uint64 volumeSizeLimit uint64
@ -129,7 +129,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
diskType: diskType, diskType: diskType,
vid2location: make(map[needle.VolumeId]*VolumeLocationList), vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId), writables: *new([]needle.VolumeId),
crowded: make(map[needle.VolumeId]interface{}),
crowded: make(map[needle.VolumeId]struct{}),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()), oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
volumeSizeLimit: volumeSizeLimit, volumeSizeLimit: volumeSizeLimit,
@ -421,7 +421,7 @@ func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {
func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) { func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) {
if _, ok := vl.crowded[vid]; !ok { if _, ok := vl.crowded[vid]; !ok {
vl.crowded[vid] = nil
vl.crowded[vid] = struct{}{}
glog.V(0).Infoln("Volume", vid, "becomes crowded") glog.V(0).Infoln("Volume", vid, "becomes crowded")
} }
} }

Loading…
Cancel
Save