diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 29b05efce..72c0d6f91 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -5,7 +5,9 @@ import ( "log" "net/http" "pkg/directory" + "pkg/replication" "pkg/storage" + "pkg/topology" "strconv" "strings" "time" @@ -29,11 +31,14 @@ var ( mport = cmdMaster.Flag.Int("port", 9333, "http listen port") metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") - mapper *directory.Mapper volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") ) +var mapper *directory.Mapper +var topo *topology.Topology +var vg *replication.VolumeGrowth + func dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid := r.FormValue("volumeId") commaSep := strings.Index(vid, ",") @@ -43,10 +48,10 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { volumeId, _ := storage.NewVolumeId(vid) machines, e := mapper.Get(volumeId) if e == nil { - ret:= []map[string]string{} - for _, machine := range machines { - ret = append(ret,map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) - } + ret := []map[string]string{} + for _, machine := range machines { + ret = append(ret, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) + } writeJson(w, r, ret) } else { log.Println("Invalid volume id", volumeId) @@ -62,7 +67,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, map[string]string{"error": err.Error()}) } } +func dirAssign2Handler(w http.ResponseWriter, r *http.Request) { + c, _ := strconv.Atoi(r.FormValue("count")) + rt := storage.NewReplicationType(r.FormValue("replication")) + if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { + if topo.FreeSpace() <= 0 { + writeJson(w, r, map[string]string{"error": "No free volumes left!"}) + } else { + vg.GrowByType(rt, topo) + } + } + fid, count, dn, err := topo.PickForWrite(rt, c) + if err == nil { + writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Ip + ":" + strconv.Itoa(dn.Port), "publicUrl": dn.PublicUrl, "count": count}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } +} func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + ip := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + port, _ := strconv.Atoi(r.FormValue("port")) + maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) @@ -71,20 +96,40 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { log.Println(s, "volumes", r.FormValue("volumes")) } mapper.Add(directory.NewMachine(s, publicUrl, *volumes, time.Now().Unix())) + + //new ways + topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) } -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { +func dirOldStatusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, mapper) } +func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) { + writeJson(w, r, topo.ToMap()) +} +func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + rt := storage.NewReplicationType(r.FormValue("replication")) + count, err := strconv.Atoi(r.FormValue("count")) + if err != nil { + vg.GrowByType(rt, topo) + } else { + vg.GrowByCountAndType(count, rt, topo) + } +} func runMaster(cmd *Command, args []string) bool { + topo = topology.NewTopology("topo", *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) + vg = replication.NewDefaultVolumeGrowth() log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/assign2", dirAssign2Handler) http.HandleFunc("/dir/lookup", dirLookupHandler) http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) + http.HandleFunc("/dir/status", dirOldStatusHandler) + http.HandleFunc("/dir/status2", dirNewStatusHandler) //temporary + http.HandleFunc("/vol/grow", volumeGrowHandler) - mapper.StartRefreshWritableVolumes() + mapper.StartRefreshWritableVolumes() log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index 5cfb6ee97..8502ca417 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -39,19 +39,22 @@ type AssignResult struct { Error string "error" } -func assign(count int) (AssignResult, error) { +func assign(count int) (*AssignResult, error) { values := make(url.Values) values.Add("count", strconv.Itoa(count)) - jsonBlob := util.Post("http://"+*server+"/dir/assign", values) + jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) + if err != nil { + return nil, err + } var ret AssignResult - err := json.Unmarshal(jsonBlob, &ret) + err = json.Unmarshal(jsonBlob, &ret) if err != nil { - return ret, err + return nil, err } if ret.Count <= 0 { - return ret, errors.New(ret.Error) + return nil, errors.New(ret.Error) } - return ret, nil + return &ret, nil } type UploadResult struct { diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 0b7ef9671..39faaffec 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -6,6 +6,7 @@ import ( "math/rand" "mime" "net/http" + "os" "pkg/storage" "strconv" "strings" @@ -18,7 +19,7 @@ func init() { } var cmdVolume = &Command{ - UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333", + UsageLine: "volume -port=8080 -dir=/tmp -min=3 -max=5 -publicUrl=server_name:8080 -mserver=localhost:9333", Short: "start a volume server", Long: `start a volume server to provide storage spaces @@ -26,12 +27,13 @@ var cmdVolume = &Command{ } var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") - volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") - publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") - masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") - vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + vport = cmdVolume.Flag.Int("port", 8080, "http listen port") + volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") + volumes = cmdVolume.Flag.String("volumes", "", "comma-separated list, or ranges of volume ids") + publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") + masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") + vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + maxVolumeCount = cmdVolume.Flag.Int("maxVolumeCount", 5, "maximum number of volumes") store *storage.Store ) @@ -46,9 +48,9 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { } else { writeJson(w, r, map[string]string{"error": err.Error()}) } - if *IsDebug { - log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) - } + if *IsDebug { + log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) + } } func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) { if *IsDebug { @@ -86,11 +88,15 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { } cookie := n.Cookie count, e := store.Read(volumeId, n) + if e != nil { + w.WriteHeader(404) + } if *IsDebug { log.Println("read bytes", count, "error", e) } if n.Cookie != cookie { log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + w.WriteHeader(404) return } if ext != "" { @@ -175,14 +181,25 @@ func parseURLPath(path string) (vid, fid, ext string) { } func runVolume(cmd *Command, args []string) bool { + fileInfo, err := os.Stat(*volumeFolder) //TODO: now default to 1G, this value should come from server? - store = storage.NewStore(*vport, *publicUrl, *chunkFolder, *volumes) + if err!=nil{ + log.Fatalf("No Existing Folder:%s", *volumeFolder) + } + if !fileInfo.IsDir() { + log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) + } + perm:=fileInfo.Mode().Perm() + log.Println("Volume Folder permission:", perm) + + store = storage.NewStore(*vport, *publicUrl, *volumeFolder, *maxVolumeCount, *volumes) defer store.Close() http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler) http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler) + go func() { for { store.Join(*masterNode) diff --git a/weed-fs/src/pkg/admin/storage.go b/weed-fs/src/pkg/admin/storage.go index 8e78b8697..8d9e8a103 100644 --- a/weed-fs/src/pkg/admin/storage.go +++ b/weed-fs/src/pkg/admin/storage.go @@ -1,35 +1,10 @@ package admin import ( - "encoding/json" - "errors" - "strconv" - "net/url" - "pkg/util" "pkg/storage" "pkg/topology" ) -type AllocateVolumeResult struct { - Error string -} - -func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error{ - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob := util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/assign_volume", values) - var ret AllocateVolumeResult - err := json.Unmarshal(jsonBlob, &ret) - if err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil -} - func SendVolumeLocationsList(t *topology.Topology, vid storage.VolumeId) error{ // values := make(url.Values) // values.Add("volumeLocationsList", vid.String()) diff --git a/weed-fs/src/pkg/admin/storage_test.go b/weed-fs/src/pkg/admin/storage_test.go deleted file mode 100644 index ecc2ab22e..000000000 --- a/weed-fs/src/pkg/admin/storage_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package admin - -import ( - "log" - "pkg/storage" - "pkg/topology" - "testing" -) - -func TestXYZ(t *testing.T) { - dn := topology.NewDataNode("server1") - dn.Ip = "localhost" - dn.Port = 8080 - vid, _:= storage.NewVolumeId("6") - out := AllocateVolume(dn,vid,storage.Copy00) - log.Println(out) -} diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 3269fcbc6..f6dec8a65 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -1,10 +1,15 @@ package replication import ( + "encoding/json" + "errors" "fmt" "math/rand" + "net/url" "pkg/storage" "pkg/topology" + "pkg/util" + "strconv" ) /* @@ -22,17 +27,36 @@ type VolumeGrowth struct { copyAll int } -func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) { - switch copyLevel { - case 1: - for i := 0; i < vg.copy1factor; i++ { +func NewDefaultVolumeGrowth() *VolumeGrowth { + return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} +} + +func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) { + switch repType { + case storage.Copy00: + vg.GrowByCountAndType(vg.copy1factor, repType, topo) + case storage.Copy10: + vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy20: + vg.GrowByCountAndType(vg.copy3factor, repType, topo) + case storage.Copy01: + vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy11: + vg.GrowByCountAndType(vg.copy3factor, repType, topo) + } + +} +func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) { + switch repType { + case storage.Copy00: + for i := 0; i < count; i++ { ret, server, vid := topo.RandomlyReserveOneVolume() if ret { - vg.Grow(vid, server) + vg.grow(topo, *vid, repType, server) } } - case 20: - for i := 0; i < vg.copy2factor; i++ { + case storage.Copy10: + for i := 0; i < count; i++ { nl := topology.NewNodeList(topo.Children(), nil) picked, ret := nl.RandomlyPickN(2) vid := topo.NextVolumeId() @@ -44,12 +68,12 @@ func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) { } } if len(servers) == 2 { - vg.Grow(vid, servers[0], servers[1]) + vg.grow(topo, vid, repType, servers[0], servers[1]) } } } - case 30: - for i := 0; i < vg.copy3factor; i++ { + case storage.Copy20: + for i := 0; i < count; i++ { nl := topology.NewNodeList(topo.Children(), nil) picked, ret := nl.RandomlyPickN(3) vid := topo.NextVolumeId() @@ -61,12 +85,12 @@ func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) { } } if len(servers) == 3 { - vg.Grow(vid, servers[0], servers[1], servers[2]) + vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]) } } } - case 02: - for i := 0; i < vg.copy2factor; i++ { + case storage.Copy01: + for i := 0; i < count; i++ { //randomly pick one server, and then choose from the same rack ret, server1, vid := topo.RandomlyReserveOneVolume() if ret { @@ -74,22 +98,53 @@ func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) { exclusion := make(map[string]topology.Node) exclusion[server1.String()] = server1 newNodeList := topology.NewNodeList(rack.Children(), exclusion) - ret2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid) - if ret2 { - vg.Grow(vid, server1, server2) + if newNodeList.FreeSpace() > 0 { + ret2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid) + if ret2 { + vg.grow(topo, *vid, repType, server1, server2) + } } } } - case 12: - for i := 0; i < vg.copy3factor; i++ { + case storage.Copy11: + for i := 0; i < count; i++ { } } } -func (vg *VolumeGrowth) Grow(vid storage.VolumeId, servers ...*topology.DataNode) { +func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) { for _, server := range servers { - vi := &storage.VolumeInfo{Id: vid, Size: 0} - server.AddVolume(vi) + if err := AllocateVolume(server, vid, repType); err == nil { + vi := &storage.VolumeInfo{Id: vid, Size: 0} + server.AddOrUpdateVolume(vi) + topo.RegisterVolumeLayout(vi, server) + fmt.Println("added", vid, "to", server) + } else { + //TODO: need error handling + fmt.Println("Failed to assign", vid, "to", servers) + } } fmt.Println("Assigning", vid, "to", servers) } + +type AllocateVolumeResult struct { + Error string +} + +func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go index f8e441f03..5068577b1 100644 --- a/weed-fs/src/pkg/replication/volume_growth_test.go +++ b/weed-fs/src/pkg/replication/volume_growth_test.go @@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork") + topo := topology.NewTopology("mynetwork","/tmp","testing",32*1024, 5) mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { dc := topology.NewDataCenter(dcKey) @@ -97,7 +97,7 @@ func setup(topologyLayout string) *topology.Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} - server.AddVolume(vi) + server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) } @@ -125,5 +125,15 @@ func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) rand.Seed(time.Now().UnixNano()) vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} - vg.GrowVolumeCopy(20,topo) + vg.GrowByType(storage.Copy20,topo) +} + + +func TestXYZ(t *testing.T) { + dn := topology.NewDataNode("server1") + dn.Ip = "localhost" + dn.Port = 8080 + vid, _:= storage.NewVolumeId("600") + out := AllocateVolume(dn,vid,storage.Copy00) + fmt.Println(out) } diff --git a/weed-fs/src/pkg/storage/storage_limit.go b/weed-fs/src/pkg/storage/storage_limit.go deleted file mode 100644 index b40618046..000000000 --- a/weed-fs/src/pkg/storage/storage_limit.go +++ /dev/null @@ -1,13 +0,0 @@ -package storage - -import ( -) - -type StorageLimit struct { - sizeLimit uint64 -} - -func NewStorageLimit(desiredLimit uint64) *StorageLimit { - sl := &StorageLimit{sizeLimit: desiredLimit} - return sl -} diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 6bc1ec028..a0238d99d 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -12,19 +12,20 @@ import ( ) type Store struct { - volumes map[VolumeId]*Volume - dir string - Port int - PublicUrl string - Limit StorageLimit + volumes map[VolumeId]*Volume + dir string + Port int + PublicUrl string + MaxVolumeCount int } -func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *Store) { - s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} +func NewStore(port int, publicUrl, dirname string, maxVolumeCount int, volumeListString string) (s *Store) { + s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount} s.volumes = make(map[VolumeId]*Volume) - s.loadExistingVolumes() - s.AddVolume(volumeListString, "00") + if volumeListString != "" { + s.AddVolume(volumeListString, "00") + } log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString) return @@ -82,16 +83,16 @@ func (s *Store) loadExistingVolumes() { } } } -func (s *Store) Status() *[]*VolumeInfo { - stats := new([]*VolumeInfo) +func (s *Store) Status() []*VolumeInfo { + var stats []*VolumeInfo for k, v := range s.volumes { s := new(VolumeInfo) s.Id, s.Size, s.RepType = VolumeId(k), v.Size(), v.replicaType - *stats = append(*stats, s) + stats = append(stats, s) } return stats } -func (s *Store) Join(mserver string) { +func (s *Store) Join(mserver string) error { stats := new([]*VolumeInfo) for k, v := range s.volumes { s := new(VolumeInfo) @@ -103,7 +104,9 @@ func (s *Store) Join(mserver string) { values.Add("port", strconv.Itoa(s.Port)) values.Add("publicUrl", s.PublicUrl) values.Add("volumes", string(bytes)) - util.Post("http://"+mserver+"/dir/join", values) + values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) + _, err := util.Post("http://"+mserver+"/dir/join", values) + return err } func (s *Store) Close() { for _, v := range s.volumes { @@ -111,22 +114,19 @@ func (s *Store) Close() { } } func (s *Store) Write(i VolumeId, n *Needle) uint32 { - v := s.volumes[i] - if v != nil { + if v := s.volumes[i]; v != nil { return v.write(n) } return 0 } func (s *Store) Delete(i VolumeId, n *Needle) uint32 { - v := s.volumes[i] - if v != nil { + if v := s.volumes[i]; v != nil { return v.delete(n) } return 0 } func (s *Store) Read(i VolumeId, n *Needle) (int, error) { - v := s.volumes[i] - if v != nil { + if v := s.volumes[i]; v != nil { return v.read(n) } return 0, errors.New("Not Found") @@ -140,8 +140,7 @@ type VolumeLocations struct { func (s *Store) SetVolumeLocations(volumeLocationList []VolumeLocations) error { for _, volumeLocations := range volumeLocationList { vid := volumeLocations.Vid - v := s.volumes[vid] - if v != nil { + if v := s.volumes[vid]; v != nil { v.locations = volumeLocations.Locations } } diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index c9ed5265c..01a56a30c 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -50,8 +50,8 @@ func (r *ReplicationType) String() string { return "00" } -func GetReplicationLevelIndex(v *VolumeInfo) int { - switch v.RepType { +func GetReplicationLevelIndex(repType ReplicationType) int { + switch repType { case Copy00: return 0 case Copy01: @@ -65,8 +65,8 @@ func GetReplicationLevelIndex(v *VolumeInfo) int { } return -1 } -func GetCopyCount(v *VolumeInfo) int { - switch v.RepType { +func GetCopyCount(repType ReplicationType) int { + switch repType { case Copy00: return 1 case Copy01: diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 48466e258..5edf7c6eb 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,6 +1,7 @@ package topology -import () +import ( +) type DataCenter struct { NodeImpl @@ -33,3 +34,15 @@ func (dc *DataCenter) GetOrCreateRack(ip string) *Rack { dc.LinkChildNode(rack) return rack } + +func (dc *DataCenter) ToMap() interface{}{ + m := make(map[string]interface{}) + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m +} diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 254754ccd..1516572fd 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -22,22 +22,37 @@ func NewDataNode(id string) *DataNode { return s } func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { - dn.AddVolume(&storage.VolumeInfo{Id: vid}) + dn.AddOrUpdateVolume(&storage.VolumeInfo{Id: vid}) return vid } -func (dn *DataNode) AddVolume(v *storage.VolumeInfo) { - dn.volumes[v.Id] = v - dn.UpAdjustActiveVolumeCountDelta(1) - dn.UpAdjustMaxVolumeId(v.Id) +func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) { + if dn.volumes[v.Id] == nil { + dn.volumes[v.Id] = v + dn.UpAdjustActiveVolumeCountDelta(1) + dn.UpAdjustMaxVolumeId(v.Id) + }else{ + dn.volumes[v.Id] = v + } } func (dn *DataNode) GetTopology() *Topology { - p := dn.parent - for p.Parent()!=nil{ - p = p.Parent() - } - t := p.(*Topology) - return t + p := dn.parent + for p.Parent() != nil { + p = p.Parent() + } + t := p.(*Topology) + return t } func (dn *DataNode) MatchLocation(ip string, port int) bool { - return dn.Ip == ip && dn.Port == port + return dn.Ip == ip && dn.Port == port +} + +func (dn *DataNode) ToMap() interface{} { + ret := make(map[string]interface{}) + ret["Ip"] = dn.Ip + ret["Port"] = dn.Port + ret["Volumes"] = dn.GetActiveVolumeCount() + ret["MaxVolumeCount"] = dn.GetMaxVolumeCount() + ret["FreeVolumeCount"] = dn.FreeSpace() + ret["PublicUrl"] = dn.PublicUrl + return ret } diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index fb610bd73..cdd9accd4 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -127,12 +127,10 @@ func (n *NodeImpl) GetMaxVolumeCount() int { func (n *NodeImpl) LinkChildNode(node Node) { if n.children[node.Id()] == nil { n.children[node.Id()] = node - n.activeVolumeCount += node.GetActiveVolumeCount() - n.maxVolumeCount += node.GetMaxVolumeCount() + n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) + n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.setParent(n) - if n.maxVolumeId < node.GetMaxVolumeId() { - n.maxVolumeId = node.GetMaxVolumeId() - } fmt.Println(n, "adds", node, "volumeCount =", n.activeVolumeCount) } } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index a52223887..8a09d0bfe 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -24,7 +24,7 @@ func (r *Rack) MatchLocationRange(ip string) bool{ return r.ipRange.Match(ip) } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string) *DataNode{ +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode{ for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip,port) { @@ -35,6 +35,19 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string) *DataN dn.Ip = ip dn.Port = port dn.PublicUrl = publicUrl + dn.maxVolumeCount = maxVolumeCount r.LinkChildNode(dn) return dn } + +func (rack *Rack) ToMap() interface{}{ + m := make(map[string]interface{}) + m["Free"] = rack.FreeSpace() + var dns []interface{} + for _, c := range rack.Children() { + dn := c.(*DataNode) + dns = append(dns, dn.ToMap()) + } + m["DataNodes"] = dns + return m +} diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 2366c29d0..7a96665af 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -1,8 +1,9 @@ package topology import ( - _ "fmt" + "errors" "math/rand" + "pkg/directory" "pkg/sequence" "pkg/storage" ) @@ -31,10 +32,14 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin t.sequence = sequence.NewSequencer(dirname, filename) return t } -func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, storage.VolumeId) { + +func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { + if t.FreeSpace()<=0 { + return false, nil, nil + } vid := t.NextVolumeId() ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) - return ret, node, vid + return ret, node, &vid } func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, storage.VolumeId) { @@ -52,30 +57,68 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return vid.Next() } -func (t *Topology) registerVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - replicationTypeIndex := storage.GetReplicationLevelIndex(v) - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse) - } - t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn) +func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { + replicationTypeIndex := storage.GetReplicationLevelIndex(repType) + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) + } + vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count) + if err != nil { + return "", 0, nil, errors.New("No writable volumes avalable!") + } + fileId, count := t.sequence.NextFileId(count) + return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil +} + +func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { + replicationTypeIndex := storage.GetReplicationLevelIndex(repType) + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) + } + return t.replicaType2VolumeLayout[replicationTypeIndex] } -func (t *Topology) RegisterVolume(v *storage.VolumeInfo, ip string, port int, publicUrl string) { +func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { + t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) +} + +func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { dc := t.GetOrCreateDataCenter(ip) rack := dc.GetOrCreateRack(ip) - dn := rack.GetOrCreateDataNode(ip, port, publicUrl) - dn.AddVolume(v) - t.registerVolumeLayout(v,dn) + dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) + for _, v := range volumeInfos { + dn.AddOrUpdateVolume(&v) + t.RegisterVolumeLayout(&v, dn) + } } -func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter{ - for _, c := range t.Children() { - dc := c.(*DataCenter) - if dc.MatchLocationRange(ip) { - return dc - } - } - dc := NewDataCenter("DefaultDataCenter") - t.LinkChildNode(dc) - return dc +func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter { + for _, c := range t.Children() { + dc := c.(*DataCenter) + if dc.MatchLocationRange(ip) { + return dc + } + } + dc := NewDataCenter("DefaultDataCenter") + t.LinkChildNode(dc) + return dc +} + +func (t *Topology) ToMap() interface{} { + m := make(map[string]interface{}) + m["Free"] = t.FreeSpace() + var dcs []interface{} + for _, c := range t.Children() { + dc := c.(*DataCenter) + dcs = append(dcs, dc.ToMap()) + } + m["DataCenters"] = dcs + var layouts []interface{} + for _, layout := range t.replicaType2VolumeLayout { + if layout != nil { + layouts = append(layouts, layout.ToMap()) + } + } + m["layouts"] = layouts + return m } diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 48d76325e..99859668e 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -8,14 +8,16 @@ import ( ) type VolumeLayout struct { + repType storage.ReplicationType vid2location map[storage.VolumeId]*DataNodeLocationList writables []storage.VolumeId // transient array of writable volume id pulse int64 volumeSizeLimit uint64 } -func NewVolumeLayout(volumeSizeLimit uint64, pulse int64) *VolumeLayout { +func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { return &VolumeLayout{ + repType: repType, vid2location: make(map[storage.VolumeId]*DataNodeLocationList), writables: *new([]storage.VolumeId), pulse: pulse, @@ -27,24 +29,37 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewDataNodeLocationList() } - vl.vid2location[v.Id].Add(dn) - if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) { - if uint64(v.Size) < vl.volumeSizeLimit { - vl.writables = append(vl.writables, v.Id) + if vl.vid2location[v.Id].Add(dn) { + if len(vl.vid2location[v.Id].list) == storage.GetCopyCount(v.RepType) { + if uint64(v.Size) < vl.volumeSizeLimit { + vl.writables = append(vl.writables, v.Id) + } } } } -func (vl *VolumeLayout) PickForWrite(count int) (int, *DataNodeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *DataNodeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { fmt.Println("No more writable volumes!") - return 0, nil, errors.New("No more writable volumes!") + return nil, 0, nil, errors.New("No more writable volumes!") } vid := vl.writables[rand.Intn(len_writers)] locationList := vl.vid2location[vid] if locationList != nil { - return count, locationList, nil + return &vid, count, locationList, nil } - return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") + return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") +} + +func (vl *VolumeLayout) GetActiveVolumeCount() int { + return len(vl.writables) +} + +func (vl *VolumeLayout) ToMap() interface{} { + m := make(map[string]interface{}) + m["replication"] = vl.repType.String() + m["writables"] = vl.writables + //m["locations"] = vl.vid2location + return m } diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index be8218898..92d89ae46 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -10,13 +10,18 @@ func NewDataNodeLocationList() *DataNodeLocationList { return &DataNodeLocationList{} } -func (dnll *DataNodeLocationList) Add(loc *DataNode) { +func (dnll *DataNodeLocationList) Head() *DataNode { + return dnll.list[0] +} + +func (dnll *DataNodeLocationList) Add(loc *DataNode) bool { for _, dnl := range dnll.list { if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - break + return false } } dnll.list = append(dnll.list, loc) + return true } func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) { diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go index 871448785..357b42185 100644 --- a/weed-fs/src/pkg/util/post.go +++ b/weed-fs/src/pkg/util/post.go @@ -7,17 +7,17 @@ import ( "net/url" ) -func Post(url string, values url.Values) []byte { +func Post(url string, values url.Values) ([]byte, error) { r, err := http.PostForm(url, values) if err != nil { log.Println("post:", err) - return nil + return nil, err } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { log.Println("post:", err) - return nil + return nil, err } - return b + return b, nil }