Browse Source

[fs.verify] skip failed files if entry not found on filerStore (#5693)

pull/5696/head
Konstantin Lebedev 7 months ago
committed by GitHub
parent
commit
7988ee0805
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      weed/pb/filer_pb_tail.go
  2. 59
      weed/shell/command_fs_verify.go

3
weed/pb/filer_pb_tail.go

@ -17,6 +17,7 @@ const (
TrivialOnError EventErrorType = iota TrivialOnError EventErrorType = iota
FatalOnError FatalOnError
RetryForeverOnError RetryForeverOnError
DontLogError
) )
// MetadataFollowOption is used to control the behavior of the metadata following // MetadataFollowOption is used to control the behavior of the metadata following
@ -96,6 +97,8 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
glog.Errorf("process %v: %v", resp, err) glog.Errorf("process %v: %v", resp, err)
return true return true
}) })
case DontLogError:
// pass
default: default:
glog.Errorf("process %v: %v", resp, err) glog.Errorf("process %v: %v", resp, err)
} }

59
weed/shell/command_fs_verify.go

@ -1,6 +1,7 @@
package shell package shell
import ( import (
"bytes"
"context" "context"
"flag" "flag"
"fmt" "fmt"
@ -89,17 +90,18 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
defer close(c.waitChan[volumeServerStr]) defer close(c.waitChan[volumeServerStr])
} }
} }
var fCount, eConut uint64
var fCount, eCount uint64
if *c.metadataFromLog { if *c.metadataFromLog {
itemErrCount := atomic.NewUint64(0)
var wg sync.WaitGroup var wg sync.WaitGroup
fCount, err = c.verifyProcessMetadata(path, itemErrCount, &wg)
fCount, eCount, err = c.verifyProcessMetadata(path, &wg)
wg.Wait() wg.Wait()
eConut = itemErrCount.Load()
if err != nil {
return err
}
} else { } else {
fCount, eConut, err = c.verifyTraverseBfs(path)
fCount, eCount, err = c.verifyTraverseBfs(path)
} }
fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eCount)
return err return err
} }
@ -143,19 +145,44 @@ type ItemEntry struct {
path util.FullPath path util.FullPath
} }
func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.Uint64, wg *sync.WaitGroup) (fileCount uint64, err error) {
func (c *commandFsVerify) verifyProcessMetadata(path string, wg *sync.WaitGroup) (fileCount uint64, errCount uint64, err error) {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification message := resp.EventNotification
if resp.EventNotification.NewEntry == nil || len(message.NewEntry.Chunks) == 0 {
if resp.EventNotification.NewEntry == nil {
return nil
}
chunkCount := len(message.NewEntry.Chunks)
if chunkCount == 0 {
return nil return nil
} }
entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name) entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name)
if c.verifyEntry(entryPath, message.NewEntry.Chunks, errorCount, wg) {
errorChunksCount := atomic.NewUint64(0)
if !c.verifyEntry(entryPath, message.NewEntry.Chunks, errorChunksCount, wg) {
if err = c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
entryResp, errReq := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: message.NewParentPath,
Name: message.NewEntry.Name,
})
if strings.HasSuffix(errReq.Error(), "no entry is found in filer store") {
return nil
} else if errReq != nil {
return errReq
}
if entryResp.Entry.Attributes.Mtime == message.NewEntry.Attributes.Mtime &&
bytes.Equal(entryResp.Entry.Attributes.Md5, message.NewEntry.Attributes.Md5) {
fmt.Fprintf(c.writer, "file: %s needles:%d failed:%d\n", entryPath, chunkCount, errorChunksCount.Load())
errCount++
}
return nil
}); err != nil {
return err
}
return nil
}
if *c.verbose { if *c.verbose {
fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, len(message.NewEntry.Chunks))
fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, chunkCount)
} }
fileCount++ fileCount++
}
return nil return nil
} }
metadataFollowOption := &pb.MetadataFollowOption{ metadataFollowOption := &pb.MetadataFollowOption{
@ -168,9 +195,9 @@ func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.
DirectoriesToWatch: nil, DirectoriesToWatch: nil,
StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(), StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(),
StopTsNs: time.Now().UnixNano(), StopTsNs: time.Now().UnixNano(),
EventErrorType: pb.TrivialOnError,
EventErrorType: pb.DontLogError,
} }
return fileCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
return fileCount, errCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
} }
func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool { func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool {
@ -181,8 +208,10 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
for _, volumeServer := range volumeIds { for _, volumeServer := range volumeIds {
if *c.concurrency == 0 { if *c.concurrency == 0 {
if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil { if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil {
if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) {
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
fileMsg, chunk.GetFileIdString(), err) fileMsg, chunk.GetFileIdString(), err)
}
if itemIsVerifed.Load() { if itemIsVerifed.Load() {
itemIsVerifed.Store(false) itemIsVerifed.Store(false)
errorCount.Add(1) errorCount.Add(1)
@ -207,8 +236,10 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) { go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
defer wg.Done() defer wg.Done()
if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil { if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil {
if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) {
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
msg, fChunk.GetFileIdString(), err) msg, fChunk.GetFileIdString(), err)
}
if itemIsVerifed.Load() { if itemIsVerifed.Load() {
itemIsVerifed.Store(false) itemIsVerifed.Store(false)
errorCount.Add(1) errorCount.Add(1)
@ -218,9 +249,11 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
}(chunk, path, volumeServer, fileMsg) }(chunk, path, volumeServer, fileMsg)
} }
} else { } else {
if !*c.metadataFromLog {
err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
fileMsg, chunk.GetFileIdString(), err) fileMsg, chunk.GetFileIdString(), err)
}
if itemIsVerifed.Load() { if itemIsVerifed.Load() {
itemIsVerifed.Store(false) itemIsVerifed.Store(false)
errorCount.Add(1) errorCount.Add(1)

Loading…
Cancel
Save