Browse Source

[volume.fsck] param volumeId is comma separated the volume id (#3933)

volume.fsck param volumeId is comma separated the volume id

Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
pull/3937/head
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
a322ba042e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      weed/shell/command_volume_fsck.go

22
weed/shell/command_volume_fsck.go

@ -27,6 +27,7 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -45,7 +46,7 @@ type commandVolumeFsck struct {
writer io.Writer writer io.Writer
bucketsPath string bucketsPath string
collection *string collection *string
volumeId *uint
volumeIds map[uint32]bool
tempFolder string tempFolder string
verbose *bool verbose *bool
forcePurging *bool forcePurging *bool
@ -83,7 +84,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c.verbose = fsckCommand.Bool("v", false, "verbose mode") c.verbose = fsckCommand.Bool("v", false, "verbose mode")
c.findMissingChunksInFiler = fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"") c.findMissingChunksInFiler = fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
c.collection = fsckCommand.String("collection", "", "the collection name") c.collection = fsckCommand.String("collection", "", "the collection name")
c.volumeId = fsckCommand.Uint("volumeId", 0, "the volume id")
volumeIds := fsckCommand.String("volumeId", "", "comma separated the volume id")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup.") applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup.")
c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging") c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
@ -98,7 +99,16 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
if err = commandEnv.confirmIsLocked(args); err != nil { if err = commandEnv.confirmIsLocked(args); err != nil {
return return
} }
c.volumeIds = make(map[uint32]bool)
if *volumeIds != "" {
for _, volumeIdStr := range strings.Split(*volumeIds, ",") {
if volumeIdInt, err := strconv.ParseUint(volumeIdStr, 10, 32); err == nil {
c.volumeIds[uint32(volumeIdInt)] = true
} else {
return fmt.Errorf("parse volumeId string %s to int: %v", volumeIdStr, err)
}
}
}
c.env = commandEnv c.env = commandEnv
c.writer = writer c.writer = writer
@ -131,10 +141,12 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
// collect each volume file ids // collect each volume file ids
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo {
if *c.volumeId > 0 && uint32(*c.volumeId) != volumeId {
if len(c.volumeIds) > 0 {
if _, ok := c.volumeIds[volumeId]; !ok {
delete(volumeIdToVInfo, volumeId) delete(volumeIdToVInfo, volumeId)
continue continue
} }
}
if *c.collection != "" && vinfo.collection != *c.collection { if *c.collection != "" && vinfo.collection != *c.collection {
delete(volumeIdToVInfo, volumeId) delete(volumeIdToVInfo, volumeId)
continue continue
@ -232,7 +244,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
util.Uint32toBytes(buffer[12:], uint32(len(i.path))) util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
f.Write(buffer) f.Write(buffer)
f.Write([]byte(i.path)) f.Write([]byte(i.path))
} else if *c.findMissingChunksInFiler && *c.volumeId == 0 {
} else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 {
fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
if purgeAbsent { if purgeAbsent {
fmt.Printf("deleting path %s after volume not found", i.path) fmt.Printf("deleting path %s after volume not found", i.path)

Loading…
Cancel
Save