From e4c0693b038983da37723e8fe039c015739f0d47 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 10 Sep 2012 17:08:52 -0700 Subject: [PATCH] replication related work on data nodes --- weed-fs/note/replication.txt | 60 +++++++++----- weed-fs/src/cmd/weed/volume.go | 24 +++++- weed-fs/src/pkg/storage/store.go | 64 +++++++++------ weed-fs/src/pkg/storage/volume.go | 11 ++- weed-fs/src/pkg/storage/volume_info.go | 88 ++++++++++++--------- weed-fs/src/pkg/topology/volume_layout.go | 28 ++++--- weed-fs/src/pkg/topology/volume_location.go | 40 +++++++--- 7 files changed, 204 insertions(+), 111 deletions(-) diff --git a/weed-fs/note/replication.txt b/weed-fs/note/replication.txt index 2a203f6e8..e2d895e6e 100644 --- a/weed-fs/note/replication.txt +++ b/weed-fs/note/replication.txt @@ -1,14 +1,15 @@ 1. each file can choose the replication factor 2. replication granularity is in volume level 3. if not enough spaces, we can automatically decrease some volume's the replication factor, especially for cold data -4. support migrating data to cheaper storage -5. manual volume placement, access-based volume placement, auction based volume placement +4. plan to support migrating data to cheaper storage +5. plan to manual volume placement, access-based volume placement, auction based volume placement When a new volume server is started, it reports 1. how many volumes it can hold - 2. current list of existing volumes + 2. current list of existing volumes and each volume's replication type Each volume server remembers: - 1. current volume ids, replica locations + 1. current volume ids + 2. replica locations are read from the master The master assign volume ids based on 1. replication factor @@ -17,12 +18,13 @@ The master assign volume ids based on On master, stores the replication configuration { replication:{ - {factor:1, min_volume_count:3, weight:10}, - {factor:2, min_volume_count:2, weight:20}, - {factor:3, min_volume_count:3, weight:30} + {type:"00", min_volume_count:3, weight:10}, + {type:"01", min_volume_count:2, weight:20}, + {type:"10", min_volume_count:2, weight:20}, + {type:"11", min_volume_count:3, weight:30}, + {type:"20", min_volume_count:2, weight:20} }, port:9333, - } Or manually via command line 1. add volume with specified replication factor @@ -35,8 +37,6 @@ if less than the replication factor, the volume is in readonly mode if more than the replication factor, the volume will purge the smallest/oldest volume if equal, the volume will function as usual -maybe use gossip to send the volumeServer~volumes information - Use cases: on volume server @@ -47,13 +47,33 @@ Use cases: Bootstrap 1. at the very beginning, the system has no volumes at all. - 2. if maxReplicationFactor==1, always initialize volumes right away - 3. if nServersHasFreeSpaces >= maxReplicationFactor, auto initialize - 4. if maxReplicationFactor>1 - weed shell - > disable_auto_initialize - > enable_auto_initialize - > assign_free_volume vid "server1:port","server2:port","server3:port" - > status - 5. - \ No newline at end of file +When data node starts: + 1. each data node send to master its existing volumes and max volume blocks + 2. master remembers the topology/data_center/rack/data_node/volumes + for each replication level, stores + volume id ~ data node + writable volume ids +If any "assign" request comes in + 1. find a writable volume with the right replicationLevel + 2. if not found, grow the volumes with the right replication level + 3. return a writable volume to the user + +For the above operations, here are the todo list: + for data node: + 1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE + 2. accept command to grow a volume( id + replication level) DONE + /admin/assign_volume?volume=some_id&replicationType=01 + 3. accept status for a volumeLocationList if replication > 1 DONE + /admin/set_volume_locations?volumeLocations=[{Vid:xxx,Locations:[loc1,loc2,loc3]}] + 4. for each write, pass the write to the next location + POST method should accept an index, like ttl, get decremented every hop + for master: + 1. accept data node's report of existing volumes and maxVolumeCount + 2. periodically refresh for active data nodes, and adjust writable volumes + 3. send command to grow a volume(id + replication level) + 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info + to other data nodes. BECAUSE the master will stop sending writes to these data nodes + + + + \ No newline at end of file diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index db85c4e5b..deddf913a 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "log" "math/rand" "mime" @@ -38,9 +39,23 @@ var ( func statusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, store.Status()) } -func addVolumeHandler(w http.ResponseWriter, r *http.Request) { - store.AddVolume(r.FormValue("volume")) - writeJson(w, r, store.Status()) +func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { + err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } +} +func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) { + volumeLocationsList := new([]storage.VolumeLocations) + json.Unmarshal([]byte(r.FormValue("volumeLocations")), volumeLocationsList) + err := store.SetVolumeLocations(*volumeLocationsList) + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } } func storeHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { @@ -157,7 +172,8 @@ func runVolume(cmd *Command, args []string) bool { defer store.Close() http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) - http.HandleFunc("/add_volume", addVolumeHandler) + http.HandleFunc("/admin/assign_volume", assignVolumeHandler) + http.HandleFunc("/admin/set_volume_locations", setVolumeLocationsHandler) go func() { for { diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 1dbb89c06..a48429f5b 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -5,9 +5,9 @@ import ( "errors" "log" "net/url" + "pkg/util" "strconv" "strings" - "pkg/util" ) type Store struct { @@ -22,12 +22,12 @@ func NewStore(port int, publicUrl, dirname string, volumeListString string) (s * s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} s.volumes = make(map[VolumeId]*Volume) - s.AddVolume(volumeListString) + s.AddVolume(volumeListString, "00") log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") return } -func (s *Store) AddVolume(volumeListString string) error { +func (s *Store) AddVolume(volumeListString string, replicationType string) error { for _, range_string := range strings.Split(volumeListString, ",") { if strings.Index(range_string, "-") < 0 { id_string := range_string @@ -35,7 +35,7 @@ func (s *Store) AddVolume(volumeListString string) error { if err != nil { return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") } - s.addVolume(VolumeId(id)) + s.addVolume(VolumeId(id), NewReplicationType(replicationType)) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -47,24 +47,24 @@ func (s *Store) AddVolume(volumeListString string) error { return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") } for id := start; id <= end; id++ { - s.addVolume(VolumeId(id)) + s.addVolume(VolumeId(id), NewReplicationType(replicationType)) } } } return nil } -func (s *Store) addVolume(vid VolumeId) error { +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { if s.volumes[vid] != nil { return errors.New("Volume Id " + vid.String() + " already exists!") } - s.volumes[vid] = NewVolume(s.dir, vid) + s.volumes[vid] = NewVolume(s.dir, vid, replicationType) return nil } func (s *Store) Status() *[]*VolumeInfo { stats := new([]*VolumeInfo) for k, v := range s.volumes { s := new(VolumeInfo) - s.Id, s.Size = VolumeId(k), v.Size() + s.Id, s.Size, s.RepType = VolumeId(k), v.Size(), v.replicaType *stats = append(*stats, s) } return stats @@ -73,7 +73,7 @@ func (s *Store) Join(mserver string) { stats := new([]*VolumeInfo) for k, v := range s.volumes { s := new(VolumeInfo) - s.Id, s.Size = VolumeId(k), v.Size() + s.Id, s.Size, s.RepType = VolumeId(k), v.Size(), v.replicaType *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) @@ -89,23 +89,39 @@ func (s *Store) Close() { } } func (s *Store) Write(i VolumeId, n *Needle) uint32 { - v := s.volumes[i] - if v!=nil{ - return v.write(n) - } - return 0 + v := s.volumes[i] + if v != nil { + return v.write(n) + } + return 0 } func (s *Store) Delete(i VolumeId, n *Needle) uint32 { - v := s.volumes[i] - if v!=nil{ - return v.delete(n) - } - return 0 + v := s.volumes[i] + if v != nil { + return v.delete(n) + } + return 0 } func (s *Store) Read(i VolumeId, n *Needle) (int, error) { - v := s.volumes[i] - if v!=nil{ - return v.read(n) - } - return 0, errors.New("Not Found") + v := s.volumes[i] + if v != nil { + return v.read(n) + } + return 0, errors.New("Not Found") +} + +type VolumeLocations struct { + Vid VolumeId + Locations []string +} + +func (s *Store) SetVolumeLocations(volumeLocationList []VolumeLocations) error { + for _, volumeLocations := range volumeLocationList { + vid := volumeLocations.Vid + v := s.volumes[vid] + if v != nil { + v.locations = volumeLocations.Locations + } + } + return nil } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 47affeca5..de39043b3 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -18,12 +18,17 @@ type Volume struct { dataFile *os.File nm *NeedleMap + replicaType ReplicationType + accessLock sync.Mutex + + //transient + locations []string } -func NewVolume(dirname string, id VolumeId) (v *Volume) { +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { var e error - v = &Volume{dir: dirname, Id: id} + v = &Volume{dir: dirname, Id: id, replicaType:replicationType} fileName := id.String() v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { @@ -53,7 +58,7 @@ func (v *Volume) maybeWriteSuperBlock() { stat, _ := v.dataFile.Stat() if stat.Size() == 0 { header := make([]byte, SuperBlockSize) - header[0] = 1 //number of copies + header[0] = byte(v.replicaType) v.dataFile.Write(header) } } diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index 2d1538bbe..eca87f8c3 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -3,48 +3,64 @@ package storage import () type VolumeInfo struct { - Id VolumeId - Size int64 - ReplicationType ReplicationType + Id VolumeId + Size int64 + RepType ReplicationType } -type ReplicationType int +type ReplicationType byte const ( - Copy00 = ReplicationType(00) // single copy - Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center - Copy10 = ReplicationType(10) // 2 copies, each on different data center - Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center - LengthRelicationType = 5 + Copy00 = ReplicationType(00) // single copy + Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center + Copy10 = ReplicationType(10) // 2 copies, each on different data center + Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center + LengthRelicationType = 5 ) -func GetReplicationLevelIndex(v *VolumeInfo) int { - switch v.ReplicationType { - case Copy00: - return 0 - case Copy01: - return 1 - case Copy10: - return 2 - case Copy11: - return 3 - case Copy20: - return 4 +func NewReplicationType(t string) ReplicationType { + switch t { + case "00": + return Copy00 + case "01": + return Copy01 + case "10": + return Copy10 + case "11": + return Copy11 + case "20": + return Copy20 } - return -1 + return Copy00 +} + +func GetReplicationLevelIndex(v *VolumeInfo) int { + switch v.RepType { + case Copy00: + return 0 + case Copy01: + return 1 + case Copy10: + return 2 + case Copy11: + return 3 + case Copy20: + return 4 + } + return -1 } func GetCopyCount(v *VolumeInfo) int { - switch v.ReplicationType { - case Copy00: - return 1 - case Copy01: - return 2 - case Copy10: - return 2 - case Copy11: - return 3 - case Copy20: - return 3 - } - return 0 + switch v.RepType { + case Copy00: + return 1 + case Copy01: + return 2 + case Copy10: + return 2 + case Copy11: + return 3 + case Copy20: + return 3 + } + return 0 } diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index f11ea430a..48d76325e 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -1,7 +1,7 @@ package topology import ( - "errors" + "errors" "fmt" "math/rand" "pkg/storage" @@ -29,20 +29,22 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } vl.vid2location[v.Id].Add(dn) if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) { - vl.writables = append(vl.writables,v.Id) + if uint64(v.Size) < vl.volumeSizeLimit { + vl.writables = append(vl.writables, v.Id) + } } } func (vl *VolumeLayout) PickForWrite(count int) (int, *DataNodeLocationList, error) { - len_writers := len(vl.writables) - if len_writers <= 0 { - fmt.Println("No more writable volumes!") - return 0, nil, errors.New("No more writable volumes!") - } - vid := vl.writables[rand.Intn(len_writers)] - locationList := vl.vid2location[vid] - if locationList != nil { - return count, locationList, nil - } - return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") + len_writers := len(vl.writables) + if len_writers <= 0 { + fmt.Println("No more writable volumes!") + return 0, nil, errors.New("No more writable volumes!") + } + vid := vl.writables[rand.Intn(len_writers)] + locationList := vl.vid2location[vid] + if locationList != nil { + return count, locationList, nil + } + return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index c61ee119b..d814a07ca 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -1,21 +1,39 @@ package topology -import ( -) +import () type DataNodeLocationList struct { - list []*DataNode + list []*DataNode } func NewDataNodeLocationList() *DataNodeLocationList { - return &DataNodeLocationList{} + return &DataNodeLocationList{} } -func (dnll *DataNodeLocationList) Add(loc *DataNode){ - for _, dnl := range dnll.list { - if loc.ip == dnl.ip && loc.port == dnl.port { - break - } - } - dnll.list = append(dnll.list, loc) +func (dnll *DataNodeLocationList) Add(loc *DataNode) { + for _, dnl := range dnll.list { + if loc.ip == dnl.ip && loc.port == dnl.port { + break + } + } + dnll.list = append(dnll.list, loc) +} + +func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) { + var changed bool + for _, dnl := range dnll.list { + if dnl.lastSeen < freshThreshHold { + changed = true + break + } + } + if changed { + var l []*DataNode + for _, dnl := range dnll.list { + if dnl.lastSeen >= freshThreshHold { + l = append(l, dnl) + } + } + dnll.list = l + } }