From 6181aa7594a7f8459910807e581d84adb2ab44c6 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:31:51 +0500 Subject: [PATCH] fix: fs verify error counter (#5261) --- weed/shell/command_fs_verify.go | 71 +++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index e59c43fd2..32d498202 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/util" + "go.uber.org/atomic" "golang.org/x/exp/slices" "io" "math" @@ -137,7 +138,7 @@ type ItemEntry struct { path util.FullPath } -func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) { +func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) { timeNowAtSec := time.Now().Unix() return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { @@ -160,19 +161,24 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo return nil }, func(outputChan chan interface{}) { + var wg sync.WaitGroup + itemErrCount := atomic.NewUint64(0) for itemEntry := range outputChan { i := itemEntry.(*ItemEntry) itemPath := string(i.path) fileMsg := fmt.Sprintf("file:%s", itemPath) - errItem := make(map[string]error) - errItemLock := sync.RWMutex{} + itemIsVerifed := atomic.NewBool(true) for _, chunk := range i.chunks { if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok { for _, volumeServer := range volumeIds { if *c.concurrency == 0 { if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil { - fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n", - fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + fileMsg, chunk.GetFileIdString(), err) + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + itemErrCount.Add(1) + } } continue } @@ -180,43 +186,48 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo waitChan, ok := c.waitChan[string(volumeServer)] c.waitChanLock.RUnlock() if !ok { - fmt.Fprintf(c.writer, "%s failed to get channel for %s chunk: %d:%d: %+v\n", - string(volumeServer), fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s: %+v\n", + string(volumeServer), fileMsg, chunk.GetFileIdString(), err) + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + itemErrCount.Add(1) + } continue } + wg.Add(1) waitChan <- struct{}{} - go func(fId *filer_pb.FileId, path string, volumeServer pb.ServerAddress, msg string) { - if err = c.verifyEntry(volumeServer, fId); err != nil { - errItemLock.Lock() - errItem[path] = err - fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n", - msg, fId.VolumeId, fId.FileKey, err) - errItemLock.Unlock() + go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) { + defer wg.Done() + if err = c.verifyEntry(volumeServer, fChunk.Fid); err != nil { + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + msg, fChunk.GetFileIdString(), err) + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + itemErrCount.Add(1) + } } <-waitChan - }(chunk.Fid, itemPath, volumeServer, fileMsg) + }(chunk, itemPath, volumeServer, fileMsg) } } else { err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) - fmt.Fprintf(c.writer, "%s %d:%d: %+v\n", - fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err) + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + fileMsg, chunk.GetFileIdString(), err) + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + itemErrCount.Add(1) + } break } } - errItemLock.RLock() - err, _ = errItem[itemPath] - errItemLock.RUnlock() - - if err != nil { - errCount++ - continue - } - - if *c.verbose { - fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks)) + if itemIsVerifed.Load() { + if *c.verbose { + fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks)) + } + fileCount++ } - fileCount++ } + wg.Wait() + errCount = itemErrCount.Load() }) - }