diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 8ad8fc67e..65792a08a 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -23,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { if t.IsLeader() { for range c { t.Vacuum(garbageThreshold) - t.CheckReplicate() + // t.CheckReplicate() } } }(garbageThreshold) diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go index 5a0186be9..13400e0d7 100644 --- a/go/topology/topology_replicate.go +++ b/go/topology/topology_replicate.go @@ -9,6 +9,10 @@ import ( "github.com/chrislusf/seaweedfs/go/storage" ) +var ( + isReplicateCheckerRunning = false +) + const ReplicateTaskTimeout = time.Hour type ReplicateTask struct { @@ -18,6 +22,41 @@ type ReplicateTask struct { DstDN *DataNode } +func (t *ReplicateTask) Run(topo *Topology) error { + //is lookup thread safe? + locationList := topo.Lookup(t.Collection, t.Vid) + rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection) + if locationList.CalcReplicaPlacement().Compare(rp) >= 0 { + glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String()) + return nil + } + if !SetVolumeReadonly(locationList, t.Vid.String(), true) { + return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid) + } + defer SetVolumeReadonly(locationList, t.Vid.String(), false) + tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{ + "volume": t.Vid.String(), + "source": t.SrcDN.Url(), + "collection": t.Collection, + }) + if e != nil { + return e + } + if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil { + tc.Clean() + return e + } + e = tc.Commit() + return e +} + +func (t *ReplicateTask) WorkingDataNodes() []*DataNode { + return []*DataNode{ + t.SrcDN, + t.DstDN, + } +} + func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) { for _, col := range t.collectionMap.Items { c := col.(*Collection) @@ -54,6 +93,10 @@ func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) { } func (topo *Topology) CheckReplicate() { + isReplicateCheckerRunning = true + defer func() { + isReplicateCheckerRunning = false + }() glog.V(1).Infoln("Start replicate checker on demand") busyDataNodes := make(map[*DataNode]int) taskCount := 0 @@ -102,37 +145,10 @@ func (topo *Topology) CheckReplicate() { glog.V(1).Infoln("finish replicate check.") } -func (t *ReplicateTask) Run(topo *Topology) error { - //is lookup thread safe? - locationList := topo.Lookup(t.Collection, t.Vid) - rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection) - if locationList.CalcReplicaPlacement().Compare(rp) >= 0 { - glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String()) - return nil - } - if !SetVolumeReadonly(locationList, t.Vid.String(), true) { - return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid) - } - defer SetVolumeReadonly(locationList, t.Vid.String(), false) - tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{ - "volume": t.Vid.String(), - "source": t.SrcDN.Url(), - "collection": t.Collection, - }) - if e != nil { - return e - } - if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil { - tc.Clean() - return e +func (topo *Topology) StartCheckReplicate() { + if isReplicateCheckerRunning { + return } - e = tc.Commit() - return e -} + go topo.CheckReplicate() -func (t *ReplicateTask) WorkingDataNodes() []*DataNode { - return []*DataNode{ - t.SrcDN, - t.DstDN, - } } diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 1adb8820e..62ec5c9aa 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -73,6 +73,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/vol/check_replicate", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeCheckReplicateHandler))) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 4c883ae2a..129734692 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -84,6 +84,11 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque ms.dirStatusHandler(w, r) } +func (ms *MasterServer) volumeCheckReplicateHandler(w http.ResponseWriter, r *http.Request) { + ms.Topo.StartCheckReplicate() + ms.dirStatusHandler(w, r) +} + func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { count := 0 option, err := ms.getVolumeGrowOption(r)