From e2076201d79e84df0407a9e1e535bf966452bba0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 17 Dec 2020 13:03:39 -0800 Subject: [PATCH] volume: avoid reprocessing the same volume fix https://github.com/chrislusf/seaweedfs/issues/1682 --- weed/storage/disk_location.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 2d4d120af..9b2ab69fe 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -53,7 +53,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32 } func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) { - if strings.HasSuffix(filename, ".idx") || strings.HasSuffix(filename, ".vif") { + if isValidVolume(filename) { base := filename[:len(filename)-4] collection, volumeId, err := parseCollectionVolumeId(base) return volumeId, collection, err @@ -71,15 +71,26 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI return collection, vol, err } +func isValidVolume(basename string) bool { + return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif") +} + +func getValidVolumeName(basename string) string { + if isValidVolume(basename) { + return basename[:len(basename)-4] + } + return "" +} + func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { basename := fileInfo.Name() if fileInfo.IsDir() { return false } - if !strings.HasSuffix(basename, ".idx") && !strings.HasSuffix(basename, ".vif") { + volumeName := getValidVolumeName(basename) + if volumeName == "" { return false } - volumeName := basename[:len(basename)-4] // check for incomplete volume noteFile := l.Directory + "/" + volumeName + ".note" @@ -126,9 +137,17 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con task_queue := make(chan os.FileInfo, 10*concurrency) go func() { + foundVolumeNames := make(map[string]bool) if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { for _, fi := range fileInfos { - task_queue <- fi + volumeName := getValidVolumeName(fi.Name()) + if volumeName == "" { + continue + } + if _, found := foundVolumeNames[volumeName]; !found { + foundVolumeNames[volumeName] = true + task_queue <- fi + } } } close(task_queue)