Browse Source

Volume server add "/admin/set_replica" handle to update volume replica placement setting

pull/279/head
tnextday 10 years ago
parent
commit
3bfb2d09cb
  1. 9
      go/storage/volume.go
  2. 11
      go/storage/volume_super_block.go
  3. 1
      go/weed/weed_server/volume_server.go
  4. 40
      go/weed/weed_server/volume_server_handlers_replicate.go

9
go/storage/volume.go

@ -426,3 +426,12 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
}
return false
}
func (v *Volume) SetReplica(replica *ReplicaPlacement) error{
if v.ReplicaPlacement.String() == replica.String(){
return nil
}
v.ReplicaPlacement = replica
return v.writeSuperBlock()
}

11
go/storage/volume_super_block.go

@ -59,6 +59,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
if _, err = v.dataFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err)
@ -70,6 +71,16 @@ func (v *Volume) readSuperBlock() (err error) {
v.SuperBlock, err = ParseSuperBlock(header)
return err
}
func (v *Volume) writeSuperBlock() (err error) {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if _, e := v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0); e != nil {
return fmt.Errorf("cannot write volume %d super block: %v", v.Id, e)
}
return nil
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.version = Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {

1
go/weed/weed_server/volume_server.go

@ -58,6 +58,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler))
adminMux.HandleFunc("/admin/set_replica", vs.guard.WhiteList(vs.setVolumeReplicaHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))

40
go/weed/weed_server/volume_server_handlers_replicate.go

@ -7,6 +7,7 @@ import (
"strconv"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/pierrec/lz4"
)
@ -64,3 +65,42 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http
}
lz4w.Close()
}
type VolumeOptError struct {
Volume string `json:"volume"`
Err string `json:"err"`
}
func (vs *VolumeServer) setVolumeReplicaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
replica, e := storage.NewReplicaPlacementFromString(r.FormValue("replica"))
if e != nil {
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
errs := []VolumeOptError{}
for _, volume := range r.Form["volume"] {
if vid, e := storage.NewVolumeId(volume); e == nil {
if v := vs.store.GetVolume(vid); v != nil {
if e := v.SetReplica(replica); e != nil {
errs = append(errs, VolumeOptError{
Volume: volume,
Err: e.Error(),
})
}
}
} else {
errs = append(errs, VolumeOptError{
Volume: volume,
Err: e.Error(),
})
}
}
result := make(map[string]interface{})
if len(errs) > 0 {
result["error"] = "set volume replica error."
result["errors"] = errs
}
writeJson(w, r, http.StatusAccepted, result)
}
Loading…
Cancel
Save