diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index 32d498202..1337ae090 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -30,6 +30,7 @@ type commandFsVerify struct {
 	volumeServers      []pb.ServerAddress
 	volumeIds          map[uint32][]pb.ServerAddress
 	verbose            *bool
+	metadataFromLog    *bool
 	concurrency        *int
 	modifyTimeAgoAtSec int64
 	writer             io.Writer
@@ -56,7 +57,7 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files")
 	modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify")
 	c.concurrency = fsVerifyCommand.Int("concurrency", 0, "number of parallel verification per volume server")
-
+	c.metadataFromLog = fsVerifyCommand.Bool("metadataFromLog", false, "use the filer log to get metadata")
 	if err = fsVerifyCommand.Parse(args); err != nil {
 		return err
 	}
@@ -88,14 +89,18 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 			defer close(c.waitChan[volumeServerStr])
 		}
 	}
-
-	fCount, eConut, terr := c.verifyTraverseBfs(path)
-	if terr == nil {
-		fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
+	var fCount, eCount uint64
+	if *c.metadataFromLog {
+		itemErrCount := atomic.NewUint64(0)
+		var wg sync.WaitGroup
+		fCount, err = c.verifyProcessMetadata(path, itemErrCount, &wg)
+		wg.Wait()
+		eCount = itemErrCount.Load()
+	} else {
+		fCount, eCount, err = c.verifyTraverseBfs(path)
 	}
-
-	return terr
-
+	fmt.Fprintf(writer, "verified %d files, error %d files\n", fCount, eCount)
+	return err
 }
 
 func (c *commandFsVerify) collectVolumeIds() error {
@@ -117,7 +122,7 @@ func (c *commandFsVerify) collectVolumeIds() error {
 	return nil
 }
 
-func (c *commandFsVerify) verifyEntry(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error {
+func (c *commandFsVerify) verifyChunk(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error {
 	err := operation.WithVolumeServerClient(false, volumeServer, c.env.option.GrpcDialOption,
 		func(client volume_server_pb.VolumeServerClient) error {
 			_, err := client.VolumeNeedleStatus(context.Background(),
@@ -138,6 +143,94 @@ type ItemEntry struct {
 	path   util.FullPath
 }
 
+func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.Uint64, wg *sync.WaitGroup) (fileCount uint64, err error) {
+	processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+		message := resp.EventNotification
+		if resp.EventNotification.NewEntry == nil || len(message.NewEntry.Chunks) == 0 {
+			return nil
+		}
+		entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name)
+		if c.verifyEntry(entryPath, message.NewEntry.Chunks, errorCount, wg) {
+			if *c.verbose {
+				fmt.Fprintf(c.writer, "file: %s needles:%d verified\n", entryPath, len(message.NewEntry.Chunks))
+			}
+			fileCount++
+		}
+		return nil
+	}
+	metadataFollowOption := &pb.MetadataFollowOption{
+		ClientName:             "shell_verify",
+		ClientId:               util.RandomInt32(),
+		ClientEpoch:            0,
+		SelfSignature:          0,
+		PathPrefix:             path,
+		AdditionalPathPrefixes: nil,
+		DirectoriesToWatch:     nil,
+		StartTsNs:              time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(),
+		StopTsNs:               time.Now().UnixNano(),
+		EventErrorType:         pb.TrivialOnError,
+	}
+	return fileCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
+}
+
+func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool {
+	fileMsg := fmt.Sprintf("file:%s", path)
+	itemIsVerified := atomic.NewBool(true)
+	for _, chunk := range chunks {
+		if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
+			for _, volumeServer := range volumeIds {
+				if *c.concurrency == 0 {
+					if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil {
+						fmt.Fprintf(c.writer, "%s failed to verify fileId %s: %+v\n",
+							fileMsg, chunk.GetFileIdString(), err)
+						if itemIsVerified.Load() {
+							itemIsVerified.Store(false)
+							errorCount.Add(1)
+						}
+					}
+					continue
+				}
+				c.waitChanLock.RLock()
+				waitChan, ok := c.waitChan[string(volumeServer)]
+				c.waitChanLock.RUnlock()
+				if !ok {
+					fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s\n",
+						string(volumeServer), fileMsg, chunk.GetFileIdString())
+					if itemIsVerified.Load() {
+						itemIsVerified.Store(false)
+						errorCount.Add(1)
+					}
+					continue
+				}
+				wg.Add(1)
+				waitChan <- struct{}{}
+				go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
+					defer wg.Done()
+					if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil {
+						fmt.Fprintf(c.writer, "%s failed to verify fileId %s: %+v\n",
+							msg, fChunk.GetFileIdString(), err)
+						if itemIsVerified.Load() {
+							itemIsVerified.Store(false)
+							errorCount.Add(1)
+						}
+					}
+					<-waitChan
+				}(chunk, path, volumeServer, fileMsg)
+			}
+		} else {
+			err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
+			fmt.Fprintf(c.writer, "%s failed to verify fileId %s: %+v\n",
+				fileMsg, chunk.GetFileIdString(), err)
+			if itemIsVerified.Load() {
+				itemIsVerified.Store(false)
+				errorCount.Add(1)
+			}
+			break
+		}
+	}
+	return itemIsVerified.Load()
+}
+
 func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
 	timeNowAtSec := time.Now().Unix()
 	return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
@@ -166,63 +259,9 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
 	for itemEntry := range outputChan {
 		i := itemEntry.(*ItemEntry)
 		itemPath := string(i.path)
-		fileMsg := fmt.Sprintf("file:%s", itemPath)
-		itemIsVerifed := atomic.NewBool(true)
-		for _, chunk := range i.chunks {
-			if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
-				for _, volumeServer := range volumeIds {
-					if *c.concurrency == 0 {
-						if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil {
-							fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
-								fileMsg, chunk.GetFileIdString(), err)
-							if itemIsVerifed.Load() {
-								itemIsVerifed.Store(false)
-								itemErrCount.Add(1)
-							}
-						}
-						continue
-					}
-					c.waitChanLock.RLock()
-					waitChan, ok := c.waitChan[string(volumeServer)]
-					c.waitChanLock.RUnlock()
-					if !ok {
-						fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s: %+v\n",
-							string(volumeServer), fileMsg, chunk.GetFileIdString(), err)
-						if itemIsVerifed.Load() {
-							itemIsVerifed.Store(false)
-							itemErrCount.Add(1)
-						}
-						continue
-					}
-					wg.Add(1)
-					waitChan <- struct{}{}
-					go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
-						defer wg.Done()
-						if err = c.verifyEntry(volumeServer, fChunk.Fid); err != nil {
-							fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
-								msg, fChunk.GetFileIdString(), err)
-							if itemIsVerifed.Load() {
-								itemIsVerifed.Store(false)
-								itemErrCount.Add(1)
-							}
-						}
-						<-waitChan
-					}(chunk, itemPath, volumeServer, fileMsg)
-				}
-			} else {
-				err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
-				fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
-					fileMsg, chunk.GetFileIdString(), err)
-				if itemIsVerifed.Load() {
-					itemIsVerifed.Store(false)
-					itemErrCount.Add(1)
-				}
-				break
-			}
-		}
-		if itemIsVerifed.Load() {
+		if c.verifyEntry(itemPath, i.chunks, itemErrCount, &wg) {
 			if *c.verbose {
-				fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
+				fmt.Fprintf(c.writer, "file: %s needles:%d verified\n", itemPath, len(i.chunks))
 			}
 			fileCount++
 		}
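
A minimal usage sketch, assuming the command is registered as fs.verify in weed shell (following the command_fs_*.go naming convention); the flag names are taken from the diff above and the paths are placeholders:

    > fs.verify -v /buckets/b1
    > fs.verify -metadataFromLog -modifyTimeAgo 1h /buckets/b1
    > fs.verify -metadataFromLog -modifyTimeAgo 1h -concurrency 8 /buckets/b1

Without -metadataFromLog, the command walks the directory tree breadth-first (verifyTraverseBfs) and checks every chunk of every entry it finds. With it, verifyProcessMetadata instead subscribes to the filer metadata log from now minus modifyTimeAgo up to now and verifies the chunks of each newly created entry, so a non-zero -modifyTimeAgo is what defines the replay window. In both modes, -concurrency greater than 0 fans chunk checks out to per-volume-server goroutines bounded by the waitChan channels.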