diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go index 2923b52c1..2aede2d50 100644 --- a/weed/server/master_grpc_server_assign.go +++ b/weed/server/master_grpc_server_assign.go @@ -71,17 +71,6 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { - if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for " + option.String()) - } - vl.AddGrowRequest() - ms.vgCh <- &topology.VolumeGrowRequest{ - Option: option, - Count: int(req.WritableVolumeCount), - } - } - var ( lastErr error maxTimeout = time.Second * 10 @@ -89,9 +78,20 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest ) for time.Now().Sub(startTime) < maxTimeout { - fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) + fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl) + if shouldGrow && !vl.HasGrowRequest() { + // if picked volume is almost full, trigger a volume-grow request + if ms.Topo.AvailableSpaceFor(option) <= 0 { + return nil, fmt.Errorf("no free volumes left for " + option.String()) + } + vl.AddGrowRequest() + ms.vgCh <- &topology.VolumeGrowRequest{ + Option: option, + Count: int(req.WritableVolumeCount), + } + } if err != nil { - glog.Warningf("PickForWrite %+v: %v", req, err) + // glog.Warningf("PickForWrite %+v: %v", req, err) lastErr = err time.Sleep(200 * time.Millisecond) continue diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 4fa6406a7..ba18ce649 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -71,6 +71,8 @@ func (ms *MasterServer) ProcessGrowRequest() { } else { glog.V(4).Infoln("discard volume grow request") + time.Sleep(time.Millisecond * 211) + vl.DoneGrowRequest() } } }() diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 2f2fa199d..6ade9402f 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -119,12 +119,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { + fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl) + if shouldGrow && !vl.HasGrowRequest() { + // if picked volume is almost full, trigger a volume-grow request glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr) if ms.Topo.AvailableSpaceFor(option) <= 0 { writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()}) return } + errCh := make(chan error, 1) vl.AddGrowRequest() ms.vgCh <- &topology.VolumeGrowRequest{ @@ -137,10 +140,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) return } } - fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { ms.maybeAddJwtAuthorization(w, fid, true) dn := dnList.Head() + writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) } else { writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 1397a0288..03d7570c1 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -201,16 +201,18 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { return next, nil } -func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option) +func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) { + var vid needle.VolumeId + vid, count, volumeLocationList, shouldGrow, err = volumeLayout.PickForWrite(requestedCount, option) if err != nil { - return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) + return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } - if datanodes.Length() == 0 { - return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) + if volumeLocationList.Length() == 0 { + return "", 0, nil, shouldGrow, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } - fileId := t.Sequence.NextFileId(count) - return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil + nextFileId := t.Sequence.NextFileId(requestedCount) + fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String() + return fileId, count, volumeLocationList, shouldGrow, nil } func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 21f5b7267..1f7511cc7 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -106,7 +106,6 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { growRequestCount int32 - growRequestTime time.Time rp *super_block.ReplicaPlacement ttl *needle.TTL diskType types.DiskType @@ -281,28 +280,41 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vid needle.VolumeId, counter uint64, locationList *VolumeLocationList, shouldGrow bool, err error) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() lenWriters := len(vl.writables) if lenWriters <= 0 { //glog.V(0).Infoln("No more writable volumes!") - return nil, 0, nil, errors.New("No more writable volumes!") + shouldGrow = true + return 0, 0, nil, shouldGrow, errors.New("No more writable volumes!") } if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" { vid := vl.writables[rand.Intn(lenWriters)] - locationList := vl.vid2location[vid] - if locationList != nil { - return &vid, count, locationList, nil + locationList = vl.vid2location[vid] + if locationList != nil && locationList.Length() > 0 { + // check whether picked file is close to full + dn := locationList.Head() + info, _ := dn.GetVolumesById(vid) + if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() { + shouldGrow = true + } + return vid, count, locationList, shouldGrow, nil } - return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") + return 0, 0, nil, shouldGrow, errors.New("Strangely vid " + vid.String() + " is on no machine!") } - var vid needle.VolumeId - var locationList *VolumeLocationList - counter := 0 - for _, v := range vl.writables { - volumeLocationList := vl.vid2location[v] + + // clone vl.writables + writables := make([]needle.VolumeId, len(vl.writables)) + copy(writables, vl.writables) + // randomize the writables + rand.Shuffle(len(writables), func(i, j int) { + writables[i], writables[j] = writables[j], writables[i] + }) + + for _, writableVolumeId := range writables { + volumeLocationList := vl.vid2location[writableVolumeId] for _, dn := range volumeLocationList.list { if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) { continue @@ -313,29 +325,26 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { continue } - counter++ - if rand.Intn(counter) < 1 { - vid, locationList = v, volumeLocationList.Copy() + vid, locationList = writableVolumeId, volumeLocationList.Copy() + // check whether picked file is close to full + info, _ := dn.GetVolumesById(writableVolumeId) + if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() { + shouldGrow = true } + return } } - return &vid, count, locationList, nil + return vid, count, locationList, shouldGrow, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode) } func (vl *VolumeLayout) HasGrowRequest() bool { - if atomic.LoadInt32(&vl.growRequestCount) > 0 && - vl.growRequestTime.Add(time.Minute).After(time.Now()) { - return true - } - return false + return atomic.LoadInt32(&vl.growRequestCount) > 0 } func (vl *VolumeLayout) AddGrowRequest() { - vl.growRequestTime = time.Now() atomic.AddInt32(&vl.growRequestCount, 1) } func (vl *VolumeLayout) DoneGrowRequest() { - vl.growRequestTime = time.Unix(0, 0) - atomic.StoreInt32(&vl.growRequestCount, 0) + atomic.AddInt32(&vl.growRequestCount, -1) } func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {