Browse Source

handle volume server up/down events

pull/2/head
Chris Lu 12 years ago
parent
commit
fc9f1da143
  1. 52
      weed-fs/src/pkg/topology/topology.go
  2. 56
      weed-fs/src/pkg/topology/topology_event_handling.go
  3. 34
      weed-fs/src/pkg/topology/volume_layout.go
  4. 25
      weed-fs/src/pkg/topology/volume_location.go

52
weed-fs/src/pkg/topology/topology.go

@ -2,12 +2,10 @@ package topology
import ( import (
"errors" "errors"
"fmt"
"math/rand" "math/rand"
"pkg/directory" "pkg/directory"
"pkg/sequence" "pkg/sequence"
"pkg/storage" "pkg/storage"
"time"
) )
type Topology struct { type Topology struct {
@ -142,53 +140,3 @@ func (t *Topology) ToMap() interface{} {
m["layouts"] = layouts m["layouts"] = layouts
return m return m
} }
func (t *Topology) StartRefreshWritableVolumes() {
go func() {
for {
freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
}
}()
go func() {
for {
select {
case v := <-t.chanIncomplemteVolumes:
fmt.Println("Volume", v, "is incomplete!")
case v := <-t.chanRecoveredVolumes:
fmt.Println("Volume", v, "is recovered!")
case v := <-t.chanFullVolumes:
t.SetVolumeReadOnly(v)
fmt.Println("Volume", v, "is full!")
case dn := <-t.chanRecoveredDataNodes:
t.RegisterRecoveredDataNode(dn)
fmt.Println("DataNode", dn, "is back alive!")
case dn := <-t.chanDeadDataNodes:
t.UnRegisterDataNode(dn)
fmt.Println("DataNode", dn, "is dead!")
}
}
}()
}
func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) {
vl := t.GetVolumeLayout(volumeInfo.RepType)
vl.SetVolumeReadOnly(volumeInfo.Id)
}
func (t *Topology) SetVolumeWritable(volumeInfo *storage.VolumeInfo) {
vl := t.GetVolumeLayout(volumeInfo.RepType)
vl.SetVolumeWritable(volumeInfo.Id)
}
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
t.SetVolumeReadOnly(&v)
}
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
if uint64(v.Size) < t.volumeSizeLimit {
t.SetVolumeWritable(&v)
}
}
}

56
weed-fs/src/pkg/topology/topology_event_handling.go

@ -0,0 +1,56 @@
package topology
import (
"fmt"
"math/rand"
"pkg/storage"
"time"
)
func (t *Topology) StartRefreshWritableVolumes() {
go func() {
for {
freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
}
}()
go func() {
for {
select {
case v := <-t.chanIncomplemteVolumes:
fmt.Println("Volume", v, "is incomplete!")
case v := <-t.chanRecoveredVolumes:
fmt.Println("Volume", v, "is recovered!")
case v := <-t.chanFullVolumes:
t.SetVolumeCapacityFull(v)
fmt.Println("Volume", v, "is full!")
case dn := <-t.chanRecoveredDataNodes:
t.RegisterRecoveredDataNode(dn)
fmt.Println("DataNode", dn, "is back alive!")
case dn := <-t.chanDeadDataNodes:
t.UnRegisterDataNode(dn)
fmt.Println("DataNode", dn, "is dead!")
}
}
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) {
vl := t.GetVolumeLayout(volumeInfo.RepType)
vl.SetVolumeCapacityFull(volumeInfo.Id)
}
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.RepType)
vl.SetVolumeUnavailable(dn, v.Id)
}
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
if uint64(v.Size) < t.volumeSizeLimit {
vl := t.GetVolumeLayout(v.RepType)
vl.SetVolumeAvailable(dn, v.Id)
}
}
}

34
weed-fs/src/pkg/topology/volume_layout.go

@ -9,7 +9,7 @@ import (
type VolumeLayout struct { type VolumeLayout struct {
repType storage.ReplicationType repType storage.ReplicationType
vid2location map[storage.VolumeId]*DataNodeLocationList
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id writables []storage.VolumeId // transient array of writable volume id
pulse int64 pulse int64
volumeSizeLimit uint64 volumeSizeLimit uint64
@ -18,7 +18,7 @@ type VolumeLayout struct {
func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
return &VolumeLayout{ return &VolumeLayout{
repType: repType, repType: repType,
vid2location: make(map[storage.VolumeId]*DataNodeLocationList),
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId), writables: *new([]storage.VolumeId),
pulse: pulse, pulse: pulse,
volumeSizeLimit: volumeSizeLimit, volumeSizeLimit: volumeSizeLimit,
@ -27,7 +27,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if _, ok := vl.vid2location[v.Id]; !ok { if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewDataNodeLocationList()
vl.vid2location[v.Id] = NewVolumeLocationList()
} }
if vl.vid2location[v.Id].Add(dn) { if vl.vid2location[v.Id].Add(dn) {
if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
@ -38,7 +38,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
} }
} }
func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *DataNodeLocationList, error) {
func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables) len_writers := len(vl.writables)
if len_writers <= 0 { if len_writers <= 0 {
fmt.Println("No more writable volumes!") fmt.Println("No more writable volumes!")
@ -56,7 +56,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int {
return len(vl.writables) return len(vl.writables)
} }
func (vl *VolumeLayout) SetVolumeReadOnly(vid storage.VolumeId) bool {
func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
for i, v := range vl.writables { for i, v := range vl.writables {
if v == vid { if v == vid {
vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
@ -65,8 +65,7 @@ func (vl *VolumeLayout) SetVolumeReadOnly(vid storage.VolumeId) bool {
} }
return false return false
} }
func (vl *VolumeLayout) SetVolumeWritable(vid storage.VolumeId) bool {
func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
for _, v := range vl.writables { for _, v := range vl.writables {
if v == vid { if v == vid {
return false return false
@ -76,6 +75,27 @@ func (vl *VolumeLayout) SetVolumeWritable(vid storage.VolumeId) bool {
return true return true
} }
func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
if vl.vid2location[vid].Remove(dn) {
if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
return vl.removeFromWritable(vid)
}
}
return false
}
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
if vl.vid2location[vid].Add(dn) {
if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
return vl.setVolumeWritable(vid)
}
}
return false
}
func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
return vl.removeFromWritable(vid)
}
func (vl *VolumeLayout) ToMap() interface{} { func (vl *VolumeLayout) ToMap() interface{} {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["replication"] = vl.repType.String() m["replication"] = vl.repType.String()

25
weed-fs/src/pkg/topology/volume_location.go

@ -2,19 +2,23 @@ package topology
import () import ()
type DataNodeLocationList struct {
type VolumeLocationList struct {
list []*DataNode list []*DataNode
} }
func NewDataNodeLocationList() *DataNodeLocationList {
return &DataNodeLocationList{}
func NewVolumeLocationList() *VolumeLocationList {
return &VolumeLocationList{}
} }
func (dnll *DataNodeLocationList) Head() *DataNode {
func (dnll *VolumeLocationList) Head() *DataNode {
return dnll.list[0] return dnll.list[0]
} }
func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}
func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
for _, dnl := range dnll.list { for _, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port { if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
return false return false
@ -23,8 +27,17 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
dnll.list = append(dnll.list, loc) dnll.list = append(dnll.list, loc)
return true return true
} }
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
for i, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
dnll.list = append(dnll.list[:i],dnll.list[i+1:]...)
return true
}
}
return false
}
func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
var changed bool var changed bool
for _, dnl := range dnll.list { for _, dnl := range dnll.list {
if dnl.LastSeen < freshThreshHold { if dnl.LastSeen < freshThreshHold {

Loading…
Cancel
Save