Browse Source

ensure properly make volume unwritable. Previously passing volumeInfo

reference, causing make wrong volumes unwritable!
pull/2/head
Chris Lu 12 years ago
parent
commit
e19edccf3a
  1. 2
      weed-fs/src/pkg/topology/node.go
  2. 4
      weed-fs/src/pkg/topology/topology.go
  3. 8
      weed-fs/src/pkg/topology/topology_event_handling.go
  4. 7
      weed-fs/src/pkg/topology/volume_layout.go

2
weed-fs/src/pkg/topology/node.go

@ -165,7 +165,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
for _, v := range dn.volumes { for _, v := range dn.volumes {
if uint64(v.Size) >= volumeSizeLimit { if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- &v
n.GetTopology().chanFullVolumes <- v
} }
} }
} }

4
weed-fs/src/pkg/topology/topology.go

@ -23,7 +23,7 @@ type Topology struct {
chanDeadDataNodes chan *DataNode chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan *storage.VolumeInfo
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration configuration *Configuration
} }
@ -42,7 +42,7 @@ func NewTopology(id string, confFile string, dirname string, sequenceFilename st
t.chanDeadDataNodes = make(chan *DataNode) t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.chanFullVolumes = make(chan storage.VolumeInfo)
t.loadConfiguration(confFile) t.loadConfiguration(confFile)

8
weed-fs/src/pkg/topology/topology_event_handling.go

@ -26,7 +26,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
select { select {
case v := <-t.chanFullVolumes: case v := <-t.chanFullVolumes:
t.SetVolumeCapacityFull(v) t.SetVolumeCapacityFull(v)
fmt.Println("Volume", v, "is full!")
case dn := <-t.chanRecoveredDataNodes: case dn := <-t.chanRecoveredDataNodes:
t.RegisterRecoveredDataNode(dn) t.RegisterRecoveredDataNode(dn)
fmt.Println("DataNode", dn, "is back alive!") fmt.Println("DataNode", dn, "is back alive!")
@ -37,12 +36,15 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
} }
}() }()
} }
func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) {
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.RepType) vl := t.GetVolumeLayout(volumeInfo.RepType)
vl.SetVolumeCapacityFull(volumeInfo.Id)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
for _, dn := range vl.vid2location[volumeInfo.Id].list { for _, dn := range vl.vid2location[volumeInfo.Id].list {
dn.UpAdjustActiveVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1)
} }
return true
} }
func (t *Topology) UnRegisterDataNode(dn *DataNode) { func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes { for _, v := range dn.volumes {

7
weed-fs/src/pkg/topology/volume_layout.go

@ -38,8 +38,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
} }
} }
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) (*[]*DataNode) {
return &vl.vid2location[vid].list
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) *[]*DataNode {
return &vl.vid2location[vid].list
} }
func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
@ -63,6 +63,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int {
func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
for i, v := range vl.writables { for i, v := range vl.writables {
if v == vid { if v == vid {
fmt.Println("Volume", vid, "becomes unwritable")
vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
return true return true
} }
@ -75,6 +76,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return false return false
} }
} }
fmt.Println("Volume", vid, "becomes writable")
vl.writables = append(vl.writables, vid) vl.writables = append(vl.writables, vid)
return true return true
} }
@ -91,7 +93,6 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
if vl.vid2location[vid].Add(dn) { if vl.vid2location[vid].Add(dn) {
if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
fmt.Println("Volume", vid, "becomes writable")
return vl.setVolumeWritable(vid) return vl.setVolumeWritable(vid)
} }
} }

Loading…
Cancel
Save