Browse Source

master: disable auto check replicate and add a manual api `/vol/check_replicate`

pull/279/head
tnextday 10 years ago
parent
commit
b39fedd3cc
  1. 2
      go/topology/topology_event_handling.go
  2. 78
      go/topology/topology_replicate.go
  3. 1
      go/weed/weed_server/master_server.go
  4. 5
      go/weed/weed_server/master_server_handlers_admin.go

2
go/topology/topology_event_handling.go

@ -23,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
if t.IsLeader() { if t.IsLeader() {
for range c { for range c {
t.Vacuum(garbageThreshold) t.Vacuum(garbageThreshold)
t.CheckReplicate()
// t.CheckReplicate()
} }
} }
}(garbageThreshold) }(garbageThreshold)

78
go/topology/topology_replicate.go

@ -9,6 +9,10 @@ import (
"github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/storage"
) )
var (
isReplicateCheckerRunning = false
)
const ReplicateTaskTimeout = time.Hour const ReplicateTaskTimeout = time.Hour
type ReplicateTask struct { type ReplicateTask struct {
@ -18,6 +22,41 @@ type ReplicateTask struct {
DstDN *DataNode DstDN *DataNode
} }
func (t *ReplicateTask) Run(topo *Topology) error {
//is lookup thread safe?
locationList := topo.Lookup(t.Collection, t.Vid)
rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection)
if locationList.CalcReplicaPlacement().Compare(rp) >= 0 {
glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String())
return nil
}
if !SetVolumeReadonly(locationList, t.Vid.String(), true) {
return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid)
}
defer SetVolumeReadonly(locationList, t.Vid.String(), false)
tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{
"volume": t.Vid.String(),
"source": t.SrcDN.Url(),
"collection": t.Collection,
})
if e != nil {
return e
}
if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil {
tc.Clean()
return e
}
e = tc.Commit()
return e
}
func (t *ReplicateTask) WorkingDataNodes() []*DataNode {
return []*DataNode{
t.SrcDN,
t.DstDN,
}
}
func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) { func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
for _, col := range t.collectionMap.Items { for _, col := range t.collectionMap.Items {
c := col.(*Collection) c := col.(*Collection)
@ -54,6 +93,10 @@ func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
} }
func (topo *Topology) CheckReplicate() { func (topo *Topology) CheckReplicate() {
isReplicateCheckerRunning = true
defer func() {
isReplicateCheckerRunning = false
}()
glog.V(1).Infoln("Start replicate checker on demand") glog.V(1).Infoln("Start replicate checker on demand")
busyDataNodes := make(map[*DataNode]int) busyDataNodes := make(map[*DataNode]int)
taskCount := 0 taskCount := 0
@ -102,37 +145,10 @@ func (topo *Topology) CheckReplicate() {
glog.V(1).Infoln("finish replicate check.") glog.V(1).Infoln("finish replicate check.")
} }
func (t *ReplicateTask) Run(topo *Topology) error {
//is lookup thread safe?
locationList := topo.Lookup(t.Collection, t.Vid)
rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection)
if locationList.CalcReplicaPlacement().Compare(rp) >= 0 {
glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String())
return nil
}
if !SetVolumeReadonly(locationList, t.Vid.String(), true) {
return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid)
}
defer SetVolumeReadonly(locationList, t.Vid.String(), false)
tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{
"volume": t.Vid.String(),
"source": t.SrcDN.Url(),
"collection": t.Collection,
})
if e != nil {
return e
}
if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil {
tc.Clean()
return e
func (topo *Topology) StartCheckReplicate() {
if isReplicateCheckerRunning {
return
} }
e = tc.Commit()
return e
}
go topo.CheckReplicate()
func (t *ReplicateTask) WorkingDataNodes() []*DataNode {
return []*DataNode{
t.SrcDN,
t.DstDN,
}
} }

1
go/weed/weed_server/master_server.go

@ -73,6 +73,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/vol/check_replicate", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeCheckReplicateHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))

5
go/weed/weed_server/master_server_handlers_admin.go

@ -84,6 +84,11 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
ms.dirStatusHandler(w, r) ms.dirStatusHandler(w, r)
} }
func (ms *MasterServer) volumeCheckReplicateHandler(w http.ResponseWriter, r *http.Request) {
ms.Topo.StartCheckReplicate()
ms.dirStatusHandler(w, r)
}
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0 count := 0
option, err := ms.getVolumeGrowOption(r) option, err := ms.getVolumeGrowOption(r)

Loading…
Cancel
Save