
master: check volume ReplicatePlacement and auto replicate the lost

pull/279/head
tnextday 10 years ago
parent commit 8b32d23304
1. go/topology/collection.go (2 changes)
2. go/topology/topology.go (2 changes)
3. go/topology/topology_event_handling.go (1 change)
4. go/topology/topology_replicate.go (128 changes)
5. go/topology/volume_layout.go (4 changes)
6. go/topology/volume_location_list.go (9 changes)
7. go/weed/weed_server/master_server_handlers.go (6 changes)
8. go/weed/weed_server/master_server_handlers_admin.go (7 changes)
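In short: Collection.Lookup, Topology.Lookup, and VolumeLayout.Lookup now return a *VolumeLocationList instead of a raw []*DataNode slice; the leader's periodic refresh loop gains a CheckReplicate pass alongside Vacuum; and the rewritten topology_replicate.go plans a ReplicateTask for each under-replicated volume and runs the tasks with at most one active task per data node.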

go/topology/collection.go (2 changes)

@@ -35,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout {
 	return vl.(*VolumeLayout)
 }
 
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+func (c *Collection) Lookup(vid storage.VolumeId) *VolumeLocationList {
 	for _, vl := range c.storageType2VolumeLayout.Items {
 		if vl != nil {
 			if list := vl.(*VolumeLayout).Lookup(vid); list != nil {

go/topology/topology.go (2 changes)

@@ -90,7 +90,7 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
 	return nil
 }
 
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid storage.VolumeId) *VolumeLocationList {
 	//maybe an issue if lots of collections?
 	if collection == "" {
 		for _, c := range t.collectionMap.Items {
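Both Lookup methods now return the richer *VolumeLocationList rather than a bare []*DataNode, giving callers the helpers added below in volume_location_list.go. A minimal caller-migration sketch; printReplicas is hypothetical and assumes the topology package context with fmt imported:

// Hypothetical caller, showing the migration from the old slice return.
// Before: machines := t.Lookup(collection, vid); for _, dn := range machines { ... }
func printReplicas(t *Topology, collection string, vid storage.VolumeId) {
	locationList := t.Lookup(collection, vid)
	if locationList == nil {
		return // volume unknown in this collection
	}
	for _, dn := range locationList.AllDataNode() {
		fmt.Println(dn.Url(), dn.PublicUrl)
	}
}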

go/topology/topology_event_handling.go (1 change)

@@ -23,6 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
 		if t.IsLeader() {
 			for range c {
 				t.Vacuum(garbageThreshold)
+				t.CheckReplicate()
 			}
 		}
 	}(garbageThreshold)
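The one added line piggybacks replication repair on the loop that already drives Vacuum, so lost replicas are re-created at the refresh cadence and only on the leader. For context, a sketch of the surrounding goroutine; the ticker interval is assumed for illustration, and the real channel c is created earlier in StartRefreshWritableVolumes:

go func(garbageThreshold string) {
	c := time.Tick(15 * time.Minute) // assumed interval, for illustration only
	if t.IsLeader() {
		for range c {
			t.Vacuum(garbageThreshold)
			t.CheckReplicate() // new: repair under-replicated volumes each tick
		}
	}
}(garbageThreshold)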

go/topology/topology_replicate.go (128 changes)

@@ -1,38 +1,138 @@
 package topology
 
 import (
+	"container/list"
+	"fmt"
+	"time"
+
 	"github.com/chrislusf/seaweedfs/go/glog"
 	"github.com/chrislusf/seaweedfs/go/storage"
 )
 
-func (t *Topology) CheckReplicate() int {
-	glog.V(0).Infoln("Start replicate checker on demand")
+const ReplicateTaskTimeout = time.Hour
+
+type ReplicateTask struct {
+	Vid        storage.VolumeId
+	Collection string
+	SrcDN      *DataNode
+	DstDN      *DataNode
+}
+
+func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
 	for _, col := range t.collectionMap.Items {
 		c := col.(*Collection)
 		glog.V(0).Infoln("checking replicate on collection:", c.Name)
+		growOption := &VolumeGrowOption{ReplicaPlacement: c.rp}
 		for _, vl := range c.storageType2VolumeLayout.Items {
 			if vl != nil {
 				volumeLayout := vl.(*VolumeLayout)
 				copyCount := volumeLayout.rp.GetCopyCount()
 				for vid, locationList := range volumeLayout.vid2location {
 					if locationList.Length() < copyCount {
-						//set volume readonly
-						glog.V(0).Infoln("replicate volume :", vid)
-						SetVolumeReadonly(locationList, vid.String(), true)
+						rp1 := locationList.CalcReplicaPlacement()
+						if rp1.Compare(volumeLayout.rp) >= 0 {
+							continue
+						}
+						if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil {
+							for _, s := range additionServers {
+								s.UpAdjustPlannedVolumeCountDelta(1)
+								rt := &ReplicateTask{
+									Vid:        vid,
+									Collection: c.Name,
+									SrcDN:      locationList.PickForRead(),
+									DstDN:      s,
+								}
+								tasks = append(tasks, rt)
+								glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url())
+							}
+						} else {
+							glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e)
+						}
 					}
 				}
 			}
 		}
 	}
-	return 0
+	return
 }
 
-func (t *Topology) doReplicate(vl *VolumeLayout, vid storage.VolumeId) {
-	locationList := vl.vid2location[vid]
-	if !SetVolumeReadonly(locationList, vid.String(), true) {
-		return
+func (topo *Topology) CheckReplicate() {
+	glog.V(1).Infoln("Start replicate checker on demand")
+	busyDataNodes := make(map[*DataNode]int)
+	taskCount := 0
+	taskQueue := list.New()
+	for _, t := range planReplicateTasks(topo) {
+		taskQueue.PushBack(t)
+		taskCount++
 	}
-	defer SetVolumeReadonly(locationList, vid.String(), false)
+	taskChan := make(chan *ReplicateTask)
+	for taskCount > 0 {
+	TaskQueueLoop:
+		for e := taskQueue.Front(); e != nil; e = e.Next() {
+			task := e.Value.(*ReplicateTask)
+			//only one task will run on the data node
+			dns := task.WorkingDataNodes()
+			for _, dn := range dns {
+				if busyDataNodes[dn] > 0 {
+					continue TaskQueueLoop
+				}
+			}
+			for _, dn := range dns {
+				busyDataNodes[dn]++
+			}
+			go func(t *ReplicateTask) {
+				if e := t.Run(topo); e != nil {
+					glog.V(0).Infof("ReplicateTask run error, vid: %v, dst: %s. %v", t.Vid, t.DstDN.Url(), e)
+				} else {
+					glog.V(2).Infof("ReplicateTask finished, vid: %v, dst: %s", t.Vid, t.DstDN.Url())
+				}
+				taskChan <- t
+			}(task)
+			taskQueue.Remove(e)
+		}
+		finishedTask := <-taskChan
+		for _, dn := range finishedTask.WorkingDataNodes() {
+			if busyDataNodes[dn] > 0 {
+				busyDataNodes[dn]--
+			}
+		}
+		taskCount--
+		finishedTask.DstDN.UpAdjustPlannedVolumeCountDelta(-1)
+	}
+	glog.V(1).Infoln("finish replicate check.")
+}
+
+func (t *ReplicateTask) Run(topo *Topology) error {
+	//is lookup thread safe?
+	locationList := topo.Lookup(t.Collection, t.Vid)
+	rp := topo.CollectionSettings.GetReplicaPlacement(t.Collection)
+	if locationList.CalcReplicaPlacement().Compare(rp) >= 0 {
+		glog.V(0).Infof("volume [%v] has right replica placement, rp: %s", t.Vid, rp.String())
+		return nil
+	}
+	if !SetVolumeReadonly(locationList, t.Vid.String(), true) {
+		return fmt.Errorf("set volume readonly failed, vid=%v", t.Vid)
+	}
+	defer SetVolumeReadonly(locationList, t.Vid.String(), false)
+	tc, e := storage.NewTaskCli(t.DstDN.Url(), storage.TaskReplica, storage.TaskParams{
+		"volume":     t.Vid.String(),
+		"source":     t.SrcDN.Url(),
+		"collection": t.Collection,
+	})
+	if e != nil {
+		return e
+	}
+	if e = tc.WaitAndQueryResult(ReplicateTaskTimeout); e != nil {
+		tc.Clean()
+		return e
+	}
+	e = tc.Commit()
+	return e
+}
+
+func (t *ReplicateTask) WorkingDataNodes() []*DataNode {
+	return []*DataNode{
+		t.SrcDN,
+		t.DstDN,
+	}
+}
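CheckReplicate's scheduler allows at most one running task per data node: a queued task is skipped while its source or destination node is busy, and each completion, reported over taskChan, frees both nodes so blocked tasks can start. A self-contained sketch of the same pattern, with strings standing in for *DataNode and a print standing in for Run:

package main

import (
	"container/list"
	"fmt"
)

type task struct{ src, dst string }

func (t task) nodes() []string { return []string{t.src, t.dst} }

// runAll mirrors CheckReplicate's scheduling: at most one running task per node.
func runAll(tasks []task) {
	busy := make(map[string]int) // node -> count of running tasks touching it
	queue := list.New()
	for _, t := range tasks {
		queue.PushBack(t)
	}
	done := make(chan task)
	for remaining := len(tasks); remaining > 0; remaining-- {
	QueueLoop:
		for e := queue.Front(); e != nil; {
			t := e.Value.(task)
			for _, n := range t.nodes() {
				if busy[n] > 0 { // a node is busy: leave the task queued
					e = e.Next()
					continue QueueLoop
				}
			}
			for _, n := range t.nodes() {
				busy[n]++
			}
			go func(t task) {
				fmt.Println("replicating", t.src, "->", t.dst) // stands in for t.Run(topo)
				done <- t
			}(t)
			next := e.Next() // save first: Remove clears the element's links
			queue.Remove(e)
			e = next
		}
		finished := <-done // wait for one task, then free its nodes
		for _, n := range finished.nodes() {
			busy[n]--
		}
	}
}

func main() {
	runAll([]task{{"a", "b"}, {"b", "c"}, {"d", "e"}})
}

ReplicateTask.Run keeps the volume read-only for the whole copy (the deferred SetVolumeReadonly restores it) and finishes in two phases: Clean discards a failed or timed-out copy, Commit activates a successful one. A minimal sketch of that clean-or-commit tail behind a hypothetical interface, assuming time is imported; storage.NewTaskCli above is the fork's actual client, while taskCli here is invented for illustration:

// Hypothetical interface abstracting the fork's storage task client; the
// method set matches what Run calls on storage.NewTaskCli's return value.
type taskCli interface {
	WaitAndQueryResult(timeout time.Duration) error
	Clean() error  // discard the partial replica
	Commit() error // make the new replica live
}

// finish mirrors Run's tail: wait for the copy, then clean up or commit.
func finish(tc taskCli, timeout time.Duration) error {
	if err := tc.WaitAndQueryResult(timeout); err != nil {
		tc.Clean() // best effort; the copy failed or timed out
		return err
	}
	return tc.Commit()
}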

go/topology/volume_layout.go (4 changes)

@@ -73,9 +73,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
 		!v.ReadOnly
 }
 
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
 	if location := vl.vid2location[vid]; location != nil {
-		return location.list
+		return location
 	}
 	return nil
 }

go/topology/volume_location_list.go (9 changes)

@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/chrislusf/seaweedfs/go/storage"
+	"math/rand"
 )
 
 type VolumeLocationList struct {
@@ -23,6 +24,14 @@ func (dnll *VolumeLocationList) Head() *DataNode {
 	return dnll.list[0]
 }
 
+func (dnll *VolumeLocationList) PickForRead() *DataNode {
+	return dnll.list[rand.Intn(len(dnll.list))]
+}
+
+func (dnll *VolumeLocationList) AllDataNode() []*DataNode {
+	return dnll.list
+}
+
 func (dnll *VolumeLocationList) Length() int {
 	return len(dnll.list)
 }
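The two helpers keep call sites small: AllDataNode hands back the backing slice, so callers should treat it as read-only, and PickForRead returns a uniformly random replica, replacing the ad-hoc rand.Intn indexing removed from the redirect handler below. A brief usage sketch; topo and vid are assumed:

locationList := topo.Lookup("", vid)
if locationList != nil && locationList.Length() > 0 {
	for _, dn := range locationList.AllDataNode() { // read-only view of all replicas
		glog.V(2).Infoln("replica at", dn.Url())
	}
	readTarget := locationList.PickForRead() // random replica spreads read load
	glog.V(2).Infoln("reading from", readTarget.Url())
}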

go/weed/weed_server/master_server_handlers.go (6 changes)

@@ -23,10 +23,10 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
 		}
 		volumeId, err := storage.NewVolumeId(vid)
 		if err == nil {
-			machines := ms.Topo.Lookup(collection, volumeId)
-			if machines != nil {
+			locationList := ms.Topo.Lookup(collection, volumeId)
+			if locationList != nil {
 				var ret operation.Locations
-				for _, dn := range machines {
+				for _, dn := range locationList.AllDataNode() {
 					ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
 				}
 				volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}

go/weed/weed_server/master_server_handlers_admin.go (7 changes)

@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
-	"math/rand"
 	"net/http"
 	"strconv"
 	"strings"
@@ -124,9 +123,9 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
 		debug("parsing error:", err, r.URL.Path)
 		return
 	}
-	machines := ms.Topo.Lookup("", volumeId)
-	if machines != nil && len(machines) > 0 {
-		http.Redirect(w, r, util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
+	locations := ms.Topo.Lookup("", volumeId)
+	if locations != nil && locations.Length() > 0 {
+		http.Redirect(w, r, util.NormalizeUrl(locations.PickForRead().PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
 	} else {
 		writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found", volumeId))
 	}
