Browse Source

make sure `VolumeLayout` concurrent safe.

pull/279/head
tnextday 10 years ago
parent
commit
14415304dc
  1. 8
      go/topology/topology_event_handling.go
  2. 6
      go/topology/topology_replicate.go
  3. 8
      go/topology/topology_vacuum.go
  4. 57
      go/topology/volume_layout.go

8
go/topology/topology_event_handling.go

@ -47,7 +47,11 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
for _, dn := range vl.vid2location[volumeInfo.Id].AllDataNode() {
vll := vl.Lookup(volumeInfo.Id)
if vll != nil {
return false
}
for _, dn := range vll.AllDataNode() {
if !volumeInfo.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(-1)
}
@ -68,7 +72,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
vl := t.GetVolumeLayout(v.Collection, v.Ttl)
if vl.isWritable(&v) {
if vl.IsWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
}

6
go/topology/topology_replicate.go

@ -67,7 +67,11 @@ func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
continue
}
volumeLayout := i2.Value.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
for _, vid := range volumeLayout.ListVolumeId() {
locationList := volumeLayout.Lookup(vid)
if locationList == nil {
continue
}
rp1 := locationList.CalcReplicaPlacement()
if rp1.Compare(volumeLayout.rp) >= 0 {
continue

8
go/topology/topology_vacuum.go

@ -38,7 +38,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
return isCheckSuccess
}
func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
vl.removeFromWritable(vid)
vl.RemoveFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.AllDataNode() {
go func(index int, url string, vid storage.VolumeId) {
@ -89,7 +89,11 @@ func (t *Topology) Vacuum(garbageThreshold string) int {
continue
}
volumeLayout := item1.Value.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
for _, vid := range volumeLayout.ListVolumeId() {
locationList := volumeLayout.Lookup(vid)
if locationList == nil {
continue
}
glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationList) {

57
go/topology/volume_layout.go

@ -17,7 +17,7 @@ type VolumeLayout struct {
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.RWMutex
mutex sync.RWMutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
@ -30,12 +30,14 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL
}
func (vl *VolumeLayout) String() string {
vl.mutex.RLock()
defer vl.mutex.RUnlock()
return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
}
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.mutex.Lock()
defer vl.mutex.Unlock()
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
@ -43,7 +45,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length())
//TODO balancing data when have more replications
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.IsWritable(v) {
vl.addToWritable(v.Id)
} else {
vl.removeFromWritable(v.Id)
@ -51,8 +53,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.mutex.Lock()
defer vl.mutex.Unlock()
//TODO only delete data node from locations?
vl.removeFromWritable(v.Id)
delete(vl.vid2location, v.Id)
@ -67,13 +69,15 @@ func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
vl.writables = append(vl.writables, vid)
}
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
func (vl *VolumeLayout) IsWritable(v *storage.VolumeInfo) bool {
return uint64(v.Size) < vl.volumeSizeLimit &&
v.Version == storage.CurrentVersion &&
!v.ReadOnly
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
vl.mutex.RLock()
defer vl.mutex.RUnlock()
if location := vl.vid2location[vid]; location != nil {
return location.Duplicate()
}
@ -81,13 +85,27 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
}
func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
vl.mutex.RLock()
defer vl.mutex.RUnlock()
for _, location := range vl.vid2location {
nodes = append(nodes, location.AllDataNode()...)
}
return
}
func (vl *VolumeLayout) ListVolumeId() (vids []storage.VolumeId) {
vl.mutex.RLock()
defer vl.mutex.RUnlock()
vids = make([]storage.VolumeId, 0, len(vl.vid2location))
for vid := range vl.vid2location {
vids = append(vids, vid)
}
return
}
func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
vl.mutex.RLock()
defer vl.mutex.RUnlock()
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
@ -125,6 +143,8 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
vl.mutex.RLock()
defer vl.mutex.RUnlock()
if option.DataCenter == "" {
return len(vl.writables)
}
@ -160,6 +180,13 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
}
return false
}
func (vl *VolumeLayout) RemoveFromWritable(vid storage.VolumeId) bool {
vl.mutex.Lock()
defer vl.mutex.Unlock()
return vl.removeFromWritable(vid)
}
func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
@ -172,8 +199,8 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
}
func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.mutex.Lock()
defer vl.mutex.Unlock()
if location, ok := vl.vid2location[vid]; ok {
if location.Remove(dn) {
@ -186,8 +213,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
return false
}
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.mutex.Lock()
defer vl.mutex.Unlock()
vl.vid2location[vid].Set(dn)
if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
@ -197,16 +224,16 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
}
func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.mutex.Lock()
defer vl.mutex.Unlock()
// glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
return vl.removeFromWritable(vid)
}
func (vl *VolumeLayout) ToMap() map[string]interface{} {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
vl.mutex.RLock()
defer vl.mutex.RUnlock()
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
m["ttl"] = vl.ttl.String()

Loading…
Cancel
Save