Browse Source

volume: handling readonly volumes after compaction

ensure readonly volumes are not added as writable
pull/1255/head
Chris Lu 5 years ago
parent
commit
c90eb0da1f
  1. 7
      weed/topology/topology_vacuum.go
  2. 27
      weed/topology/volume_layout.go

7
weed/topology/topology_vacuum.go

@ -5,9 +5,10 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@ -105,7 +106,9 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
} else { } else {
glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url())
} }
if isCommitSuccess {
}
if isCommitSuccess {
for _, dn := range locationlist.list {
vl.SetVolumeAvailable(dn, vid) vl.SetVolumeAvailable(dn, vid)
} }
} }

27
weed/topology/volume_layout.go

@ -51,6 +51,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock() vl.accessLock.Lock()
defer vl.accessLock.Unlock() defer vl.accessLock.Unlock()
defer vl.ensureCorrectWritables(v)
defer vl.rememberOversizedVolume(v)
if _, ok := vl.vid2location[v.Id]; !ok { if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList() vl.vid2location[v.Id] = NewVolumeLocationList()
} }
@ -74,9 +77,6 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
} }
} }
vl.rememberOversizedVolume(v)
vl.ensureCorrectWritables(v)
} }
func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
@ -109,22 +109,13 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok { if _, ok := vl.oversizedVolumes[v.Id]; !ok {
vl.addToWritable(v.Id)
vl.setVolumeWritable(v.Id)
} }
} else { } else {
vl.removeFromWritable(v.Id) vl.removeFromWritable(v.Id)
} }
} }
func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
}
}
vl.writables = append(vl.writables, vid)
}
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
return uint64(v.Size) >= vl.volumeSizeLimit return uint64(v.Size) >= vl.volumeSizeLimit
} }
@ -270,7 +261,17 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bo
vl.accessLock.Lock() vl.accessLock.Lock()
defer vl.accessLock.Unlock() defer vl.accessLock.Unlock()
vInfo, err := dn.GetVolumesById(v.Id)
if err != nil {
return false
}
vl.vid2location[vid].Set(dn) vl.vid2location[vid].Set(dn)
if vInfo.ReadOnly {
return false
}
if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
return vl.setVolumeWritable(vid) return vl.setVolumeWritable(vid)
} }

Loading…
Cancel
Save