From 4a7833f1bf5c98745b06362223ba589f371446be Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 12 Sep 2012 01:07:23 -0700 Subject: [PATCH] replication related work --- weed-fs/note/replication.txt | 22 +++++++++------ weed-fs/src/cmd/weed/volume.go | 28 ++++++++++++------- weed-fs/src/pkg/admin/storage.go | 31 +++++++++++++++++++++ weed-fs/src/pkg/storage/volume_info.go | 15 ++++++++++ weed-fs/src/pkg/topology/data_node.go | 6 ++-- weed-fs/src/pkg/topology/volume_location.go | 2 +- 6 files changed, 81 insertions(+), 23 deletions(-) create mode 100644 weed-fs/src/pkg/admin/storage.go diff --git a/weed-fs/note/replication.txt b/weed-fs/note/replication.txt index e2d895e6e..be5bea446 100644 --- a/weed-fs/note/replication.txt +++ b/weed-fs/note/replication.txt @@ -58,22 +58,26 @@ If any "assign" request comes in 2. if not found, grow the volumes with the right replication level 3. return a writable volume to the user + +Plan: + Step 1. implement one copy(no replication), automatically assign volume ids + Step 2. add replication + For the above operations, here are the todo list: for data node: 1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE 2. accept command to grow a volume( id + replication level) DONE /admin/assign_volume?volume=some_id&replicationType=01 - 3. accept status for a volumeLocationList if replication > 1 DONE - /admin/set_volume_locations?volumeLocations=[{Vid:xxx,Locations:[loc1,loc2,loc3]}] - 4. for each write, pass the write to the next location + 3. accept setting volumeLocationList DONE + /admin/set_volume_locations_list?volumeLocationsList=[{Vid:xxx,Locations:[loc1,loc2,loc3]}] + 4. for each write, pass the write to the next location, (Step 2) POST method should accept an index, like ttl, get decremented every hop for master: - 1. accept data node's report of existing volumes and maxVolumeCount + 1. accept data node's report of existing volumes and maxVolumeCount ALREADY EXISTS /dir/join 2. periodically refresh for active data nodes, and adjust writable volumes - 3. send command to grow a volume(id + replication level) + 3. send command to grow a volume(id + replication level) DONE 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info to other data nodes. BECAUSE the master will stop sending writes to these data nodes - - - - \ No newline at end of file + 5. accept lookup for volume locations ALREADY EXISTS /dir/lookup + + diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index deddf913a..c8a890c56 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -1,7 +1,7 @@ package main import ( - "encoding/json" + "encoding/json" "log" "math/rand" "mime" @@ -40,6 +40,9 @@ func statusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, store.Status()) } func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { + if *IsDebug { + log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType")) + } err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) if err == nil { writeJson(w, r, map[string]string{"error": ""}) @@ -48,14 +51,19 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { } } func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) { - volumeLocationsList := new([]storage.VolumeLocations) - json.Unmarshal([]byte(r.FormValue("volumeLocations")), volumeLocationsList) - err := store.SetVolumeLocations(*volumeLocationsList) - if err == nil { - writeJson(w, r, map[string]string{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } + if *IsDebug { + log.Println("volumeLocationsList =", r.FormValue("volumeLocationsList")) + } + volumeLocationsList := new([]storage.VolumeLocations) + err := json.Unmarshal([]byte(r.FormValue("volumeLocationsList")), volumeLocationsList) + if err == nil { + err = store.SetVolumeLocations(*volumeLocationsList) + } + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } } func storeHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { @@ -173,7 +181,7 @@ func runVolume(cmd *Command, args []string) bool { http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler) - http.HandleFunc("/admin/set_volume_locations", setVolumeLocationsHandler) + http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler) go func() { for { diff --git a/weed-fs/src/pkg/admin/storage.go b/weed-fs/src/pkg/admin/storage.go new file mode 100644 index 000000000..a8b9d185a --- /dev/null +++ b/weed-fs/src/pkg/admin/storage.go @@ -0,0 +1,31 @@ +package admin + +import ( + "encoding/json" + "errors" + "strconv" + "net/url" + "pkg/util" + "pkg/storage" + "pkg/topology" +) + +type AllocateVolumeResult struct { + error string +} + +func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error{ + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob := util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/assign_volume", values) + var ret AllocateVolumeResult + err := json.Unmarshal(jsonBlob, &ret) + if err != nil { + return err + } + if ret.error != "" { + return errors.New(ret.error) + } + return nil +} diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index eca87f8c3..4bf2b2171 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -33,6 +33,21 @@ func NewReplicationType(t string) ReplicationType { } return Copy00 } +func (r *ReplicationType) String() string { + switch *r { + case Copy00: + return "00" + case Copy01: + return "01" + case Copy10: + return "10" + case Copy11: + return "11" + case Copy20: + return "20" + } + return "00" +} func GetReplicationLevelIndex(v *VolumeInfo) int { switch v.RepType { diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index ea74dd8e0..4d6f18bea 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -8,9 +8,9 @@ import ( type DataNode struct { NodeImpl volumes map[storage.VolumeId]*storage.VolumeInfo - ip string - port int - publicUrl string + Ip string + Port int + PublicUrl string lastSeen int64 // unix time in seconds } diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index d814a07ca..be8218898 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -12,7 +12,7 @@ func NewDataNodeLocationList() *DataNodeLocationList { func (dnll *DataNodeLocationList) Add(loc *DataNode) { for _, dnl := range dnll.list { - if loc.ip == dnl.ip && loc.port == dnl.port { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { break } }