From 14415304dcaf55279883547b95fa13bd64d48b4e Mon Sep 17 00:00:00 2001
From: tnextday
Date: Sun, 6 Mar 2016 23:47:10 +0800
Subject: [PATCH] make sure `VolumeLayout` is concurrency-safe.

---
 go/topology/topology_event_handling.go |  8 +++-
 go/topology/topology_replicate.go      |  6 ++-
 go/topology/topology_vacuum.go         |  8 +++-
 go/topology/volume_layout.go           | 57 +++++++++++++++++++-------
 4 files changed, 59 insertions(+), 20 deletions(-)

diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 1cfaf4a0c..8dc808039 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -47,7 +47,11 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
 	if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
 		return false
 	}
-	for _, dn := range vl.vid2location[volumeInfo.Id].AllDataNode() {
+	vll := vl.Lookup(volumeInfo.Id)
+	if vll == nil {
+		return false
+	}
+	for _, dn := range vll.AllDataNode() {
 		if !volumeInfo.ReadOnly {
 			dn.UpAdjustActiveVolumeCountDelta(-1)
 		}
@@ -68,7 +72,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
 func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
 	for _, v := range dn.volumes {
 		vl := t.GetVolumeLayout(v.Collection, v.Ttl)
-		if vl.isWritable(&v) {
+		if vl.IsWritable(&v) {
 			vl.SetVolumeAvailable(dn, v.Id)
 		}
 	}
diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go
index be29139ac..129c1b65b 100644
--- a/go/topology/topology_replicate.go
+++ b/go/topology/topology_replicate.go
@@ -67,7 +67,11 @@ func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
 			continue
 		}
 		volumeLayout := i2.Value.(*VolumeLayout)
-		for vid, locationList := range volumeLayout.vid2location {
+		for _, vid := range volumeLayout.ListVolumeId() {
+			locationList := volumeLayout.Lookup(vid)
+			if locationList == nil {
+				continue
+			}
 			rp1 := locationList.CalcReplicaPlacement()
 			if rp1.Compare(volumeLayout.rp) >= 0 {
 				continue
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index a0634c3b6..2220c24d2 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -38,7 +38,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
 	return isCheckSuccess
 }
 func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
-	vl.removeFromWritable(vid)
+	vl.RemoveFromWritable(vid)
 	ch := make(chan bool, locationlist.Length())
 	for index, dn := range locationlist.AllDataNode() {
 		go func(index int, url string, vid storage.VolumeId) {
@@ -89,7 +89,11 @@ func (t *Topology) Vacuum(garbageThreshold string) int {
 			continue
 		}
 		volumeLayout := item1.Value.(*VolumeLayout)
-		for vid, locationList := range volumeLayout.vid2location {
+		for _, vid := range volumeLayout.ListVolumeId() {
+			locationList := volumeLayout.Lookup(vid)
+			if locationList == nil {
+				continue
+			}
 			glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
 			if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
 				if batchVacuumVolumeCompact(volumeLayout, vid, locationList) {
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index fd011af5f..a5e5418dc 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -17,7 +17,7 @@ type VolumeLayout struct {
 	vid2location map[storage.VolumeId]*VolumeLocationList
 	writables    []storage.VolumeId // transient array of writable volume id
 	volumeSizeLimit uint64
-	accessLock   sync.RWMutex
+	mutex        sync.RWMutex
 }
 
 func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
@@ -30,12 +30,14 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL
 }
 
 func (vl *VolumeLayout) String() string {
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
 	return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
 }
 
 func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
+	vl.mutex.Lock()
+	defer vl.mutex.Unlock()
 
 	if _, ok := vl.vid2location[v.Id]; !ok {
 		vl.vid2location[v.Id] = NewVolumeLocationList()
@@ -43,7 +45,7 @@
 	vl.vid2location[v.Id].Set(dn)
 	glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length())
 	//TODO balancing data when have more replications
-	if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+	if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.IsWritable(v) {
 		vl.addToWritable(v.Id)
 	} else {
 		vl.removeFromWritable(v.Id)
@@ -51,8 +53,8 @@
 }
 
 func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
+	vl.mutex.Lock()
+	defer vl.mutex.Unlock()
 	//TODO only delete data node from locations?
 	vl.removeFromWritable(v.Id)
 	delete(vl.vid2location, v.Id)
@@ -67,13 +69,15 @@ func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
 	vl.writables = append(vl.writables, vid)
 }
 
-func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
+func (vl *VolumeLayout) IsWritable(v *storage.VolumeInfo) bool {
 	return uint64(v.Size) < vl.volumeSizeLimit &&
 		v.Version == storage.CurrentVersion &&
 		!v.ReadOnly
 }
 
 func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
 	if location := vl.vid2location[vid]; location != nil {
 		return location.Duplicate()
 	}
@@ -81,13 +85,27 @@
 }
 
 func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
 	for _, location := range vl.vid2location {
 		nodes = append(nodes, location.AllDataNode()...)
 	}
 	return
 }
 
+func (vl *VolumeLayout) ListVolumeId() (vids []storage.VolumeId) {
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
+	vids = make([]storage.VolumeId, 0, len(vl.vid2location))
+	for vid := range vl.vid2location {
+		vids = append(vids, vid)
+	}
+	return
+}
+
 func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
 	len_writers := len(vl.writables)
 	if len_writers <= 0 {
 		glog.V(0).Infoln("No more writable volumes!")
@@ -125,6 +143,8 @@
 }
 
 func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
 	if option.DataCenter == "" {
 		return len(vl.writables)
 	}
@@ -160,6 +180,13 @@
 	}
 	return false
 }
+
+func (vl *VolumeLayout) RemoveFromWritable(vid storage.VolumeId) bool {
+	vl.mutex.Lock()
+	defer vl.mutex.Unlock()
+	return vl.removeFromWritable(vid)
+}
+
 func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
 	for _, v := range vl.writables {
 		if v == vid {
@@ -172,8 +199,8 @@
 }
 
 func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
+	vl.mutex.Lock()
+	defer vl.mutex.Unlock()
 
 	if location, ok := vl.vid2location[vid]; ok {
 		if location.Remove(dn) {
@@ -186,8 +213,8 @@
 	return false
 }
 func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
+	vl.mutex.Lock()
+	defer vl.mutex.Unlock()
 
 	vl.vid2location[vid].Set(dn)
 	if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
@@ -197,16 +224,16 @@
 }
 
 func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
+	vl.mutex.Lock()
+	defer vl.mutex.Unlock()
 
 	// glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
 	return vl.removeFromWritable(vid)
}

 func (vl *VolumeLayout) ToMap() map[string]interface{} {
-	vl.accessLock.RLock()
-	defer vl.accessLock.RUnlock()
+	vl.mutex.RLock()
+	defer vl.mutex.RUnlock()
 	m := make(map[string]interface{})
 	m["replication"] = vl.rp.String()
 	m["ttl"] = vl.ttl.String()
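
A race-detector smoke test along these lines can exercise the new locking end to end. This is a sketch, not part of the patch to apply: the constructors NewDataNode, storage.NewReplicaPlacementFromString, and storage.ReadTTL are assumed to exist elsewhere in the tree with these signatures, as is the github.com/chrislusf/seaweedfs import path.

package topology

import (
	"sync"
	"testing"

	"github.com/chrislusf/seaweedfs/go/storage"
)

// Run with `go test -race ./go/topology/`: readers and writers hammer the
// same VolumeLayout, whose vid2location map was raced on before this patch.
func TestVolumeLayoutConcurrentAccess(t *testing.T) {
	rp, err := storage.NewReplicaPlacementFromString("000") // assumed helper
	if err != nil {
		t.Fatal(err)
	}
	ttl, err := storage.ReadTTL("") // assumed helper
	if err != nil {
		t.Fatal(err)
	}
	vl := NewVolumeLayout(rp, ttl, 1024*1024)
	dn := NewDataNode("dn1") // assumed helper

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				vid := storage.VolumeId(base*100 + j)
				v := storage.VolumeInfo{Id: vid, Version: storage.CurrentVersion}
				vl.RegisterVolume(&v, dn) // writer: mutates vid2location and writables
				vl.Lookup(vid)            // reader: returns a duplicated location list
				vl.ListVolumeId()         // reader: returns a snapshot slice
				vl.RemoveFromWritable(vid)
			}
		}(i)
	}
	wg.Wait()
}

The snapshot semantics are what make the call-site changes above safe: Lookup hands back location.Duplicate() and ListVolumeId builds a fresh slice, so the vacuum and replicate loops iterate without holding the read lock, at the cost of possibly seeing a slightly stale volume list.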