Browse Source

lock dn.volumes

pull/306/head
Chris Lu 9 years ago
parent
commit
57c85adc53
  1. 17
      go/topology/data_node.go
  2. 2
      go/topology/node.go
  3. 4
      go/topology/topology_event_handling.go
  4. 2
      go/topology/topology_map.go

17
go/topology/data_node.go

@ -3,6 +3,7 @@ package topology
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"sync"
"github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/storage"
@ -10,6 +11,7 @@ import (
type DataNode struct { type DataNode struct {
NodeImpl NodeImpl
sync.RWMutex
volumes map[storage.VolumeId]storage.VolumeInfo volumes map[storage.VolumeId]storage.VolumeInfo
Ip string Ip string
Port int Port int
@ -28,10 +30,14 @@ func NewDataNode(id string) *DataNode {
} }
func (dn *DataNode) String() string { func (dn *DataNode) String() string {
dn.RLock()
defer dn.RUnlock()
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
} }
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.Lock()
defer dn.Unlock()
if _, ok := dn.volumes[v.Id]; !ok { if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1) dn.UpAdjustVolumeCountDelta(1)
@ -49,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
for _, v := range actualVolumes { for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v actualVolumeMap[v.Id] = v
} }
dn.RLock()
for vid, v := range dn.volumes { for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok { if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid) glog.V(0).Infoln("Deleting volume id:", vid)
@ -58,12 +65,22 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
dn.UpAdjustActiveVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1)
} }
} //TODO: adjust max volume id, if need to reclaim volume ids } //TODO: adjust max volume id, if need to reclaim volume ids
dn.RUnlock()
for _, v := range actualVolumes { for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v) dn.AddOrUpdateVolume(v)
} }
return return
} }
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
dn.RLock()
for _, v := range dn.volumes {
ret = append(ret, v)
}
dn.RUnlock()
return ret
}
func (dn *DataNode) GetDataCenter() *DataCenter { func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
} }

2
go/topology/node.go

@ -231,7 +231,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
n.GetTopology().chanDeadDataNodes <- dn n.GetTopology().chanDeadDataNodes <- dn
} }
} }
for _, v := range dn.volumes {
for _, v := range dn.GetVolumes() {
if uint64(v.Size) >= volumeSizeLimit { if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v n.GetTopology().chanFullVolumes <- v

4
go/topology/topology_event_handling.go

@ -54,7 +54,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
return true return true
} }
func (t *Topology) UnRegisterDataNode(dn *DataNode) { func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
for _, v := range dn.GetVolumes() {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id) vl.SetVolumeUnavailable(dn, v.Id)
@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
dn.Parent().UnlinkChildNode(dn.Id()) dn.Parent().UnlinkChildNode(dn.Id())
} }
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
for _, v := range dn.GetVolumes() {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) { if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id) vl.SetVolumeAvailable(dn, v.Id)

2
go/topology/topology_map.go

@ -39,7 +39,7 @@ func (t *Topology) ToVolumeMap() interface{} {
for _, d := range rack.Children() { for _, d := range rack.Children() {
dn := d.(*DataNode) dn := d.(*DataNode)
var volumes []interface{} var volumes []interface{}
for _, v := range dn.volumes {
for _, v := range dn.GetVolumes() {
volumes = append(volumes, v) volumes = append(volumes, v)
} }
dataNodes[d.Id()] = volumes dataNodes[d.Id()] = volumes

Loading…
Cancel
Save