From 7988ee08057afd1da73dc8981895e7930b55ec60 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 19 Jun 2024 18:50:55 +0500 Subject: [PATCH] [fs.verify] skip failed files if entry not found on filerStore (#5693) --- weed/pb/filer_pb_tail.go | 3 ++ weed/shell/command_fs_verify.go | 75 ++++++++++++++++++++++++--------- 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index b54dad871..b7cca7585 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -17,6 +17,7 @@ const ( TrivialOnError EventErrorType = iota FatalOnError RetryForeverOnError + DontLogError ) // MetadataFollowOption is used to control the behavior of the metadata following @@ -96,6 +97,8 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc glog.Errorf("process %v: %v", resp, err) return true }) + case DontLogError: + // pass default: glog.Errorf("process %v: %v", resp, err) } diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index 1337ae090..b30020c40 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -1,6 +1,7 @@ package shell import ( + "bytes" "context" "flag" "fmt" @@ -89,17 +90,18 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr defer close(c.waitChan[volumeServerStr]) } } - var fCount, eConut uint64 + var fCount, eCount uint64 if *c.metadataFromLog { - itemErrCount := atomic.NewUint64(0) var wg sync.WaitGroup - fCount, err = c.verifyProcessMetadata(path, itemErrCount, &wg) + fCount, eCount, err = c.verifyProcessMetadata(path, &wg) wg.Wait() - eConut = itemErrCount.Load() + if err != nil { + return err + } } else { - fCount, eConut, err = c.verifyTraverseBfs(path) + fCount, eCount, err = c.verifyTraverseBfs(path) } - fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut) + fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eCount) return err } @@ -143,19 +145,44 @@ type ItemEntry struct { path util.FullPath } -func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.Uint64, wg *sync.WaitGroup) (fileCount uint64, err error) { +func (c *commandFsVerify) verifyProcessMetadata(path string, wg *sync.WaitGroup) (fileCount uint64, errCount uint64, err error) { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - if resp.EventNotification.NewEntry == nil || len(message.NewEntry.Chunks) == 0 { + if resp.EventNotification.NewEntry == nil { + return nil + } + chunkCount := len(message.NewEntry.Chunks) + if chunkCount == 0 { return nil } entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name) - if c.verifyEntry(entryPath, message.NewEntry.Chunks, errorCount, wg) { - if *c.verbose { - fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, len(message.NewEntry.Chunks)) + errorChunksCount := atomic.NewUint64(0) + if !c.verifyEntry(entryPath, message.NewEntry.Chunks, errorChunksCount, wg) { + if err = c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + entryResp, errReq := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: message.NewParentPath, + Name: message.NewEntry.Name, + }) + if strings.HasSuffix(errReq.Error(), "no entry is found in filer store") { + return nil + } else if errReq != nil { + return errReq + } + if entryResp.Entry.Attributes.Mtime == message.NewEntry.Attributes.Mtime && + bytes.Equal(entryResp.Entry.Attributes.Md5, message.NewEntry.Attributes.Md5) { + fmt.Fprintf(c.writer, "file: %s needles:%d failed:%d\n", entryPath, chunkCount, errorChunksCount.Load()) + errCount++ + } + return nil + }); err != nil { + return err } - fileCount++ + return nil + } + if *c.verbose { + fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, chunkCount) } + fileCount++ return nil } metadataFollowOption := &pb.MetadataFollowOption{ @@ -168,9 +195,9 @@ func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic. DirectoriesToWatch: nil, StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(), StopTsNs: time.Now().UnixNano(), - EventErrorType: pb.TrivialOnError, + EventErrorType: pb.DontLogError, } - return fileCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn) + return fileCount, errCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn) } func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool { @@ -181,8 +208,10 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, for _, volumeServer := range volumeIds { if *c.concurrency == 0 { if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil { - fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", - fileMsg, chunk.GetFileIdString(), err) + if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) { + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + fileMsg, chunk.GetFileIdString(), err) + } if itemIsVerifed.Load() { itemIsVerifed.Store(false) errorCount.Add(1) @@ -207,8 +236,10 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) { defer wg.Done() if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil { - fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", - msg, fChunk.GetFileIdString(), err) + if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) { + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + msg, fChunk.GetFileIdString(), err) + } if itemIsVerifed.Load() { itemIsVerifed.Store(false) errorCount.Add(1) @@ -218,9 +249,11 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, }(chunk, path, volumeServer, fileMsg) } } else { - err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) - fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", - fileMsg, chunk.GetFileIdString(), err) + if !*c.metadataFromLog { + err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + fileMsg, chunk.GetFileIdString(), err) + } if itemIsVerifed.Load() { itemIsVerifed.Store(false) errorCount.Add(1)