From f7f582ec8698dc43f1a2289dbd06fe0cade7468f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 13 Apr 2014 01:29:52 -0700 Subject: [PATCH] 1. refactoring, merge "replication" logic into "topology" package 2. when growing volumes, additional preferred "rack" and "dataNode" paraemters are also provided. Previously only "dataCenter" paraemter is provided. --- .../allocate_volume.go | 5 +- go/topology/configuration.go | 24 ++++---- go/topology/data_node.go | 10 ++++ .../store_replicate.go | 2 +- go/topology/topology.go | 9 ++- go/{replication => topology}/volume_growth.go | 56 +++++++++++------- .../volume_growth_test.go | 13 ++-- go/topology/volume_layout.go | 24 ++++++-- go/weed/weed_server/master_server.go | 13 ++-- go/weed/weed_server/master_server_handlers.go | 59 +++++++++++++------ go/weed/weed_server/volume_server_handlers.go | 6 +- 11 files changed, 144 insertions(+), 77 deletions(-) rename go/{replication => topology}/allocate_volume.go (76%) rename go/{replication => topology}/store_replicate.go (99%) rename go/{replication => topology}/volume_growth.go (55%) rename go/{replication => topology}/volume_growth_test.go (89%) diff --git a/go/replication/allocate_volume.go b/go/topology/allocate_volume.go similarity index 76% rename from go/replication/allocate_volume.go rename to go/topology/allocate_volume.go index fb40c6353..77b4ac508 100644 --- a/go/replication/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -1,8 +1,7 @@ -package replication +package topology import ( "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" @@ -13,7 +12,7 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error { +func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error { values := make(url.Values) values.Add("volume", vid.String()) values.Add("collection", collection) diff --git a/go/topology/configuration.go b/go/topology/configuration.go index 058600a7c..ffcebb59c 100644 --- a/go/topology/configuration.go +++ b/go/topology/configuration.go @@ -47,19 +47,19 @@ func (c *Configuration) String() string { } func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) { - if dcName == "" { - if c != nil && c.ip2location != nil { - if loc, ok := c.ip2location[ip]; ok { - return loc.dcName, loc.rackName - } - } - } else { - if rackName == "" { - return dcName, "DefaultRack" - } else { - return dcName, rackName + if c != nil && c.ip2location != nil { + if loc, ok := c.ip2location[ip]; ok { + return loc.dcName, loc.rackName } } - return "DefaultDataCenter", "DefaultRack" + if dcName == "" { + dcName = "DefaultDataCenter" + } + + if rackName == "" { + rackName = "DefaultRack" + } + + return dcName, rackName } diff --git a/go/topology/data_node.go b/go/topology/data_node.go index 0cedb5cfe..ae80e08bb 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -24,6 +24,7 @@ func NewDataNode(id string) *DataNode { s.NodeImpl.value = s return s } + func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v @@ -36,6 +37,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { dn.volumes[v.Id] = v } } + func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { @@ -53,9 +55,15 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { dn.AddOrUpdateVolume(v) } } + func (dn *DataNode) GetDataCenter() *DataCenter { return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) } + +func (dn *DataNode) GetRack() *Rack { + return dn.Parent().(*NodeImpl).value.(*Rack) +} + func (dn *DataNode) GetTopology() *Topology { p := dn.Parent() for p.Parent() != nil { @@ -64,9 +72,11 @@ func (dn *DataNode) GetTopology() *Topology { t := p.(*Topology) return t } + func (dn *DataNode) MatchLocation(ip string, port int) bool { return dn.Ip == ip && dn.Port == port } + func (dn *DataNode) Url() string { return dn.Ip + ":" + strconv.Itoa(dn.Port) } diff --git a/go/replication/store_replicate.go b/go/topology/store_replicate.go similarity index 99% rename from go/replication/store_replicate.go rename to go/topology/store_replicate.go index 249e7e3e6..a982cebe5 100644 --- a/go/replication/store_replicate.go +++ b/go/topology/store_replicate.go @@ -1,4 +1,4 @@ -package replication +package topology import ( "bytes" diff --git a/go/topology/topology.go b/go/topology/topology.go index 6c5bde304..b1fa3f2a2 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -108,8 +108,13 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return next } -func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter) +func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + return vl.GetActiveVolumeCount(option) > 0 +} + +func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } diff --git a/go/replication/volume_growth.go b/go/topology/volume_growth.go similarity index 55% rename from go/replication/volume_growth.go rename to go/topology/volume_growth.go index 33dfc570e..ee6233364 100644 --- a/go/replication/volume_growth.go +++ b/go/topology/volume_growth.go @@ -1,9 +1,8 @@ -package replication +package topology import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" "fmt" "math/rand" "sync" @@ -17,6 +16,14 @@ This package is created to resolve these replica placement issues: 4. volume allocation for each bucket */ +type VolumeGrowOption struct { + Collection string + ReplicaPlacement *storage.ReplicaPlacement + DataCenter string + Rack string + DataNode string +} + type VolumeGrowth struct { accessLock sync.Mutex } @@ -41,19 +48,19 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { return } -func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) { - count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo) - if count > 0 && count%rp.GetCopyCount() == 0 { +func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) { + count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo) + if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { return count, nil } return count, err } -func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() for i := 0; i < targetCount; i++ { - if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil { + if c, e := vg.findAndGrow(topo, option); e == nil { counter += c } else { return counter, e @@ -62,21 +69,22 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, r return } -func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) { - servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp) +func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { + servers, e := vg.findEmptySlotsForOneVolume(topo, option) if e != nil { return 0, e } vid := topo.NextVolumeId() - err := vg.grow(topo, vid, collection, rp, servers...) + err := vg.grow(topo, vid, option, servers...) return len(servers), err } -func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) { +func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) error { - if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) { - return fmt.Errorf("Not matching preferred:%s", preferredDataCenter) + rp := option.ReplicaPlacement + mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { + return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) @@ -88,7 +96,10 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref } //find main rack and other racks - mainRack, otherRacks, rack_err := mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) error { + mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { + if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { + return fmt.Errorf("Not matching preferred rack:%s", option.Rack) + } if node.FreeSpace() < rp.SameRackCount+1 { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) } @@ -99,7 +110,10 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref } //find main rack and other racks - mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) error { + mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { + if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { + return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) + } if node.FreeSpace() < 1 { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) } @@ -109,9 +123,9 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref return nil, server_err } - servers = append(servers, mainServer.(*topology.DataNode)) + servers = append(servers, mainServer.(*DataNode)) for _, server := range otherServers { - servers = append(servers, server.(*topology.DataNode)) + servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { r := rand.Intn(rack.FreeSpace()) @@ -132,10 +146,10 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref return } -func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error { +func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, collection, rp); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion} + if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil { + vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) glog.V(0).Infoln("Created Volume", vid, "on", server) diff --git a/go/replication/volume_growth_test.go b/go/topology/volume_growth_test.go similarity index 89% rename from go/replication/volume_growth_test.go rename to go/topology/volume_growth_test.go index bb6cbe90e..7f6bd9489 100644 --- a/go/replication/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -1,9 +1,8 @@ -package replication +package topology import ( "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" "encoding/json" "fmt" "testing" @@ -70,7 +69,7 @@ var topologyLayout = ` } ` -func setup(topologyLayout string) *topology.Topology { +func setup(topologyLayout string) *Topology { var data interface{} err := json.Unmarshal([]byte(topologyLayout), &data) if err != nil { @@ -79,22 +78,22 @@ func setup(topologyLayout string) *topology.Topology { fmt.Println("data:", data) //need to connect all nodes first before server adding volumes - topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf", + topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf", sequence.NewMemorySequencer(), 32*1024, 5) if err != nil { panic("error: " + err.Error()) } mTopology := data.(map[string]interface{}) for dcKey, dcValue := range mTopology { - dc := topology.NewDataCenter(dcKey) + dc := NewDataCenter(dcKey) dcMap := dcValue.(map[string]interface{}) topo.LinkChildNode(dc) for rackKey, rackValue := range dcMap { - rack := topology.NewRack(rackKey) + rack := NewRack(rackKey) rackMap := rackValue.(map[string]interface{}) dc.LinkChildNode(rack) for serverKey, serverValue := range rackMap { - server := topology.NewDataNode(serverKey) + server := NewDataNode(serverKey) serverMap := serverValue.(map[string]interface{}) rack.LinkChildNode(server) for _, v := range serverMap["volumes"].([]interface{}) { diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index a53e2ae82..bd95cc796 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -71,13 +71,13 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count int, option *VolumeGrowOption) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { glog.V(0).Infoln("No more writable volumes!") return nil, 0, nil, errors.New("No more writable volumes!") } - if dataCenter == "" { + if option.DataCenter == "" { vid := vl.writables[rand.Intn(len_writers)] locationList := vl.vid2location[vid] if locationList != nil { @@ -91,7 +91,13 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol for _, v := range vl.writables { volumeLocationList := vl.vid2location[v] for _, dn := range volumeLocationList.list { - if dn.GetDataCenter().Id() == NodeId(dataCenter) { + if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { + if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { + continue + } + if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { + continue + } counter++ if rand.Intn(counter) < 1 { vid, locationList = v, volumeLocationList @@ -104,14 +110,20 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!") } -func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int { - if dataCenter == "" { +func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { + if option.DataCenter == "" { return len(vl.writables) } counter := 0 for _, v := range vl.writables { for _, dn := range vl.vid2location[v].list { - if dn.GetDataCenter().Id() == NodeId(dataCenter) { + if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { + if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { + continue + } + if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { + continue + } counter++ } } diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 286b90aca..9d8af156f 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -2,10 +2,10 @@ package weed_server import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/util" + "errors" "github.com/goraft/raft" "github.com/gorilla/mux" "net/http" @@ -25,7 +25,7 @@ type MasterServer struct { whiteList []string Topo *topology.Topology - vg *replication.VolumeGrowth + vg *topology.VolumeGrowth vgLock sync.Mutex bounedLeaderChan chan int @@ -53,7 +53,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { glog.Fatalf("cannot create topology:%s", e) } - ms.vg = replication.NewDefaultVolumeGrowth() + ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler))) @@ -94,11 +94,11 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else { + } else if ms.Topo.RaftServer.Leader() != "" { ms.bounedLeaderChan <- 1 defer func() { <-ms.bounedLeaderChan }() targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) - if err != nil || ms.Topo.RaftServer.Leader() == "" { + if err != nil { writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL http://" + ms.Topo.RaftServer.Leader() + " Parse Error " + err.Error()}) return } @@ -106,6 +106,9 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ proxy := httputil.NewSingleHostReverseProxy(targetUrl) proxy.Transport = util.Transport proxy.ServeHTTP(w, r) + } else { + //drop it to the floor + writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+"does not know Leader yet:"+ms.Topo.RaftServer.Leader())) } } } diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index a2f7b162c..884689a5e 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -3,6 +3,7 @@ package weed_server import ( "code.google.com/p/weed-fs/go/stats" "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" @@ -39,24 +40,19 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { stats.AssignRequest() - c, e := strconv.Atoi(r.FormValue("count")) + requestedCount, e := strconv.Atoi(r.FormValue("count")) if e != nil { - c = 1 + requestedCount = 1 } - replication := r.FormValue("replication") - if replication == "" { - replication = ms.defaultReplicaPlacement - } - collection := r.FormValue("collection") - dataCenter := r.FormValue("dataCenter") - replicaPlacement, err := storage.NewReplicaPlacementFromString(replication) + + option, err := ms.getVolumeGrowOption(r) if err != nil { w.WriteHeader(http.StatusNotAcceptable) writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) return } - if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 { + if !ms.Topo.HasWriableVolume(option) { if ms.Topo.FreeSpace() <= 0 { w.WriteHeader(http.StatusNotFound) writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) @@ -64,15 +60,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } else { ms.vgLock.Lock() defer ms.vgLock.Unlock() - if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 { - if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.Topo); err != nil { + if !ms.Topo.HasWriableVolume(option) { + if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) return } } } } - fid, count, dn, err := ms.Topo.PickForWrite(collection, replicaPlacement, c, dataCenter) + fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) } else { @@ -138,13 +134,18 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { count := 0 - replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication")) + option, err := ms.getVolumeGrowOption(r) + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + return + } if err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < count*replicaPlacement.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount())) + if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) } else { - count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.Topo) + count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) } } else { err = errors.New("parameter count is not found") @@ -189,3 +190,27 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r * submitForClientHandler(w, r, ms.Topo.RaftServer.Leader()) } } + +func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + return vl.GetActiveVolumeCount(option) > 0 +} + +func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { + replicationString := r.FormValue("replication") + if replicationString == "" { + replicationString = ms.defaultReplicaPlacement + } + replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) + if err != nil { + return nil, err + } + volumeGrowOption := &topology.VolumeGrowOption{ + Collection: r.FormValue("collection"), + ReplicaPlacement: replicaPlacement, + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), + DataNode: r.FormValue("dataNode"), + } + return volumeGrowOption, nil +} diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index 0649ce7f4..36ed204ac 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -3,9 +3,9 @@ package weed_server import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/stats" "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/util" "mime" "net/http" @@ -214,7 +214,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonError(w, r, ne) return } - ret, errorStatus := replication.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r) + ret, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r) if errorStatus == "" { w.WriteHeader(http.StatusCreated) } else { @@ -251,7 +251,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } n.Size = 0 - ret := replication.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r) + ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r) if ret != 0 { w.WriteHeader(http.StatusAccepted)