diff --git a/go/topology/collection.go b/go/topology/collection.go
index 3d7bb7e0e..f8217a7ff 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -35,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout {
 	return vl.(*VolumeLayout)
 }
 
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+func (c *Collection) Lookup(vid storage.VolumeId) *VolumeLocationList {
 	for _, vl := range c.storageType2VolumeLayout.Items {
 		if vl != nil {
 			if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 6cdd1e1fa..410a1c70e 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -90,7 +90,7 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
 	return nil
 }
 
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid storage.VolumeId) *VolumeLocationList {
 	//maybe an issue if lots of collections?
 	if collection == "" {
 		for _, c := range t.collectionMap.Items {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 2bb2a9d66..8ad8fc67e 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -23,6 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
 		if t.IsLeader() {
 			for range c {
 				t.Vacuum(garbageThreshold)
+				t.CheckReplicate()
 			}
 		}
 	}(garbageThreshold)
diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go
index 000d6ef4f..5a0186be9 100644
--- a/go/topology/topology_replicate.go
+++ b/go/topology/topology_replicate.go
@@ -1,38 +1,155 @@
 package topology
 
 import (
+	"container/list"
+	"fmt"
+	"time"
+
 	"github.com/chrislusf/seaweedfs/go/glog"
 	"github.com/chrislusf/seaweedfs/go/storage"
 )
 
-func (t *Topology) CheckReplicate() int {
-	glog.V(0).Infoln("Start replicate checker on demand")
+// ReplicateTaskTimeout is the timeout Run passes to WaitAndQueryResult.
+const ReplicateTaskTimeout = time.Hour
+
+// ReplicateTask describes one planned copy of volume Vid from SrcDN to DstDN.
+type ReplicateTask struct {
+	Vid        storage.VolumeId
+	Collection string
+	SrcDN      *DataNode
+	DstDN      *DataNode
+}
+
+// planReplicateTasks scans every volume of every collection in t and returns
+// one task per server found by FindEmptySlotsForOneVolume for each volume whose
+// calculated replica placement compares below its layout's placement.
+func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
 	for _, col := range t.collectionMap.Items {
 		c := col.(*Collection)
 		glog.V(0).Infoln("checking replicate on collection:", c.Name)
+		growOption := &VolumeGrowOption{ReplicaPlacement: c.rp}
 		for _, vl := range c.storageType2VolumeLayout.Items {
 			if vl != nil {
 				volumeLayout := vl.(*VolumeLayout)
-				copyCount := volumeLayout.rp.GetCopyCount()
 				for vid, locationList := range volumeLayout.vid2location {
-					if locationList.Length() < copyCount {
-						//set volume readonly
-						glog.V(0).Infoln("replicate volume :", vid)
-						SetVolumeReadonly(locationList, vid.String(), true)
-
+					rp1 := locationList.CalcReplicaPlacement()
+					if rp1.Compare(volumeLayout.rp) >= 0 {
+						continue
+					}
+					if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil {
+						for _, s := range additionServers {
+							s.UpAdjustPlannedVolumeCountDelta(1)
+							rt := &ReplicateTask{
+								Vid:        vid,
+								Collection: c.Name,
+								SrcDN:      locationList.PickForRead(),
+								DstDN:      s,
+							}
+							tasks = append(tasks, rt)
+							glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url())
+						}
+					} else {
+						glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e)
 					}
 				}
 			}
 		}
 	}
-	return 0
+	return
 }
 
-func (t *Topology) doReplicate(vl *VolumeLayout, vid storage.VolumeId) {
-	locationList := vl.vid2location[vid]
-	if !SetVolumeReadonly(locationList, vid.String(), true) {
-		return
+// CheckReplicate plans replicate tasks for topo and runs each in its own
+// goroutine, starting a task only while none of its working data nodes is busy
+// with another running task. It returns once every planned task has finished.
+func (topo *Topology) CheckReplicate() {
+	glog.V(1).Infoln("Start replicate checker on demand")
+	busyDataNodes := make(map[*DataNode]int)
+	taskCount := 0
+	taskQueue := list.New()
+	for _, t := range planReplicateTasks(topo) {
+		taskQueue.PushBack(t)
+		taskCount++
 	}
-	defer SetVolumeReadonly(locationList, vid.String(), false)
 
+	taskChan := make(chan *ReplicateTask)
+	for taskCount > 0 {
+		// Removing e makes e.Next() return nil, so the successor is saved
+		// before the element can be removed.
+		var next *list.Element
+	TaskQueueLoop:
+		for e := taskQueue.Front(); e != nil; e = next {
+			next = e.Next()
+			task := e.Value.(*ReplicateTask)
+			//only one task will run on the data node
+			dns := task.WorkingDataNodes()
+			for _, dn := range dns {
+				if busyDataNodes[dn] > 0 {
+					continue TaskQueueLoop
+				}
+			}
+			for _, dn := range dns {
+				busyDataNodes[dn]++
+			}
+			go func(t *ReplicateTask) {
+				if e := t.Run(topo); e != nil {
+					glog.V(0).Infof("ReplicateTask run error, vid: %v, dst: %s. %v", t.Vid, t.DstDN.Url(), e)
+				} else {
+					glog.V(2).Infof("ReplicateTask finished, vid: %v, dst: %s", t.Vid, t.DstDN.Url())
+				}
+				taskChan <- t
+			}(task)
+			taskQueue.Remove(e)
+		}
+		finishedTask := <-taskChan
+		for _, dn := range finishedTask.WorkingDataNodes() {
+			if busyDataNodes[dn] > 0 {
+				busyDataNodes[dn]--
+			}
+		}
+		taskCount--
+		finishedTask.DstDN.UpAdjustPlannedVolumeCountDelta(-1)
+	}
+	glog.V(1).Infoln("finish replicate check.")
+}
+
+// Run looks up the volume's current locations and, unless they already satisfy
+// the collection's replica placement, calls SetVolumeReadonly(..., true), starts
+// a TaskReplica task on DstDN with SrcDN as source, and commits it. A deferred
+// SetVolumeReadonly(..., false) runs when Run returns.
+func (t *ReplicateTask) Run(topo *Topology) error {
+	//is lookup thread safe?
+	locationList := topo.Lookup(t.Collection, t.Vid)
+	if locationList == nil {
+		return fmt.Errorf("volume not found, vid=%v", t.Vid)
+	}
+	rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection)
+	if locationList.CalcReplicaPlacement().Compare(rp) >= 0 {
+		glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String())
+		return nil
+	}
+	if !SetVolumeReadonly(locationList, t.Vid.String(), true) {
+		return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid)
+	}
+	defer SetVolumeReadonly(locationList, t.Vid.String(), false)
+	tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{
+		"volume":     t.Vid.String(),
+		"source":     t.SrcDN.Url(),
+		"collection": t.Collection,
+	})
+	if e != nil {
+		return e
+	}
+	if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil {
+		tc.Clean()
+		return e
+	}
+	return tc.Commit()
+}
+
+// WorkingDataNodes returns the data nodes the task occupies: SrcDN and DstDN.
+func (t *ReplicateTask) WorkingDataNodes() []*DataNode {
+	return []*DataNode{
+		t.SrcDN,
+		t.DstDN,
+	}
 }
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 8a922f945..2a38fda99 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -73,9 +73,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
 		!v.ReadOnly
 }
 
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
 	if location := vl.vid2location[vid]; location != nil {
-		return location.list
+		return location
 	}
 	return nil
 }
diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go
index 9c7f60881..929d0c8a3 100644
--- a/go/topology/volume_location_list.go
+++ b/go/topology/volume_location_list.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/chrislusf/seaweedfs/go/storage"
+	"math/rand"
 )
 
 type VolumeLocationList struct {
@@ -23,6 +24,16 @@ func (dnll *VolumeLocationList) Head() *DataNode {
 	return dnll.list[0]
 }
 
+// PickForRead returns a randomly chosen data node; it panics on an empty list.
+func (dnll *VolumeLocationList) PickForRead() *DataNode {
+	return dnll.list[rand.Intn(len(dnll.list))]
+}
+
+// AllDataNode returns the list's underlying slice of data nodes, not a copy.
+func (dnll *VolumeLocationList) AllDataNode() []*DataNode {
+	return dnll.list
+}
+
 func (dnll *VolumeLocationList) Length() int {
 	return len(dnll.list)
 }
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index 6a5b06c3c..a61dd765d 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -23,10 +23,10 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
 		}
 		volumeId, err := storage.NewVolumeId(vid)
 		if err == nil {
-			machines := ms.Topo.Lookup(collection, volumeId)
-			if machines != nil {
+			locationList := ms.Topo.Lookup(collection, volumeId)
+			if locationList != nil {
 				var ret operation.Locations
-				for _, dn := range machines {
+				for _, dn := range locationList.AllDataNode() {
 					ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
 				}
 				volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index 4b7f809ec..4c883ae2a 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
-	"math/rand"
 	"net/http"
 	"strconv"
 	"strings"
@@ -124,9 +123,9 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
 		debug("parsing error:", err, r.URL.Path)
 		return
 	}
-	machines := ms.Topo.Lookup("", volumeId)
-	if machines != nil && len(machines) > 0 {
-		http.Redirect(w, r, util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
+	locations := ms.Topo.Lookup("", volumeId)
+	if locations != nil && locations.Length() > 0 {
+		http.Redirect(w, r, util.NormalizeUrl(locations.PickForRead().PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
 	} else {
 		writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found", volumeId))
 	}