From 30746b7ec6228f13e0158b240d5df1d0e5f14a28 Mon Sep 17 00:00:00 2001 From: tnextday Date: Thu, 10 Dec 2015 11:43:00 +0800 Subject: [PATCH] set_replica allow set all or by collection --- go/storage/store.go | 13 +++++++ go/storage/volume.go | 10 +++-- .../volume_server_handlers_replicate.go | 39 ++++++++++++++----- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/go/storage/store.go b/go/storage/store.go index ebf01d09f..6c7871084 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -387,3 +387,16 @@ func (s *Store) HasVolume(i VolumeId) bool { v := s.findVolume(i) return v != nil } + +type VolumeWalker func(v *Volume) (e error) + +func (s *Store) WalkVolume(walker VolumeWalker) error{ + for _, location := range s.Locations { + for _, v := range location.volumes { + if e := walker(v); e != nil { + return e + } + } + } + return nil +} diff --git a/go/storage/volume.go b/go/storage/volume.go index bae2e4aee..124990341 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -427,11 +427,13 @@ func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { return false } - -func (v *Volume) SetReplica(replica *ReplicaPlacement) error{ - if v.ReplicaPlacement.String() == replica.String(){ +func (v *Volume) SetReplica(replica *ReplicaPlacement) error { + if replica == nil { + replica, _ = NewReplicaPlacementFromString("000") + } + if v.ReplicaPlacement.String() == replica.String() { return nil } v.ReplicaPlacement = replica return v.writeSuperBlock() -} \ No newline at end of file +} diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go index 866c808f8..6efd2546c 100644 --- a/go/weed/weed_server/volume_server_handlers_replicate.go +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" "github.com/pierrec/lz4" + "strings" ) func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) { @@ -79,23 +80,43 @@ func (vs *VolumeServer) setVolumeReplicaHandler(w http.ResponseWriter, r *http.R return } errs := []VolumeOptError{} - for _, volume := range r.Form["volume"] { - if vid, e := storage.NewVolumeId(volume); e == nil { - if v := vs.store.GetVolume(vid); v != nil { + all, _ := strconv.ParseBool(r.FormValue("all")) + if all { + vs.store.WalkVolume(func(v *storage.Volume) (e error) { + if e := v.SetReplica(replica); e != nil { + errs = append(errs, VolumeOptError{ + Volume: v.Id.String(), + Err: e.Error(), + }) + } + return nil + }) + } else { + volumesSet := make(map[string]bool) + for _, volume := range r.Form["volume"] { + volumesSet[strings.TrimSpace(volume)] = true + } + collectionsSet := make(map[string]bool) + for _, c := range r.Form["collection"] { + collectionsSet[strings.TrimSpace(c)] = true + } + if len(collectionsSet) > 0 || len(volumesSet) > 0 { + vs.store.WalkVolume(func(v *storage.Volume) (e error) { + if !collectionsSet[v.Collection] && !volumesSet[v.Id.String()] { + return nil + } if e := v.SetReplica(replica); e != nil { errs = append(errs, VolumeOptError{ - Volume: volume, + Volume: v.Id.String(), Err: e.Error(), }) } - } - } else { - errs = append(errs, VolumeOptError{ - Volume: volume, - Err: e.Error(), + return nil }) } + } + result := make(map[string]interface{}) if len(errs) > 0 { result["error"] = "set volume replica error."