Browse Source

volume: avoid reprocessing the same volume

fix https://github.com/chrislusf/seaweedfs/issues/1682
pull/1690/head
Chris Lu 4 years ago
parent
commit
e2076201d7
  1. 25
      weed/storage/disk_location.go

25
weed/storage/disk_location.go

@ -53,7 +53,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
} }
func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) { func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
if strings.HasSuffix(filename, ".idx") || strings.HasSuffix(filename, ".vif") {
if isValidVolume(filename) {
base := filename[:len(filename)-4] base := filename[:len(filename)-4]
collection, volumeId, err := parseCollectionVolumeId(base) collection, volumeId, err := parseCollectionVolumeId(base)
return volumeId, collection, err return volumeId, collection, err
@ -71,15 +71,26 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
return collection, vol, err return collection, vol, err
} }
func isValidVolume(basename string) bool {
return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
}
func getValidVolumeName(basename string) string {
if isValidVolume(basename) {
return basename[:len(basename)-4]
}
return ""
}
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
basename := fileInfo.Name() basename := fileInfo.Name()
if fileInfo.IsDir() { if fileInfo.IsDir() {
return false return false
} }
if !strings.HasSuffix(basename, ".idx") && !strings.HasSuffix(basename, ".vif") {
volumeName := getValidVolumeName(basename)
if volumeName == "" {
return false return false
} }
volumeName := basename[:len(basename)-4]
// check for incomplete volume // check for incomplete volume
noteFile := l.Directory + "/" + volumeName + ".note" noteFile := l.Directory + "/" + volumeName + ".note"
@ -126,11 +137,19 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
task_queue := make(chan os.FileInfo, 10*concurrency) task_queue := make(chan os.FileInfo, 10*concurrency)
go func() { go func() {
foundVolumeNames := make(map[string]bool)
if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
for _, fi := range fileInfos { for _, fi := range fileInfos {
volumeName := getValidVolumeName(fi.Name())
if volumeName == "" {
continue
}
if _, found := foundVolumeNames[volumeName]; !found {
foundVolumeNames[volumeName] = true
task_queue <- fi task_queue <- fi
} }
} }
}
close(task_queue) close(task_queue)
}() }()

Loading…
Cancel
Save