
rename ConcurrentReadMap to ConcurrentMap

use RWMutex to lock both reads and writes on ConcurrentMap
pull/279/head
tnextday committed 10 years ago
commit 6efa09d263
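
For orientation, here is a minimal sketch of how call sites migrate to the new API defined in go/util/concurrent_map.go below. The standalone main package and the import path are assumptions for illustration only; the method names (NewConcurrentMap, GetOrNew, Get, Walk, Delete, ErrBreakWalk) come from the diff in this commit.

package main

import (
	"fmt"

	// assumed import path for this repository's go/util package
	"github.com/chrislusf/seaweedfs/go/util"
)

func main() {
	m := util.NewConcurrentMap()

	// GetOrNew replaces the old ConcurrentReadMap.Get(key, factory):
	// the factory runs under the write lock, and only when the key is missing.
	vl := m.GetOrNew("layout", func() interface{} { return "created once" })
	fmt.Println(vl)

	// Plain lookups go through Get instead of reading the items map directly.
	if item, ok := m.Get("layout"); ok {
		fmt.Println("found:", item)
	}

	// Iteration replaces `for k, v := range m.Items`; returning
	// util.ErrBreakWalk stops the walk early, as Collection.Lookup does below.
	m.Walk(func(k string, v interface{}) error {
		fmt.Println(k, "=>", v)
		return util.ErrBreakWalk
	})

	m.Delete("layout")
}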
  1. go/topology/collection.go (27 changed lines)
  2. go/topology/topology.go (25 changed lines)
  3. go/topology/topology_map.go (15 changed lines)
  4. go/topology/topology_replicate.go (47 changed lines)
  5. go/topology/topology_vacuum.go (23 changed lines)
  6. go/util/concurrent_map.go (121 changed lines)
  7. go/util/concurrent_read_map.go (37 changed lines)

go/topology/collection.go (27 changed lines)

@@ -11,12 +11,12 @@ type Collection struct {
Name string
volumeSizeLimit uint64
rp *storage.ReplicaPlacement
storageType2VolumeLayout *util.ConcurrentReadMap
storageType2VolumeLayout *util.ConcurrentMap
}
func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit, rp: rp}
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
c.storageType2VolumeLayout = util.NewConcurrentMap()
return c
}
@@ -29,30 +29,35 @@ func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout {
if ttl != nil {
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
vl := c.storageType2VolumeLayout.GetOrNew(keyString, func() interface{} {
return NewVolumeLayout(c.rp, ttl, c.volumeSizeLimit)
})
return vl.(*VolumeLayout)
}
func (c *Collection) Lookup(vid storage.VolumeId) *VolumeLocationList {
for _, vl := range c.storageType2VolumeLayout.Items {
func (c *Collection) Lookup(vid storage.VolumeId) (vll *VolumeLocationList) {
c.storageType2VolumeLayout.Walk(func(k string, vl interface{}) (e error) {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list
if vl != nil {
if vll = vl.(*VolumeLayout).Lookup(vid); vll != nil {
return util.ErrBreakWalk
}
}
}
}
return nil
return nil
})
return
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
for _, vl := range c.storageType2VolumeLayout.Items {
c.storageType2VolumeLayout.Walk(func(k string, vl interface{}) (e error) {
if vl != nil {
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
}
}
}
return nil
})
return
}

go/topology/topology.go (25 changed lines)

@@ -16,7 +16,7 @@ import (
type Topology struct {
NodeImpl
collectionMap *util.ConcurrentReadMap
collectionMap *util.ConcurrentMap
pulse int64
@@ -41,7 +41,7 @@ func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.collectionMap = util.NewConcurrentReadMap()
t.collectionMap = util.NewConcurrentMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.CollectionSettings = cs
@@ -90,20 +90,23 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
return nil
}
func (t *Topology) Lookup(collection string, vid storage.VolumeId) *VolumeLocationList {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) (vl *VolumeLocationList) {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items {
t.collectionMap.Walk(func(k string, c interface{}) (e error) {
if list := c.(*Collection).Lookup(vid); list != nil {
return list
vl = list
return util.ErrBreakWalk
}
}
return nil
})
} else {
if c, ok := t.collectionMap.Items[collection]; ok {
if c, _ := t.collectionMap.Get(collection); c != nil {
return c.(*Collection).Lookup(vid)
}
}
return nil
return
}
func (t *Topology) NextVolumeId() storage.VolumeId {
@@ -128,18 +131,18 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
}
func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return t.collectionMap.GetOrNew(collectionName, func() interface{} {
return NewCollection(collectionName, t.CollectionSettings.GetReplicaPlacement(collectionName), t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(ttl)
}
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Items[collectionName]
c, hasCollection := t.collectionMap.Get(collectionName)
return c.(*Collection), hasCollection
}
func (t *Topology) DeleteCollection(collectionName string) {
delete(t.collectionMap.Items, collectionName)
t.collectionMap.Delete(collectionName)
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {

go/topology/topology_map.go (15 changed lines)

@@ -11,14 +11,15 @@ func (t *Topology) ToMap() interface{} {
}
m["DataCenters"] = dcs
var layouts []interface{}
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for _, layout := range c.storageType2VolumeLayout.Items {
if layout != nil {
tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name
layouts = append(layouts, tmp)
for i1 := range t.collectionMap.IterItems() {
c := i1.Value.(*Collection)
for i2 := range c.storageType2VolumeLayout.IterItems() {
if i2.Value == nil {
continue
}
tmp := i2.Value.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name
layouts = append(layouts, tmp)
}
}
m["layouts"] = layouts

go/topology/topology_replicate.go (47 changed lines)

@@ -58,33 +58,34 @@ func (t *ReplicateTask) WorkingDataNodes() []*DataNode {
}
func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) {
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for i1 := range t.collectionMap.IterItems() {
c := i1.Value.(*Collection)
glog.V(0).Infoln("checking replicate on collection:", c.Name)
growOption := &VolumeGrowOption{ReplicaPlacement: c.rp}
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
rp1 := locationList.CalcReplicaPlacement()
if rp1.Compare(volumeLayout.rp) >= 0 {
continue
}
if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil {
for _, s := range additionServers {
s.UpAdjustPlannedVolumeCountDelta(1)
rt := &ReplicateTask{
Vid: vid,
Collection: c.Name,
SrcDN: locationList.PickForRead(),
DstDN: s,
}
tasks = append(tasks, rt)
glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url())
for i2 := range c.storageType2VolumeLayout.IterItems() {
if i2.Value == nil {
continue
}
volumeLayout := i2.Value.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
rp1 := locationList.CalcReplicaPlacement()
if rp1.Compare(volumeLayout.rp) >= 0 {
continue
}
if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil {
for _, s := range additionServers {
s.UpAdjustPlannedVolumeCountDelta(1)
rt := &ReplicateTask{
Vid: vid,
Collection: c.Name,
SrcDN: locationList.PickForRead(),
DstDN: s,
}
} else {
glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e)
tasks = append(tasks, rt)
glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url())
}
} else {
glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e)
}
}
}

go/topology/topology_vacuum.go (23 changed lines)

@@ -81,18 +81,19 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
func (t *Topology) Vacuum(garbageThreshold string) int {
glog.V(0).Infoln("Start vacuum on demand")
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for item := range t.collectionMap.IterItems() {
c := item.Value.(*Collection)
glog.V(0).Infoln("vacuum on collection:", c.Name)
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationList) {
batchVacuumVolumeCommit(volumeLayout, vid, locationList)
}
for item1 := range c.storageType2VolumeLayout.IterItems() {
if item1.Value == nil {
continue
}
volumeLayout := item1.Value.(*VolumeLayout)
for vid, locationList := range volumeLayout.vid2location {
glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationList) {
batchVacuumVolumeCommit(volumeLayout, vid, locationList)
}
}
}

go/util/concurrent_map.go (121 changed lines)

@@ -0,0 +1,121 @@
package util
import (
"errors"
"sync"
)
// A mostly-read map that can
// thread-safely initialize its entries.
type ConcurrentMap struct {
mutex sync.RWMutex
items map[string]interface{}
}
func NewConcurrentMap() *ConcurrentMap {
return &ConcurrentMap{items: make(map[string]interface{})}
}
func (m *ConcurrentMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
if value, ok := m.items[key]; ok {
return value
}
value = newEntry()
m.items[key] = value
return value
}
func (m *ConcurrentMap) GetOrNew(key string, newEntry func() interface{}) interface{} {
m.mutex.RLock()
value, ok := m.items[key]
m.mutex.RUnlock()
if ok {
return value
}
return m.initMapEntry(key, newEntry)
}
func (m *ConcurrentMap) Get(key string) (item interface{}, ok bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
item, ok = m.items[key]
return
}
func (m *ConcurrentMap) Has(key string) bool {
m.mutex.RLock()
_, ok := m.items[key]
m.mutex.RUnlock()
return ok
}
func (m *ConcurrentMap) Delete(key string) {
m.mutex.Lock()
delete(m.items, key)
m.mutex.Unlock()
}
func (m *ConcurrentMap) Size() int {
m.mutex.RLock()
defer m.mutex.RUnlock()
return len(m.items)
}
// Wipes all items from the map
func (m *ConcurrentMap) Flush() int {
m.mutex.Lock()
size := len(m.items)
m.items = make(map[string]interface{})
m.mutex.Unlock()
return size
}
var ErrBreakWalk = errors.New("Break walk.")
// the walk stops early when the walker func returns an error
type MapWalker func(k string, v interface{}) (e error)
// MUST NOT add or delete items inside the walker
func (m *ConcurrentMap) Walk(mw MapWalker) (e error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for k, v := range m.items {
if e = mw(k, v); e != nil {
return e
}
}
return
}
func (m *ConcurrentMap) Keys() (keys []string) {
m.mutex.RLock()
keys = make([]string, 0, len(m.items))
for key := range m.items {
keys = append(keys, key)
}
m.mutex.RUnlock()
return
}
// Item is a pair of key and value
type Item struct {
Key string
Value interface{}
}
// Returns a channel from which each item (key:value pair) in the map can be read.
// The iteration cannot be stopped early.
func (m *ConcurrentMap) IterItems() <-chan Item {
ch := make(chan Item)
go func() {
m.mutex.RLock()
for key, value := range m.items {
ch <- Item{key, value}
}
m.mutex.RUnlock()
close(ch)
}()
return ch
}
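
As a quick sanity check of the API defined above, a minimal test-style sketch follows; the test itself is hypothetical and not part of this commit, and would live in a file such as go/util/concurrent_map_test.go.

package util

import "testing"

func TestConcurrentMapBasics(t *testing.T) {
	m := NewConcurrentMap()
	m.GetOrNew("a", func() interface{} { return 1 })
	m.GetOrNew("b", func() interface{} { return 2 })

	if !m.Has("a") || m.Size() != 2 {
		t.Fatalf("expected 2 items, got %d", m.Size())
	}

	// IterItems streams key/value pairs over an unbuffered channel while a
	// goroutine holds the read lock, so drain the channel completely before
	// calling anything that takes the write lock (e.g. Delete or Flush).
	seen := map[string]interface{}{}
	for item := range m.IterItems() {
		seen[item.Key] = item.Value
	}
	if len(seen) != len(m.Keys()) {
		t.Fatalf("iterated %d items, want %d", len(seen), len(m.Keys()))
	}

	if m.Flush() != 2 || m.Size() != 0 {
		t.Fatal("Flush should empty the map and return the previous size")
	}
}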

go/util/concurrent_read_map.go (37 changed lines)

@@ -1,37 +0,0 @@
package util
import (
"sync"
)
// A mostly for read map, which can thread-safely
// initialize the map entries.
type ConcurrentReadMap struct {
rwmutex sync.RWMutex
Items map[string]interface{}
}
func NewConcurrentReadMap() *ConcurrentReadMap {
return &ConcurrentReadMap{Items: make(map[string]interface{})}
}
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
if value, ok := m.Items[key]; ok {
return value
}
value = newEntry()
m.Items[key] = value
return value
}
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
m.rwmutex.RLock()
value, ok := m.Items[key]
m.rwmutex.RUnlock()
if ok {
return value
}
return m.initMapEntry(key, newEntry)
}