Browse Source

volume: load volume can optionally be skipped, if ec volume exists

fix https://github.com/chrislusf/seaweedfs/issues/2489
pull/2500/head
chrislu 3 years ago
parent
commit
488afa5002
  1. 13
      weed/storage/disk_location.go

13
weed/storage/disk_location.go

@ -84,7 +84,7 @@ func getValidVolumeName(basename string) string {
return "" return ""
} }
func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind) bool {
func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool) bool {
basename := dirEntry.Name() basename := dirEntry.Name()
if dirEntry.IsDir() { if dirEntry.IsDir() {
return false return false
@ -94,10 +94,12 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false return false
} }
// skip ec volumes
// skip if ec volumes exists
if skipIfEcVolumesExists {
if util.FileExists(l.Directory + "/" + volumeName + ".ecx") { if util.FileExists(l.Directory + "/" + volumeName + ".ecx") {
return false return false
} }
}
// check for incomplete volume // check for incomplete volume
noteFile := l.Directory + "/" + volumeName + ".note" noteFile := l.Directory + "/" + volumeName + ".note"
@ -166,7 +168,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
go func() { go func() {
defer wg.Done() defer wg.Done()
for fi := range task_queue { for fi := range task_queue {
_ = l.loadExistingVolume(fi, needleMapKind)
_ = l.loadExistingVolume(fi, needleMapKind, true)
} }
}() }()
} }
@ -246,7 +248,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool { func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
if fileInfo, found := l.LocateVolume(vid); found { if fileInfo, found := l.LocateVolume(vid); found {
return l.loadExistingVolume(fileInfo, needleMapKind)
return l.loadExistingVolume(fileInfo, needleMapKind, false)
} }
return false return false
} }
@ -332,9 +334,12 @@ func (l *DiskLocation) Close() {
} }
func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) { func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
println("LocateVolume", vid, "on", l.Directory)
if dirEntries, err := os.ReadDir(l.Directory); err == nil { if dirEntries, err := os.ReadDir(l.Directory); err == nil {
for _, entry := range dirEntries { for _, entry := range dirEntries {
println("checking", entry.Name(), "...")
volId, _, err := volumeIdFromFileName(entry.Name()) volId, _, err := volumeIdFromFileName(entry.Name())
println("volId", volId, "err", err)
if vid == volId && err == nil { if vid == volId && err == nil {
return entry, true return entry, true
} }

Loading…
Cancel
Save