From 2fd0349d340c8966221ff05b1c776c018a7a84d6 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sun, 6 Mar 2016 23:19:41 +0800 Subject: [PATCH] return a copy of VolumeLocationList in VolumeLayout.Lookup, so that once it is outside of the VolumeLayout, we do not need to worry about its concurrency safety --- go/topology/batch_operation.go | 4 ++-- go/topology/topology_event_handling.go | 2 +- go/topology/topology_vacuum.go | 10 +++++----- go/topology/volume_layout.go | 16 +++++++++------- go/topology/volume_location_list.go | 6 ++++++ 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/go/topology/batch_operation.go b/go/topology/batch_operation.go index 3cf791d1e..2ecf1c4bd 100644 --- a/go/topology/batch_operation.go +++ b/go/topology/batch_operation.go @@ -11,7 +11,7 @@ import ( func BatchOperation(locationList *VolumeLocationList, path string, values url.Values) (isSuccess bool) { ch := make(chan bool, locationList.Length()) - for _, dn := range locationList.list { + for _, dn := range locationList.AllDataNode() { go func(url string, path string, values url.Values) { _, e := util.RemoteApiCall(url, path, values) if e != nil { @@ -22,7 +22,7 @@ func BatchOperation(locationList *VolumeLocationList, path string, values url.Va }(dn.Url(), path, values) } isSuccess = true - for range locationList.list { + for range locationList.AllDataNode() { select { case canVacuum := <-ch: isSuccess = isSuccess && canVacuum diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 65792a08a..1cfaf4a0c 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -47,7 +47,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } - for _, dn := range vl.vid2location[volumeInfo.Id].list { + for _, dn := range vl.vid2location[volumeInfo.Id].AllDataNode() { if !volumeInfo.ReadOnly { dn.UpAdjustActiveVolumeCountDelta(-1) } diff
--git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 660fdb183..a0634c3b6 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -13,7 +13,7 @@ import ( func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { ch := make(chan bool, locationlist.Length()) - for index, dn := range locationlist.list { + for index, dn := range locationlist.AllDataNode() { go func(index int, url string, vid storage.VolumeId) { //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url()) if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil { @@ -26,7 +26,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist }(index, dn.Url(), vid) } isCheckSuccess := true - for range locationlist.list { + for range locationlist.AllDataNode() { select { case canVacuum := <-ch: isCheckSuccess = isCheckSuccess && canVacuum @@ -40,7 +40,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { vl.removeFromWritable(vid) ch := make(chan bool, locationlist.Length()) - for index, dn := range locationlist.list { + for index, dn := range locationlist.AllDataNode() { go func(index int, url string, vid storage.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) if e := vacuumVolume_Compact(url, vid); e != nil { @@ -53,7 +53,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli }(index, dn.Url(), vid) } isVacuumSuccess := true - for range locationlist.list { + for range locationlist.AllDataNode() { select { case _ = <-ch: case <-time.After(30 * time.Minute): @@ -65,7 +65,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli } func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist 
*VolumeLocationList) bool { isCommitSuccess := true - for _, dn := range locationlist.list { + for _, dn := range locationlist.AllDataNode() { glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url()) if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 2a38fda99..fd011af5f 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -17,7 +17,7 @@ type VolumeLayout struct { vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 - accessLock sync.Mutex + accessLock sync.RWMutex } func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { @@ -44,7 +44,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length()) //TODO balancing data when have more replications if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - vl.AddToWritable(v.Id) + vl.addToWritable(v.Id) } else { vl.removeFromWritable(v.Id) } @@ -58,7 +58,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.vid2location, v.Id) } -func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) { +func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { for _, id := range vl.writables { if vid == id { return @@ -75,14 +75,14 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList { if location := vl.vid2location[vid]; location != nil { - return location + return location.Duplicate() } return nil } func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { for _, location := range vl.vid2location { - nodes = append(nodes, 
location.list...) + nodes = append(nodes, location.AllDataNode()...) } return } @@ -106,7 +106,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s counter := 0 for _, v := range vl.writables { volumeLocationList := vl.vid2location[v] - for _, dn := range volumeLocationList.list { + for _, dn := range volumeLocationList.AllDataNode() { if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { continue @@ -130,7 +130,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { } counter := 0 for _, v := range vl.writables { - for _, dn := range vl.vid2location[v].list { + for _, dn := range vl.vid2location[v].AllDataNode() { if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { continue @@ -205,6 +205,8 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { } func (vl *VolumeLayout) ToMap() map[string]interface{} { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() m := make(map[string]interface{}) m["replication"] = vl.rp.String() m["ttl"] = vl.ttl.String() diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go index 929d0c8a3..901d27ce0 100644 --- a/go/topology/volume_location_list.go +++ b/go/topology/volume_location_list.go @@ -32,6 +32,12 @@ func (dnll *VolumeLocationList) AllDataNode() []*DataNode { return dnll.list } +func (dnll *VolumeLocationList) Duplicate() *VolumeLocationList { + l := make([]*DataNode, len(dnll.list)) + copy(l, dnll.list) + return &VolumeLocationList{list: l} +} + func (dnll *VolumeLocationList) Length() int { return len(dnll.list) }