Browse Source

refactor

pull/2380/head
Chris Lu 3 years ago
parent
commit
96119eab00
  1. 9
      weed/server/master_grpc_server_volume.go
  2. 4
      weed/server/master_server_handlers.go
  3. 7
      weed/server/master_server_handlers_admin.go
  4. 6
      weed/topology/volume_layout.go

9
weed/server/master_grpc_server_volume.go

@ -42,8 +42,11 @@ func (ms *MasterServer) ProcessGrowRequest() {
return !found return !found
}) })
option := req.Option
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
// not atomic but it's okay // not atomic but it's okay
if !found && ms.shouldVolumeGrow(req.Option) {
if !found && vl.ShouldGrowVolumes(option) {
filter.Store(req, nil) filter.Store(req, nil)
// we have lock called inside vg // we have lock called inside vg
go func() { go func() {
@ -130,7 +133,9 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
} }
if ms.shouldVolumeGrow(option) {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
if vl.ShouldGrowVolumes(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 { if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("no free volumes left for " + option.String()) return nil, fmt.Errorf("no free volumes left for " + option.String())
} }

4
weed/server/master_server_handlers.go

@ -113,7 +113,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return return
} }
if ms.shouldVolumeGrow(option) {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
if vl.ShouldGrowVolumes(option) {
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr) glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
if ms.Topo.AvailableSpaceFor(option) <= 0 { if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()}) writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})

7
weed/server/master_server_handlers_admin.go

@ -132,13 +132,6 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
} }
} }
func (ms *MasterServer) shouldVolumeGrow(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
active, high := vl.GetActiveVolumeCount(option)
//glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
return active <= high
}
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication") replicationString := r.FormValue("replication")
if replicationString == "" { if replicationString == "" {

6
weed/topology/volume_layout.go

@ -310,6 +310,12 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
return &vid, count, locationList, nil return &vid, count, locationList, nil
} }
func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {
active, crowded := vl.GetActiveVolumeCount(option)
//glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
return active <= crowded
}
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) { func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) {
vl.accessLock.RLock() vl.accessLock.RLock()
defer vl.accessLock.RUnlock() defer vl.accessLock.RUnlock()

Loading…
Cancel
Save