diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index fc2c902b0..8dcd64dca 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -2,12 +2,10 @@ package topology import ( "errors" - "fmt" "math/rand" "pkg/directory" "pkg/sequence" "pkg/storage" - "time" ) type Topology struct { @@ -142,53 +140,3 @@ func (t *Topology) ToMap() interface{} { m["layouts"] = layouts return m } - -func (t *Topology) StartRefreshWritableVolumes() { - go func() { - for { - freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval - t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) - time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - go func() { - for { - select { - case v := <-t.chanIncomplemteVolumes: - fmt.Println("Volume", v, "is incomplete!") - case v := <-t.chanRecoveredVolumes: - fmt.Println("Volume", v, "is recovered!") - case v := <-t.chanFullVolumes: - t.SetVolumeReadOnly(v) - fmt.Println("Volume", v, "is full!") - case dn := <-t.chanRecoveredDataNodes: - t.RegisterRecoveredDataNode(dn) - fmt.Println("DataNode", dn, "is back alive!") - case dn := <-t.chanDeadDataNodes: - t.UnRegisterDataNode(dn) - fmt.Println("DataNode", dn, "is dead!") - } - } - }() -} -func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) { - vl := t.GetVolumeLayout(volumeInfo.RepType) - vl.SetVolumeReadOnly(volumeInfo.Id) -} -func (t *Topology) SetVolumeWritable(volumeInfo *storage.VolumeInfo) { - vl := t.GetVolumeLayout(volumeInfo.RepType) - vl.SetVolumeWritable(volumeInfo.Id) -} -func (t *Topology) UnRegisterDataNode(dn *DataNode) { - for _, v := range dn.volumes { - fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) - t.SetVolumeReadOnly(&v) - } -} -func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { - for _, v := range dn.volumes { - if uint64(v.Size) < t.volumeSizeLimit { - t.SetVolumeWritable(&v) - } - } -} diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go new file mode 100644 index 000000000..813826a61 --- /dev/null +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -0,0 +1,56 @@ +package topology + +import ( + "fmt" + "math/rand" + "pkg/storage" + "time" +) + +func (t *Topology) StartRefreshWritableVolumes() { + go func() { + for { + freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + go func() { + for { + select { + case v := <-t.chanIncomplemteVolumes: + fmt.Println("Volume", v, "is incomplete!") + case v := <-t.chanRecoveredVolumes: + fmt.Println("Volume", v, "is recovered!") + case v := <-t.chanFullVolumes: + t.SetVolumeCapacityFull(v) + fmt.Println("Volume", v, "is full!") + case dn := <-t.chanRecoveredDataNodes: + t.RegisterRecoveredDataNode(dn) + fmt.Println("DataNode", dn, "is back alive!") + case dn := <-t.chanDeadDataNodes: + t.UnRegisterDataNode(dn) + fmt.Println("DataNode", dn, "is dead!") + } + } + }() +} +func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) { + vl := t.GetVolumeLayout(volumeInfo.RepType) + vl.SetVolumeCapacityFull(volumeInfo.Id) +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + for _, v := range dn.volumes { + fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) + vl := t.GetVolumeLayout(v.RepType) + vl.SetVolumeUnavailable(dn, v.Id) + } +} +func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { + for _, v := range dn.volumes { + if uint64(v.Size) < t.volumeSizeLimit { + vl := t.GetVolumeLayout(v.RepType) + vl.SetVolumeAvailable(dn, v.Id) + } + } +} diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 0c3841c72..ba091b9bb 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -9,7 +9,7 @@ import ( type VolumeLayout struct { repType storage.ReplicationType - vid2location map[storage.VolumeId]*DataNodeLocationList + vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id pulse int64 volumeSizeLimit uint64 @@ -18,7 +18,7 @@ type VolumeLayout struct { func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { return &VolumeLayout{ repType: repType, - vid2location: make(map[storage.VolumeId]*DataNodeLocationList), + vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), pulse: pulse, volumeSizeLimit: volumeSizeLimit, @@ -27,7 +27,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if _, ok := vl.vid2location[v.Id]; !ok { - vl.vid2location[v.Id] = NewDataNodeLocationList() + vl.vid2location[v.Id] = NewVolumeLocationList() } if vl.vid2location[v.Id].Add(dn) { if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { @@ -38,7 +38,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } -func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *DataNodeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { fmt.Println("No more writable volumes!") @@ -56,24 +56,44 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int { return len(vl.writables) } -func (vl *VolumeLayout) SetVolumeReadOnly(vid storage.VolumeId) bool { - for i, v := range vl.writables{ - if v == vid { - vl.writables = append(vl.writables[:i],vl.writables[i+1:]...) - return true +func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { + for i, v := range vl.writables { + if v == vid { + vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) + return true + } + } + return false +} +func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { + for _, v := range vl.writables { + if v == vid { + return false + } + } + vl.writables = append(vl.writables, vid) + return true +} + +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { + if vl.vid2location[vid].Remove(dn) { + if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { + return vl.removeFromWritable(vid) } } return false } +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { + if vl.vid2location[vid].Add(dn) { + if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { + return vl.setVolumeWritable(vid) + } + } + return false +} -func (vl *VolumeLayout) SetVolumeWritable(vid storage.VolumeId) bool { - for _, v := range vl.writables{ - if v == vid { - return false - } - } - vl.writables = append(vl.writables, vid) - return true +func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { + return vl.removeFromWritable(vid) } func (vl *VolumeLayout) ToMap() interface{} { diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index f2e5dd894..16afb2dfb 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -2,19 +2,23 @@ package topology import () -type DataNodeLocationList struct { +type VolumeLocationList struct { list []*DataNode } -func NewDataNodeLocationList() *DataNodeLocationList { - return &DataNodeLocationList{} +func NewVolumeLocationList() *VolumeLocationList { + return &VolumeLocationList{} } -func (dnll *DataNodeLocationList) Head() *DataNode { +func (dnll *VolumeLocationList) Head() *DataNode { return dnll.list[0] } -func (dnll *DataNodeLocationList) Add(loc *DataNode) bool { +func (dnll *VolumeLocationList) Length() int { + return len(dnll.list) +} + +func (dnll *VolumeLocationList) Add(loc *DataNode) bool { for _, dnl := range dnll.list { if loc.Ip == dnl.Ip && loc.Port == dnl.Port { return false @@ -23,8 +27,17 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode) bool { dnll.list = append(dnll.list, loc) return true } +func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i],dnll.list[i+1:]...) + return true + } + } + return false +} -func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) { +func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { var changed bool for _, dnl := range dnll.list { if dnl.LastSeen < freshThreshHold {