Browse Source

master server can reset the replicate replacement

pull/279/head
tnextday 10 years ago
parent
commit
3e304e51e9
  1. 14
      go/storage/volume.go
  2. 1
      go/storage/volume_vacuum.go
  3. 15
      go/topology/topology.go
  4. 1
      go/weed/weed_server/master_server.go
  5. 43
      go/weed/weed_server/master_server_handlers_admin.go
  6. 5
      go/weed/weed_server/volume_server_handlers_replicate.go

14
go/storage/volume.go

@ -437,3 +437,17 @@ func (v *Volume) SetReplica(replica *ReplicaPlacement) error {
v.ReplicaPlacement = replica
return v.writeSuperBlock()
}
func (v *Volume) SetReadOnly(isReadOnly bool) error {
if isReadOnly == false {
if fi, e := v.dataFile.Stat(); e != nil {
return e
} else {
if fi.Mode()&0200 == 0 {
return errors.New(v.FileName() + ".dat is READONLY")
}
}
}
v.readOnly = isReadOnly
return nil
}

1
go/storage/volume_vacuum.go

@ -30,6 +30,7 @@ func (v *Volume) commitCompact() error {
glog.V(3).Infof("Got Committing lock...")
_ = v.dataFile.Close()
var e error
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
return e
}

15
go/topology/topology.go

@ -187,3 +187,18 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
t.LinkChildNode(dc)
return dc
}
type DataNodeWalker func(dn *DataNode) (e error)
func (t *Topology) WalkDataNode(walker DataNodeWalker) error {
for _, c := range t.Children() {
for _, rack := range c.(*DataCenter).Children() {
for _, dn := range rack.(*Rack).Children() {
if e := walker(dn.(*DataNode)); e != nil {
return e
}
}
}
}
return nil
}

1
go/weed/weed_server/master_server.go

@ -71,6 +71,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/replica/set", ms.proxyToLeader(ms.guard.WhiteList(ms.setReplicaHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))

43
go/weed/weed_server/master_server_handlers_admin.go

@ -10,6 +10,9 @@ import (
"strconv"
"strings"
"net/url"
"sync"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/storage"
@ -184,3 +187,43 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
}
return volumeGrowOption, nil
}
//only proxy to each volume server
func (ms *MasterServer) setReplicaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
if _, e := storage.NewReplicaPlacementFromString(r.FormValue("replication")); e != nil {
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
all, _ := strconv.ParseBool(r.FormValue("all"))
if !all && len(r.Form["volume"]) == 0 && len(r.Form["collection"]) == 0 {
writeJsonError(w, r, http.StatusBadRequest, errors.New("No available agrs found."))
return
}
result := make(map[string]interface{})
forms := r.Form
var wg sync.WaitGroup
ms.Topo.WalkDataNode(func(dn *topology.DataNode) (e error) {
wg.Add(1)
go func(server string, values url.Values) {
defer wg.Done()
jsonBlob, e := util.Post("http://"+server+"/admin/set_replica", values)
if e != nil {
result[server] = map[string]interface{}{
"error": e.Error() + " " + string(jsonBlob),
}
}
var ret interface{}
if e := json.Unmarshal(jsonBlob, ret); e == nil {
result[server] = ret
} else {
result[server] = map[string]interface{}{
"error": e.Error() + " " + string(jsonBlob),
}
}
}(dn.Url(), forms)
return nil
})
wg.Wait()
writeJson(w, r, http.StatusOK, result)
}

5
go/weed/weed_server/volume_server_handlers_replicate.go

@ -18,6 +18,9 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http
http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest)
return
}
//set read only when replicating
v.SetReadOnly(true)
defer v.SetReadOnly(false)
cr, e := v.GetVolumeCleanReader()
if e != nil {
http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError)
@ -74,7 +77,7 @@ type VolumeOptError struct {
func (vs *VolumeServer) setVolumeReplicaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
replica, e := storage.NewReplicaPlacementFromString(r.FormValue("replica"))
replica, e := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
if e != nil {
writeJsonError(w, r, http.StatusBadRequest, e)
return

Loading…
Cancel
Save