diff --git a/go/storage/volume.go b/go/storage/volume.go index 5c6b12e9b..bae2e4aee 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -426,3 +426,12 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { } return false } + + +func (v *Volume) SetReplica(replica *ReplicaPlacement) error{ + if v.ReplicaPlacement.String() == replica.String(){ + return nil + } + v.ReplicaPlacement = replica + return v.writeSuperBlock() +} \ No newline at end of file diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index e37360075..e0fbd9e9a 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -59,6 +59,7 @@ func (v *Volume) maybeWriteSuperBlock() error { } return e } + func (v *Volume) readSuperBlock() (err error) { if _, err = v.dataFile.Seek(0, 0); err != nil { return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err) @@ -70,6 +71,16 @@ func (v *Volume) readSuperBlock() (err error) { v.SuperBlock, err = ParseSuperBlock(header) return err } + +func (v *Volume) writeSuperBlock() (err error) { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if _, e := v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0); e != nil { + return fmt.Errorf("cannot write volume %d super block: %v", v.Id, e) + } + return nil +} + func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.version = Version(header[0]) if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index b8472235e..3480ad09b 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -58,6 +58,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler)) + adminMux.HandleFunc("/admin/set_replica", vs.guard.WhiteList(vs.setVolumeReplicaHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go index 411967326..866c808f8 100644 --- a/go/weed/weed_server/volume_server_handlers_replicate.go +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -7,6 +7,7 @@ import ( "strconv" "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/storage" "github.com/pierrec/lz4" ) @@ -64,3 +65,42 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http } lz4w.Close() } + +type VolumeOptError struct { + Volume string `json:"volume"` + Err string `json:"err"` +} + +func (vs *VolumeServer) setVolumeReplicaHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + replica, e := storage.NewReplicaPlacementFromString(r.FormValue("replica")) + if e != nil { + writeJsonError(w, r, http.StatusBadRequest, e) + return + } + errs := []VolumeOptError{} + for _, volume := range r.Form["volume"] { + if vid, e := storage.NewVolumeId(volume); e == nil { + if v := vs.store.GetVolume(vid); v != nil { + if e := v.SetReplica(replica); e != nil { + errs = append(errs, VolumeOptError{ + Volume: volume, + Err: e.Error(), + }) + } + } + } else { + errs = append(errs, VolumeOptError{ + Volume: volume, + Err: e.Error(), + }) + } + } + result := make(map[string]interface{}) + if len(errs) > 0 { + result["error"] = "set volume replica error." + result["errors"] = errs + } + + writeJson(w, r, http.StatusAccepted, result) +}