Browse Source

make VolumeLayout thread safe

pull/291/head
chrislusf 9 years ago
parent
commit
95e0d2f1b2
  1. 18
      go/topology/volume_layout.go

18
go/topology/volume_layout.go

@ -17,7 +17,7 @@ type VolumeLayout struct {
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
accessLock sync.RWMutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
@ -44,7 +44,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
vl.AddToWritable(v.Id)
vl.addToWritable(v.Id)
} else {
vl.removeFromWritable(v.Id)
}
@ -58,7 +58,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.vid2location, v.Id)
}
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
@ -74,6 +74,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
if location := vl.vid2location[vid]; location != nil {
return location.list
}
@ -81,6 +84,9 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
}
func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
for _, location := range vl.vid2location {
nodes = append(nodes, location.list...)
}
@ -88,6 +94,9 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
}
func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
@ -125,6 +134,9 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
if option.DataCenter == "" {
return len(vl.writables)
}

Loading…
Cancel
Save