Browse Source

rwlock concurrent read map

pull/312/head
Chris Lu 9 years ago
parent
commit
6df18a9181
  1. 4
      go/topology/collection.go
  2. 2
      go/topology/data_node.go
  3. 10
      go/topology/topology.go
  4. 4
      go/topology/topology_map.go
  5. 4
      go/topology/topology_vacuum.go
  6. 44
      go/util/concurrent_read_map.go
  7. 2
      go/weed/weed_server/master_server_handlers_admin.go

4
go/topology/collection.go

@ -35,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
} }
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil { if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list return list
@ -46,7 +46,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
} }
func (c *Collection) ListVolumeServers() (nodes []*DataNode) { func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
for _, vl := range c.storageType2VolumeLayout.Items {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil { if vl != nil {
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...) nodes = append(nodes, list...)

2
go/topology/data_node.go

@ -3,7 +3,6 @@ package topology
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"sync"
"github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/storage"
@ -11,7 +10,6 @@ import (
type DataNode struct { type DataNode struct {
NodeImpl NodeImpl
sync.RWMutex
volumes map[storage.VolumeId]storage.VolumeInfo volumes map[storage.VolumeId]storage.VolumeInfo
Ip string Ip string
Port int Port int

10
go/topology/topology.go

@ -90,13 +90,13 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections? //maybe an issue if lots of collections?
if collection == "" { if collection == "" {
for _, c := range t.collectionMap.Items {
for _, c := range t.collectionMap.Items() {
if list := c.(*Collection).Lookup(vid); list != nil { if list := c.(*Collection).Lookup(vid); list != nil {
return list return list
} }
} }
} else { } else {
if c, ok := t.collectionMap.Items[collection]; ok {
if c, ok := t.collectionMap.Find(collection); ok {
return c.(*Collection).Lookup(vid) return c.(*Collection).Lookup(vid)
} }
} }
@ -130,13 +130,13 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl) }).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
} }
func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Items[collectionName]
func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Find(collectionName)
return c.(*Collection), hasCollection return c.(*Collection), hasCollection
} }
func (t *Topology) DeleteCollection(collectionName string) { func (t *Topology) DeleteCollection(collectionName string) {
delete(t.collectionMap.Items, collectionName)
t.collectionMap.Delete(collectionName)
} }
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {

4
go/topology/topology_map.go

@ -11,9 +11,9 @@ func (t *Topology) ToMap() interface{} {
} }
m["DataCenters"] = dcs m["DataCenters"] = dcs
var layouts []interface{} var layouts []interface{}
for _, col := range t.collectionMap.Items {
for _, col := range t.collectionMap.Items() {
c := col.(*Collection) c := col.(*Collection)
for _, layout := range c.storageType2VolumeLayout.Items {
for _, layout := range c.storageType2VolumeLayout.Items() {
if layout != nil { if layout != nil {
tmp := layout.(*VolumeLayout).ToMap() tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name tmp["collection"] = c.Name

4
go/topology/topology_vacuum.go

@ -81,10 +81,10 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
} }
func (t *Topology) Vacuum(garbageThreshold string) int { func (t *Topology) Vacuum(garbageThreshold string) int {
glog.V(0).Infoln("Start vacuum on demand") glog.V(0).Infoln("Start vacuum on demand")
for _, col := range t.collectionMap.Items {
for _, col := range t.collectionMap.Items() {
c := col.(*Collection) c := col.(*Collection)
glog.V(0).Infoln("vacuum on collection:", c.Name) glog.V(0).Infoln("vacuum on collection:", c.Name)
for _, vl := range c.storageType2VolumeLayout.Items {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil { if vl != nil {
volumeLayout := vl.(*VolumeLayout) volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location { for vid, locationlist := range volumeLayout.vid2location {

44
go/util/concurrent_read_map.go

@ -7,31 +7,53 @@ import (
// A mostly for read map, which can thread-safely // A mostly for read map, which can thread-safely
// initialize the map entries. // initialize the map entries.
type ConcurrentReadMap struct { type ConcurrentReadMap struct {
rmutex sync.RWMutex
Items map[string]interface{}
sync.RWMutex
items map[string]interface{}
} }
func NewConcurrentReadMap() *ConcurrentReadMap { func NewConcurrentReadMap() *ConcurrentReadMap {
return &ConcurrentReadMap{Items: make(map[string]interface{})}
return &ConcurrentReadMap{items: make(map[string]interface{})}
} }
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) { func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
m.rmutex.Lock()
defer m.rmutex.Unlock()
if value, ok := m.Items[key]; ok {
m.Lock()
defer m.Unlock()
if value, ok := m.items[key]; ok {
return value return value
} }
value = newEntry() value = newEntry()
m.Items[key] = value
m.items[key] = value
return value return value
} }
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} { func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
m.rmutex.RLock()
if value, ok := m.Items[key]; ok {
m.rmutex.RUnlock()
m.RLock()
if value, ok := m.items[key]; ok {
m.RUnlock()
return value return value
} }
m.rmutex.RUnlock()
m.RUnlock()
return m.initMapEntry(key, newEntry) return m.initMapEntry(key, newEntry)
} }
func (m *ConcurrentReadMap) Find(key string) (interface{}, bool) {
m.RLock()
value, ok := m.items[key]
m.RUnlock()
return value, ok
}
func (m *ConcurrentReadMap) Items() (itemsCopy []interface{}) {
m.RLock()
for _, i := range m.items {
itemsCopy = append(itemsCopy, i)
}
return itemsCopy
}
func (m *ConcurrentReadMap) Delete(key string) {
m.Lock()
delete(m.items, key)
m.Unlock()
}

2
go/weed/weed_server/master_server_handlers_admin.go

@ -19,7 +19,7 @@ import (
) )
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
if !ok { if !ok {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection"))) writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
return return

Loading…
Cancel
Save