diff --git a/go/topology/collection.go b/go/topology/collection.go index e5c7b0f0f..1c81ba2b6 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -11,12 +11,12 @@ type Collection struct { Name string volumeSizeLimit uint64 rp *storage.ReplicaPlacement - storageType2VolumeLayout *util.ConcurrentReadMap + storageType2VolumeLayout *util.ConcurrentMap } func NewCollection(name string, rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit, rp: rp} - c.storageType2VolumeLayout = util.NewConcurrentReadMap() + c.storageType2VolumeLayout = util.NewConcurrentMap() return c } @@ -29,30 +29,35 @@ func (c *Collection) GetOrCreateVolumeLayout(ttl *storage.TTL) *VolumeLayout { if ttl != nil { keyString += ttl.String() } - vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { + vl := c.storageType2VolumeLayout.GetOrNew(keyString, func() interface{} { return NewVolumeLayout(c.rp, ttl, c.volumeSizeLimit) }) return vl.(*VolumeLayout) } -func (c *Collection) Lookup(vid storage.VolumeId) *VolumeLocationList { - for _, vl := range c.storageType2VolumeLayout.Items { +func (c *Collection) Lookup(vid storage.VolumeId) (vll *VolumeLocationList) { + c.storageType2VolumeLayout.Walk(func(k string, vl interface{}) (e error) { if vl != nil { - if list := vl.(*VolumeLayout).Lookup(vid); list != nil { - return list + if vl != nil { + if vll = vl.(*VolumeLayout).Lookup(vid); vll != nil { + return util.ErrBreakWalk + } } } - } - return nil + return nil + }) + return } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.storageType2VolumeLayout.Items { + c.storageType2VolumeLayout.Walk(func(k string, vl interface{}) (e error) { if vl != nil { if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { nodes = append(nodes, list...) } } - } + return nil + }) return + } diff --git a/go/topology/topology.go b/go/topology/topology.go index 410a1c70e..753c8ffe8 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -16,7 +16,7 @@ import ( type Topology struct { NodeImpl - collectionMap *util.ConcurrentReadMap + collectionMap *util.ConcurrentMap pulse int64 @@ -41,7 +41,7 @@ func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) - t.collectionMap = util.NewConcurrentReadMap() + t.collectionMap = util.NewConcurrentMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit t.CollectionSettings = cs @@ -90,20 +90,23 @@ func (t *Topology) loadConfiguration(configurationFile string) error { return nil } -func (t *Topology) Lookup(collection string, vid storage.VolumeId) *VolumeLocationList { +func (t *Topology) Lookup(collection string, vid storage.VolumeId) (vl *VolumeLocationList) { //maybe an issue if lots of collections? if collection == "" { - for _, c := range t.collectionMap.Items { + t.collectionMap.Walk(func(k string, c interface{}) (e error) { if list := c.(*Collection).Lookup(vid); list != nil { - return list + vl = list + return util.ErrBreakWalk } - } + return nil + }) + } else { - if c, ok := t.collectionMap.Items[collection]; ok { + if c, _ := t.collectionMap.Get(collection); c != nil { return c.(*Collection).Lookup(vid) } } - return nil + return } func (t *Topology) NextVolumeId() storage.VolumeId { @@ -128,18 +131,18 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, } func (t *Topology) GetVolumeLayout(collectionName string, ttl *storage.TTL) *VolumeLayout { - return t.collectionMap.Get(collectionName, func() interface{} { + return t.collectionMap.GetOrNew(collectionName, func() interface{} { return NewCollection(collectionName, t.CollectionSettings.GetReplicaPlacement(collectionName), t.volumeSizeLimit) }).(*Collection).GetOrCreateVolumeLayout(ttl) } func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { - c, hasCollection := t.collectionMap.Items[collectionName] + c, hasCollection := t.collectionMap.Get(collectionName) return c.(*Collection), hasCollection } func (t *Topology) DeleteCollection(collectionName string) { - delete(t.collectionMap.Items, collectionName) + t.collectionMap.Delete(collectionName) } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index 6a1423ca8..66ec1593f 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -11,14 +11,15 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, col := range t.collectionMap.Items { - c := col.(*Collection) - for _, layout := range c.storageType2VolumeLayout.Items { - if layout != nil { - tmp := layout.(*VolumeLayout).ToMap() - tmp["collection"] = c.Name - layouts = append(layouts, tmp) + for i1 := range t.collectionMap.IterItems() { + c := i1.Value.(*Collection) + for i2 := range c.storageType2VolumeLayout.IterItems() { + if i2.Value == nil { + continue } + tmp := i2.Value.(*VolumeLayout).ToMap() + tmp["collection"] = c.Name + layouts = append(layouts, tmp) } } m["layouts"] = layouts diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go index 9ed262dd2..be29139ac 100644 --- a/go/topology/topology_replicate.go +++ b/go/topology/topology_replicate.go @@ -58,33 +58,34 @@ func (t *ReplicateTask) WorkingDataNodes() []*DataNode { } func planReplicateTasks(t *Topology) (tasks []*ReplicateTask) { - for _, col := range t.collectionMap.Items { - c := col.(*Collection) + for i1 := range t.collectionMap.IterItems() { + c := i1.Value.(*Collection) glog.V(0).Infoln("checking replicate on collection:", c.Name) growOption := &VolumeGrowOption{ReplicaPlacement: c.rp} - for _, vl := range c.storageType2VolumeLayout.Items { - if vl != nil { - volumeLayout := vl.(*VolumeLayout) - for vid, locationList := range volumeLayout.vid2location { - rp1 := locationList.CalcReplicaPlacement() - if rp1.Compare(volumeLayout.rp) >= 0 { - continue - } - if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil { - for _, s := range additionServers { - s.UpAdjustPlannedVolumeCountDelta(1) - rt := &ReplicateTask{ - Vid: vid, - Collection: c.Name, - SrcDN: locationList.PickForRead(), - DstDN: s, - } - tasks = append(tasks, rt) - glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url()) + for i2 := range c.storageType2VolumeLayout.IterItems() { + if i2.Value == nil { + continue + } + volumeLayout := i2.Value.(*VolumeLayout) + for vid, locationList := range volumeLayout.vid2location { + rp1 := locationList.CalcReplicaPlacement() + if rp1.Compare(volumeLayout.rp) >= 0 { + continue + } + if additionServers, e := FindEmptySlotsForOneVolume(t, growOption, locationList); e == nil { + for _, s := range additionServers { + s.UpAdjustPlannedVolumeCountDelta(1) + rt := &ReplicateTask{ + Vid: vid, + Collection: c.Name, + SrcDN: locationList.PickForRead(), + DstDN: s, } - } else { - glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e) + tasks = append(tasks, rt) + glog.V(0).Infof("add replicate task, vid: %v, src: %s, dst: %s", vid, rt.SrcDN.Url(), rt.DstDN.Url()) } + } else { + glog.V(0).Infof("find empty slots error, vid: %v, rp: %s => %s, %v", vid, rp1.String(), volumeLayout.rp.String(), e) } } } diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 446eb0c1c..660fdb183 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -81,18 +81,19 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } func (t *Topology) Vacuum(garbageThreshold string) int { glog.V(0).Infoln("Start vacuum on demand") - for _, col := range t.collectionMap.Items { - c := col.(*Collection) + for item := range t.collectionMap.IterItems() { + c := item.Value.(*Collection) glog.V(0).Infoln("vacuum on collection:", c.Name) - for _, vl := range c.storageType2VolumeLayout.Items { - if vl != nil { - volumeLayout := vl.(*VolumeLayout) - for vid, locationList := range volumeLayout.vid2location { - glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid) - if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationList) { - batchVacuumVolumeCommit(volumeLayout, vid, locationList) - } + for item1 := range c.storageType2VolumeLayout.IterItems() { + if item1.Value == nil { + continue + } + volumeLayout := item1.Value.(*VolumeLayout) + for vid, locationList := range volumeLayout.vid2location { + glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid) + if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationList) { + batchVacuumVolumeCommit(volumeLayout, vid, locationList) } } } diff --git a/go/util/concurrent_map.go b/go/util/concurrent_map.go new file mode 100644 index 000000000..13f7fa383 --- /dev/null +++ b/go/util/concurrent_map.go @@ -0,0 +1,121 @@ +package util + +import ( + "errors" + "sync" +) + +// A mostly for read map, which can thread-safely +// initialize the map entries. +type ConcurrentMap struct { + mutex sync.RWMutex + items map[string]interface{} +} + +func NewConcurrentMap() *ConcurrentMap { + return &ConcurrentMap{items: make(map[string]interface{})} +} + +func (m *ConcurrentMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + if value, ok := m.items[key]; ok { + return value + } + value = newEntry() + m.items[key] = value + return value +} + +func (m *ConcurrentMap) GetOrNew(key string, newEntry func() interface{}) interface{} { + m.mutex.RLock() + value, ok := m.items[key] + m.mutex.RUnlock() + if ok { + return value + } + return m.initMapEntry(key, newEntry) +} + +func (m *ConcurrentMap) Get(key string) (item interface{}, ok bool) { + m.mutex.RLock() + defer m.mutex.RUnlock() + item, ok = m.items[key] + return +} + +func (m *ConcurrentMap) Has(key string) bool { + m.mutex.RLock() + _, ok := m.items[key] + m.mutex.RUnlock() + return ok +} + +func (m *ConcurrentMap) Delete(key string) { + m.mutex.Lock() + delete(m.items, key) + m.mutex.Unlock() +} + +func (m *ConcurrentMap) Size() int { + m.mutex.RLock() + defer m.mutex.RUnlock() + return len(m.items) +} + +// Wipes all items from the map +func (m *ConcurrentMap) Flush() int { + m.mutex.Lock() + size := len(m.items) + m.items = make(map[string]interface{}) + m.mutex.Unlock() + return size +} + +var ErrBreakWalk = errors.New("Break walk.") + +// break walk when walker fuc return an error +type MapWalker func(k string, v interface{}) (e error) + +// MUST NOT add or delete item in walker +func (m *ConcurrentMap) Walk(mw MapWalker) (e error) { + m.mutex.RLock() + defer m.mutex.RUnlock() + for k, v := range m.items { + if e = mw(k, v); e != nil { + return e + } + } + return +} + +func (m *ConcurrentMap) Keys() (keys []string) { + m.mutex.RLock() + keys = make([]string, 0, len(m.items)) + for key := range m.items { + keys = append(keys, key) + } + m.mutex.RUnlock() + return +} + +// Item is a pair of key and value +type Item struct { + Key string + Value interface{} +} + +// Return a channel from which each item (key:value pair) in the map can be read +// You can't break the iterator +func (m *ConcurrentMap) IterItems() <-chan Item { + ch := make(chan Item) + go func() { + m.mutex.RLock() + for key, value := range m.items { + ch <- Item{key, value} + } + m.mutex.RUnlock() + close(ch) + }() + return ch +} diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go deleted file mode 100644 index 9e9e7f438..000000000 --- a/go/util/concurrent_read_map.go +++ /dev/null @@ -1,37 +0,0 @@ -package util - -import ( - "sync" -) - -// A mostly for read map, which can thread-safely -// initialize the map entries. -type ConcurrentReadMap struct { - rwmutex sync.RWMutex - Items map[string]interface{} -} - -func NewConcurrentReadMap() *ConcurrentReadMap { - return &ConcurrentReadMap{Items: make(map[string]interface{})} -} - -func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { - m.rwmutex.Lock() - defer m.rwmutex.Unlock() - if value, ok := m.Items[key]; ok { - return value - } - value = newEntry() - m.Items[key] = value - return value -} - -func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { - m.rwmutex.RLock() - value, ok := m.Items[key] - m.rwmutex.RUnlock() - if ok { - return value - } - return m.initMapEntry(key, newEntry) -}