diff --git a/go/topology/collection.go b/go/topology/collection.go index 506f43fbf..4b47ae88a 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -2,17 +2,18 @@ package topology import ( "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) type Collection struct { Name string volumeSizeLimit uint64 - storageType2VolumeLayout map[string]*VolumeLayout + storageType2VolumeLayout *util.ConcurrentReadMap } func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.storageType2VolumeLayout = make(map[string]*VolumeLayout) + c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } @@ -21,16 +22,16 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl * if ttl != nil { keyString += ttl.String() } - if c.storageType2VolumeLayout[keyString] == nil { - c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit) - } - return c.storageType2VolumeLayout[keyString] + vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { + return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + }) + return vl.(*VolumeLayout) } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.storageType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - if list := vl.Lookup(vid); list != nil { + if list := vl.(*VolumeLayout).Lookup(vid); list != nil { return list } } @@ -39,9 +40,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.storageType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - if list := vl.ListVolumeServers(); list != nil { + if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { nodes = append(nodes, list...) } } diff --git a/go/topology/topology.go b/go/topology/topology.go index eb64d336c..c2073ed2f 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -9,13 +9,14 @@ import ( "github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" "github.com/goraft/raft" ) type Topology struct { NodeImpl - collectionMap map[string]*Collection + collectionMap *util.ConcurrentReadMap pulse int64 @@ -38,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) - t.collectionMap = make(map[string]*Collection) + t.collectionMap = util.NewConcurrentReadMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit @@ -90,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { //maybe an issue if lots of collections? if collection == "" { - for _, c := range t.collectionMap { - if list := c.Lookup(vid); list != nil { + for _, c := range t.collectionMap.Items { + if list := c.(*Collection).Lookup(vid); list != nil { return list } } } else { - if c, ok := t.collectionMap[collection]; ok { - return c.Lookup(vid) + if c, ok := t.collectionMap.Items[collection]; ok { + return c.(*Collection).Lookup(vid) } } return nil @@ -125,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in } func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { - _, ok := t.collectionMap[collectionName] - if !ok { - t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) - } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) + return t.collectionMap.Get(collectionName, func() interface{} { + return NewCollection(collectionName, t.volumeSizeLimit) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } -func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { - collection, ok = t.collectionMap[collectionName] - return +func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { + c, hasCollection := t.collectionMap.Items[collectionName] + return c.(*Collection), hasCollection } func (t *Topology) DeleteCollection(collectionName string) { - delete(t.collectionMap, collectionName) + delete(t.collectionMap.Items, collectionName) } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index af95c6536..6a1423ca8 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -11,10 +11,11 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, c := range t.collectionMap { - for _, layout := range c.storageType2VolumeLayout { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + for _, layout := range c.storageType2VolumeLayout.Items { if layout != nil { - tmp := layout.ToMap() + tmp := layout.(*VolumeLayout).ToMap() tmp["collection"] = c.Name layouts = append(layouts, tmp) } diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 97a76026d..d6fa2213e 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -80,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis return isCommitSuccess } func (t *Topology) Vacuum(garbageThreshold string) int { - for _, c := range t.collectionMap { - for _, vl := range c.storageType2VolumeLayout { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) + volumeLayout := vl.(*VolumeLayout) + for vid, locationlist := range volumeLayout.vid2location { + if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { + batchVacuumVolumeCommit(volumeLayout, vid, locationlist) } } } diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go new file mode 100644 index 000000000..d16fdbcaf --- /dev/null +++ b/go/util/concurrent_read_map.go @@ -0,0 +1,37 @@ +package util + +import "sync" + +// A mostly for read map, which can thread-safely +// initialize the map entries. +type ConcurrentReadMap struct { + rmutex sync.RWMutex + mutex sync.Mutex + Items map[string]interface{} +} + +func NewConcurrentReadMap() *ConcurrentReadMap { + return &ConcurrentReadMap{Items: make(map[string]interface{})} +} + +func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + if value, ok := m.Items[key]; ok { + return value + } + value = newEntry() + m.Items[key] = value + return value +} + +func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { + m.rmutex.RLock() + if value, ok := m.Items[key]; ok { + m.rmutex.RUnlock() + return value + } else { + m.rmutex.RUnlock() + return m.initMapEntry(key, newEntry) + } +}