Browse Source

return a copy of VolumeLocationList in VolumeLayout.Lookup, so when it outside of the VolumeLayout, we don't care about the concurrent safe of it

pull/279/head
tnextday 10 years ago
parent
commit
2fd0349d34
  1. 4
      go/topology/batch_operation.go
  2. 2
      go/topology/topology_event_handling.go
  3. 10
      go/topology/topology_vacuum.go
  4. 16
      go/topology/volume_layout.go
  5. 6
      go/topology/volume_location_list.go

4
go/topology/batch_operation.go

@ -11,7 +11,7 @@ import (
func BatchOperation(locationList *VolumeLocationList, path string, values url.Values) (isSuccess bool) {
ch := make(chan bool, locationList.Length())
for _, dn := range locationList.list {
for _, dn := range locationList.AllDataNode() {
go func(url string, path string, values url.Values) {
_, e := util.RemoteApiCall(url, path, values)
if e != nil {
@ -22,7 +22,7 @@ func BatchOperation(locationList *VolumeLocationList, path string, values url.Va
}(dn.Url(), path, values)
}
isSuccess = true
for range locationList.list {
for range locationList.AllDataNode() {
select {
case canVacuum := <-ch:
isSuccess = isSuccess && canVacuum

2
go/topology/topology_event_handling.go

@ -47,7 +47,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
for _, dn := range vl.vid2location[volumeInfo.Id].list {
for _, dn := range vl.vid2location[volumeInfo.Id].AllDataNode() {
if !volumeInfo.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(-1)
}

10
go/topology/topology_vacuum.go

@ -13,7 +13,7 @@ import (
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
for index, dn := range locationlist.AllDataNode() {
go func(index int, url string, vid storage.VolumeId) {
//glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
@ -26,7 +26,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
}(index, dn.Url(), vid)
}
isCheckSuccess := true
for range locationlist.list {
for range locationlist.AllDataNode() {
select {
case canVacuum := <-ch:
isCheckSuccess = isCheckSuccess && canVacuum
@ -40,7 +40,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
vl.removeFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
for index, dn := range locationlist.AllDataNode() {
go func(index int, url string, vid storage.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
if e := vacuumVolume_Compact(url, vid); e != nil {
@ -53,7 +53,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
for range locationlist.list {
for range locationlist.AllDataNode() {
select {
case _ = <-ch:
case <-time.After(30 * time.Minute):
@ -65,7 +65,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
}
func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
for _, dn := range locationlist.AllDataNode() {
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)

16
go/topology/volume_layout.go

@ -17,7 +17,7 @@ type VolumeLayout struct {
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
accessLock sync.RWMutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
@ -44,7 +44,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length())
//TODO balancing data when have more replications
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
vl.AddToWritable(v.Id)
vl.addToWritable(v.Id)
} else {
vl.removeFromWritable(v.Id)
}
@ -58,7 +58,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.vid2location, v.Id)
}
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
@ -75,14 +75,14 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *VolumeLocationList {
if location := vl.vid2location[vid]; location != nil {
return location
return location.Duplicate()
}
return nil
}
func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
for _, location := range vl.vid2location {
nodes = append(nodes, location.list...)
nodes = append(nodes, location.AllDataNode()...)
}
return
}
@ -106,7 +106,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
counter := 0
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
for _, dn := range volumeLocationList.AllDataNode() {
if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
continue
@ -130,7 +130,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
}
counter := 0
for _, v := range vl.writables {
for _, dn := range vl.vid2location[v].list {
for _, dn := range vl.vid2location[v].AllDataNode() {
if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
continue
@ -205,6 +205,8 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
}
func (vl *VolumeLayout) ToMap() map[string]interface{} {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
m["ttl"] = vl.ttl.String()

6
go/topology/volume_location_list.go

@ -32,6 +32,12 @@ func (dnll *VolumeLocationList) AllDataNode() []*DataNode {
return dnll.list
}
func (dnll *VolumeLocationList) Duplicate() *VolumeLocationList {
l := make([]*DataNode, len(dnll.list))
copy(l, dnll.list)
return &VolumeLocationList{list: l}
}
func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}

Loading…
Cancel
Save