Browse Source

Add read-write lock to guard topology changes on new collections and ttls.

pull/38/head
chrislusf 10 years ago
parent
commit
52180f386b
  1. 21
      go/topology/collection.go
  2. 29
      go/topology/topology.go
  3. 7
      go/topology/topology_map.go
  4. 14
      go/topology/topology_vacuum.go
  5. 37
      go/util/concurrent_read_map.go

21
go/topology/collection.go

@ -2,17 +2,18 @@ package topology
import ( import (
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
) )
type Collection struct { type Collection struct {
Name string Name string
volumeSizeLimit uint64 volumeSizeLimit uint64
storageType2VolumeLayout map[string]*VolumeLayout
storageType2VolumeLayout *util.ConcurrentReadMap
} }
func NewCollection(name string, volumeSizeLimit uint64) *Collection { func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c return c
} }
@ -21,16 +22,16 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
if ttl != nil { if ttl != nil {
keyString += ttl.String() keyString += ttl.String()
} }
if c.storageType2VolumeLayout[keyString] == nil {
c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
}
return c.storageType2VolumeLayout[keyString]
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
})
return vl.(*VolumeLayout)
} }
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
if list := vl.Lookup(vid); list != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list return list
} }
} }
@ -39,9 +40,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
} }
func (c *Collection) ListVolumeServers() (nodes []*DataNode) { func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
for _, vl := range c.storageType2VolumeLayout {
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
if list := vl.ListVolumeServers(); list != nil {
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...) nodes = append(nodes, list...)
} }
} }

29
go/topology/topology.go

@ -9,13 +9,14 @@ import (
"github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft" "github.com/goraft/raft"
) )
type Topology struct { type Topology struct {
NodeImpl NodeImpl
collectionMap map[string]*Collection
collectionMap *util.ConcurrentReadMap
pulse int64 pulse int64
@ -38,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology" t.nodeType = "Topology"
t.NodeImpl.value = t t.NodeImpl.value = t
t.children = make(map[NodeId]Node) t.children = make(map[NodeId]Node)
t.collectionMap = make(map[string]*Collection)
t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse) t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit t.volumeSizeLimit = volumeSizeLimit
@ -90,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections? //maybe an issue if lots of collections?
if collection == "" { if collection == "" {
for _, c := range t.collectionMap {
if list := c.Lookup(vid); list != nil {
for _, c := range t.collectionMap.Items {
if list := c.(*Collection).Lookup(vid); list != nil {
return list return list
} }
} }
} else { } else {
if c, ok := t.collectionMap[collection]; ok {
return c.Lookup(vid)
if c, ok := t.collectionMap.Items[collection]; ok {
return c.(*Collection).Lookup(vid)
} }
} }
return nil return nil
@ -125,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
} }
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
} }
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
collection, ok = t.collectionMap[collectionName]
return
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Items[collectionName]
return c.(*Collection), hasCollection
} }
func (t *Topology) DeleteCollection(collectionName string) { func (t *Topology) DeleteCollection(collectionName string) {
delete(t.collectionMap, collectionName)
delete(t.collectionMap.Items, collectionName)
} }
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {

7
go/topology/topology_map.go

@ -11,10 +11,11 @@ func (t *Topology) ToMap() interface{} {
} }
m["DataCenters"] = dcs m["DataCenters"] = dcs
var layouts []interface{} var layouts []interface{}
for _, c := range t.collectionMap {
for _, layout := range c.storageType2VolumeLayout {
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for _, layout := range c.storageType2VolumeLayout.Items {
if layout != nil { if layout != nil {
tmp := layout.ToMap()
tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name tmp["collection"] = c.Name
layouts = append(layouts, tmp) layouts = append(layouts, tmp)
} }

14
go/topology/topology_vacuum.go

@ -80,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess return isCommitSuccess
} }
func (t *Topology) Vacuum(garbageThreshold string) int { func (t *Topology) Vacuum(garbageThreshold string) int {
for _, c := range t.collectionMap {
for _, vl := range c.storageType2VolumeLayout {
for _, col := range t.collectionMap.Items {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(vl, vid, locationlist) {
batchVacuumVolumeCommit(vl, vid, locationlist)
volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
} }
} }
} }

37
go/util/concurrent_read_map.go

@ -0,0 +1,37 @@
package util
import "sync"
// A mostly for read map, which can thread-safely
// initialize the map entries.
type ConcurrentReadMap struct {
rmutex sync.RWMutex
mutex sync.Mutex
Items map[string]interface{}
}
func NewConcurrentReadMap() *ConcurrentReadMap {
return &ConcurrentReadMap{Items: make(map[string]interface{})}
}
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
if value, ok := m.Items[key]; ok {
return value
}
value = newEntry()
m.Items[key] = value
return value
}
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
m.rmutex.RLock()
if value, ok := m.Items[key]; ok {
m.rmutex.RUnlock()
return value
} else {
m.rmutex.RUnlock()
return m.initMapEntry(key, newEntry)
}
}
Loading…
Cancel
Save