Browse Source

refactoring

pull/8015/head
Chris Lu 24 hours ago
parent
commit
c2b216b141
  1. 16
      weed/command/filer_meta_backup.go
  2. 9
      weed/pb/filer_pb/filer_client_bfs.go
  3. 5
      weed/shell/command_fs_merge_volumes.go
  4. 5
      weed/shell/command_fs_meta_change_volume_id.go
  5. 5
      weed/shell/command_fs_meta_notify.go
  6. 2
      weed/shell/command_fs_meta_save.go

16
weed/command/filer_meta_backup.go

@ -157,26 +157,18 @@ func (metaBackup *FilerMetaBackupOptions) shouldInclude(fullpath string) bool {
}
func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
var saveErr error
traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
return filer_pb.TraverseBfs(context.Background(), metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
fullpath := string(parentPath.Child(entry.Name))
if !metaBackup.shouldInclude(fullpath) {
return
return nil
}
println("+", fullpath)
if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
saveErr = fmt.Errorf("insert entry error: %w\n", err)
return
return fmt.Errorf("insert entry error: %w\n", err)
}
return nil
})
if traverseErr != nil {
return fmt.Errorf("traverse: %w", traverseErr)
}
return saveErr
}
var (

9
weed/pb/filer_pb/filer_client_bfs.go

@ -12,14 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
return TraverseBfsWithContext(context.Background(), filerClient, parentPath, func(parentPath util.FullPath, entry *Entry) error {
fn(parentPath, entry)
return nil
})
}
func TraverseBfsWithContext(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) {
func TraverseBfs(ctx context.Context, filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry) error) (err error) {
K := 5
ctx, cancel := context.WithCancel(ctx)

5
weed/shell/command_fs_merge_volumes.go

@ -109,9 +109,9 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
defer util_http.GetGlobalHttpClient().CloseIdleConnections()
return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if entry.IsDirectory {
return
return nil
}
for _, chunk := range entry.Chunks {
chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
@ -141,6 +141,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
fmt.Printf("failed to update %s: %v\n", path, err)
}
}
return nil
})
})
}

5
weed/shell/command_fs_meta_change_volume_id.go

@ -75,13 +75,13 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv,
}
return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if !entry.IsDirectory {
var hasChanges bool
for _, chunk := range entry.Chunks {
if chunk.IsChunkManifest {
fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
return
return nil
}
chunkVolumeId := chunk.Fid.VolumeId
if toVolumeId, found := mapping[needle.VolumeId(chunkVolumeId)]; found {
@ -102,6 +102,7 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv,
}
}
}
return nil
})
})
}

5
weed/shell/command_fs_meta_notify.go

@ -1,6 +1,7 @@
package shell
import (
"context"
"fmt"
"io"
@ -51,7 +52,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
var dirCount, fileCount uint64
err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
err = filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if entry.IsDirectory {
dirCount++
@ -69,7 +70,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
if notifyErr != nil {
fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
}
return nil
})
if err == nil {

2
weed/shell/command_fs_meta_save.go

@ -192,7 +192,7 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := filer_pb.TraverseBfsWithContext(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if strings.HasPrefix(string(parentPath), filer.SystemLogDir) {
return nil

Loading…
Cancel
Save