diff --git a/go/storage/volume.go b/go/storage/volume.go index 124990341..44d80a6be 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -437,3 +437,17 @@ func (v *Volume) SetReplica(replica *ReplicaPlacement) error { v.ReplicaPlacement = replica return v.writeSuperBlock() } + +func (v *Volume) SetReadOnly(isReadOnly bool) error { + if isReadOnly == false { + if fi, e := v.dataFile.Stat(); e != nil { + return e + } else { + if fi.Mode()&0200 == 0 { + return errors.New(v.FileName() + ".dat is READONLY") + } + } + } + v.readOnly = isReadOnly + return nil +} diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7377afdc9..a2b7cdf76 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -30,6 +30,7 @@ func (v *Volume) commitCompact() error { glog.V(3).Infof("Got Committing lock...") _ = v.dataFile.Close() var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { return e } diff --git a/go/topology/topology.go b/go/topology/topology.go index ee1477cd2..c329b5837 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -187,3 +187,18 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { t.LinkChildNode(dc) return dc } + +type DataNodeWalker func(dn *DataNode) (e error) + +func (t *Topology) WalkDataNode(walker DataNodeWalker) error { + for _, c := range t.Children() { + for _, rack := range c.(*DataCenter).Children() { + for _, dn := range rack.(*Rack).Children() { + if e := walker(dn.(*DataNode)); e != nil { + return e + } + } + } + } + return nil +} diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index db70ca6b1..3cac1873a 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -71,6 +71,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/replica/set", ms.proxyToLeader(ms.guard.WhiteList(ms.setReplicaHandler))) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index fb2b18983..72962517d 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -10,6 +10,9 @@ import ( "strconv" "strings" + "net/url" + "sync" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/storage" @@ -184,3 +187,43 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr } return volumeGrowOption, nil } + +//only proxy to each volume server +func (ms *MasterServer) setReplicaHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + if _, e := storage.NewReplicaPlacementFromString(r.FormValue("replication")); e != nil { + writeJsonError(w, r, http.StatusBadRequest, e) + return + } + all, _ := strconv.ParseBool(r.FormValue("all")) + if !all && len(r.Form["volume"]) == 0 && len(r.Form["collection"]) == 0 { + writeJsonError(w, r, http.StatusBadRequest, errors.New("No available agrs found.")) + return + } + result := make(map[string]interface{}) + forms := r.Form + var wg sync.WaitGroup + ms.Topo.WalkDataNode(func(dn *topology.DataNode) (e error) { + wg.Add(1) + go func(server string, values url.Values) { + defer wg.Done() + jsonBlob, e := util.Post("http://"+server+"/admin/set_replica", values) + if e != nil { + result[server] = map[string]interface{}{ + "error": e.Error() + " " + string(jsonBlob), + } + } + var ret interface{} + if e := json.Unmarshal(jsonBlob, ret); e == nil { + result[server] = ret + } else { + result[server] = map[string]interface{}{ + "error": e.Error() + " " + string(jsonBlob), + } + } + }(dn.Url(), forms) + return nil + }) + wg.Wait() + writeJson(w, r, http.StatusOK, result) +} diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go index 6efd2546c..cc421dc88 100644 --- a/go/weed/weed_server/volume_server_handlers_replicate.go +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -18,6 +18,9 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest) return } + //set read only when replicating + v.SetReadOnly(true) + defer v.SetReadOnly(false) cr, e := v.GetVolumeCleanReader() if e != nil { http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError) @@ -74,7 +77,7 @@ type VolumeOptError struct { func (vs *VolumeServer) setVolumeReplicaHandler(w http.ResponseWriter, r *http.Request) { r.ParseForm() - replica, e := storage.NewReplicaPlacementFromString(r.FormValue("replica")) + replica, e := storage.NewReplicaPlacementFromString(r.FormValue("replication")) if e != nil { writeJsonError(w, r, http.StatusBadRequest, e) return