
Add volume.scrub and ec.scrub shell commands to scrub regular & EC volumes on demand. (#8188)

* Implement the RPC skeleton for scrubbing regular and EC volumes.

See https://github.com/seaweedfs/seaweedfs/issues/8018 for details.

* Add `volume.scrub` and `ec.scrub` shell commands to scrub regular & EC volumes on demand.

For example:

```
> ec.scrub -mode=FULL
Scrubbing 10.200.17.13:9005 (1/10)...
Scrubbing 10.200.17.13:9001 (2/10)...
Scrubbing 10.200.17.13:9008 (3/10)...
Scrubbing 10.200.17.13:9009 (4/10)...
Scrubbing 10.200.17.13:9004 (5/10)...
Scrubbing 10.200.17.13:9010 (6/10)...
Scrubbing 10.200.17.13:9007 (7/10)...
Scrubbing 10.200.17.13:9002 (8/10)...
Scrubbing 10.200.17.13:9003 (9/10)...
Scrubbing 10.200.17.13:9006 (10/10)...
Scrubbed 20 EC files and 20 volumes on 10 nodes

Got scrub failures on 1 EC volumes and 2 EC shards :(
Affected volumes: 10.200.17.13:9005:1
Details:
	[10.200.17.13:9005] expected 551041 bytes for needle 6, got 551072
	[10.200.17.13:9005] needles in volume file (1) don't match index entries (173) for volume 1
```
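
The regular-volume counterpart behaves analogously. A hypothetical `volume.scrub` session is sketched below; the addresses and counts are illustrative, not captured output, though the line formats match the command's implementation:

```
> volume.scrub -node=10.200.17.13:9001,10.200.17.13:9002 -mode=INDEX
using INDEX mode
Scrubbing 10.200.17.13:9001 (1/2)...
Scrubbing 10.200.17.13:9002 (2/2)...
Scrubbed 4520 files and 7 volumes on 2 nodes
```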
Author: Lisandro Pin
Commit: 2ecbae3611 (pull/8216/head)
Changed files:

  1. weed/shell/command_ec_common.go (19 lines)
  2. weed/shell/command_ec_scrub.go (156 lines, new)
  3. weed/shell/command_volume_scrub.go (150 lines, new)

weed/shell/command_ec_common.go

```
@@ -164,7 +164,6 @@ func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super
 }

-
 func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
 	if delayBeforeCollecting > 0 {
 		time.Sleep(delayBeforeCollecting)
 	}
@@ -179,7 +178,25 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
 	}

 	return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
 }

+func collectDataNodes(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) ([]*master_pb.DataNodeInfo, error) {
+	dataNodes := []*master_pb.DataNodeInfo{}
+
+	topo, _, err := collectTopologyInfo(commandEnv, delayBeforeCollecting)
+	if err != nil {
+		return nil, err
+	}
+	for _, dci := range topo.GetDataCenterInfos() {
+		for _, r := range dci.GetRackInfos() {
+			for _, dn := range r.GetDataNodeInfos() {
+				dataNodes = append(dataNodes, dn)
+			}
+		}
+	}
+
+	return dataNodes, nil
+}
+
 func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
```
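
The new `collectDataNodes` helper flattens the master topology (data center, then rack, then data node) into a single slice. A minimal sketch of how a caller turns that into dialable volume server addresses, which is exactly what both scrub commands below do:

```
dns, err := collectDataNodes(commandEnv, 0)
if err != nil {
	return err
}
// Each DataNodeInfo.Address is a volume server <host>:<port>.
addrs := []pb.ServerAddress{}
for _, dn := range dns {
	addrs = append(addrs, pb.ServerAddress(dn.Address))
}
```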

weed/shell/command_ec_scrub.go (new file)

```
package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func init() {
	Commands = append(Commands, &commandEcVolumeScrub{})
}

type commandEcVolumeScrub struct {
	env               *CommandEnv
	volumeServerAddrs []pb.ServerAddress
	volumeIDs         []uint32
	mode              volume_server_pb.VolumeScrubMode
	grpcDialOption    grpc.DialOption
}

func (c *commandEcVolumeScrub) Name() string {
	return "ec.scrub"
}

func (c *commandEcVolumeScrub) Help() string {
	return `scrubs EC volume contents on volume servers.

	Supports either scrubbing only needle data, or deep scrubbing file contents as well.
	Scrubbing can be limited to specific EC volume IDs for specific volume servers.
	By default, all volume IDs across all servers are processed.
`
}

func (c *commandEcVolumeScrub) HasTag(CommandTag) bool {
	return false
}

func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
	// TODO: switch default mode to LOCAL, once implemented.
	mode := volScrubCommand.String("mode", "INDEX", "scrubbing mode (INDEX/FULL)")
	// TODO: add per-node parallelization

	if err = volScrubCommand.Parse(args); err != nil {
		return err
	}
	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	c.volumeServerAddrs = []pb.ServerAddress{}
	if *nodesStr != "" {
		for _, addr := range strings.Split(*nodesStr, ",") {
			c.volumeServerAddrs = append(c.volumeServerAddrs, pb.ServerAddress(addr))
		}
	} else {
		dns, err := collectDataNodes(commandEnv, 0)
		if err != nil {
			return err
		}
		for _, dn := range dns {
			c.volumeServerAddrs = append(c.volumeServerAddrs, pb.ServerAddress(dn.Address))
		}
	}

	c.volumeIDs = []uint32{}
	if *volumeIDsStr != "" {
		for _, vids := range strings.Split(*volumeIDsStr, ",") {
			vids = strings.TrimSpace(vids)
			if vids == "" {
				continue
			}
			if vid, err := strconv.ParseUint(vids, 10, 32); err == nil {
				c.volumeIDs = append(c.volumeIDs, uint32(vid))
			} else {
				return fmt.Errorf("invalid volume ID %q", vids)
			}
		}
	}

	switch strings.ToUpper(*mode) {
	case "INDEX":
		c.mode = volume_server_pb.VolumeScrubMode_INDEX
	case "FULL":
		c.mode = volume_server_pb.VolumeScrubMode_FULL
	default:
		return fmt.Errorf("unsupported scrubbing mode %q", *mode)
	}
	fmt.Fprintf(writer, "using %s mode\n", c.mode.String())

	c.env = commandEnv
	return c.scrubEcVolumes(writer)
}

func (c *commandEcVolumeScrub) scrubEcVolumes(writer io.Writer) error {
	var brokenVolumesStr, brokenShardsStr []string
	var details []string
	var totalVolumes, brokenVolumes, brokenShards, totalFiles uint64

	for i, addr := range c.volumeServerAddrs {
		fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), i+1, len(c.volumeServerAddrs))
		err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
			res, err := volumeServerClient.ScrubEcVolume(context.Background(), &volume_server_pb.ScrubEcVolumeRequest{
				Mode:      c.mode,
				VolumeIds: c.volumeIDs,
			})
			if err != nil {
				return err
			}

			totalVolumes += res.GetTotalVolumes()
			totalFiles += res.GetTotalFiles()
			brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
			brokenShards += uint64(len(res.GetBrokenShardInfos()))
			for _, d := range res.GetDetails() {
				details = append(details, fmt.Sprintf("[%s] %s", addr, d))
			}
			for _, vid := range res.GetBrokenVolumeIds() {
				brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
			}
			for _, si := range res.GetBrokenShardInfos() {
				brokenShardsStr = append(brokenShardsStr, fmt.Sprintf("%s:%v:%v", addr, si.VolumeId, si.ShardId))
			}

			return nil
		})
		if err != nil {
			return err
		}
	}

	fmt.Fprintf(writer, "Scrubbed %d EC files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))
	if brokenVolumes != 0 {
		fmt.Fprintf(writer, "\nGot scrub failures on %d EC volumes and %d EC shards :(\n", brokenVolumes, brokenShards)
		fmt.Fprintf(writer, "Affected volumes: %s\n", strings.Join(brokenVolumesStr, ", "))
		if len(brokenShardsStr) != 0 {
			fmt.Fprintf(writer, "Affected shards: %s\n", strings.Join(brokenShardsStr, ", "))
		}
		if len(details) != 0 {
			fmt.Fprintf(writer, "Details:\n\t%s\n", strings.Join(details, "\n\t"))
		}
	}

	return nil
}
```
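
Both commands accept the same flags (`-node`, `-volumeId`, `-mode`). A hypothetical invocation restricting the scrub to two EC volumes on one server; the address, volume IDs, and counts are illustrative:

```
> ec.scrub -node=10.200.17.13:9005 -volumeId=1,2 -mode=FULL
using FULL mode
Scrubbing 10.200.17.13:9005 (1/1)...
Scrubbed 2 EC files and 2 volumes on 1 nodes
```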

weed/shell/command_volume_scrub.go (new file)

```
package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func init() {
	Commands = append(Commands, &commandVolumeScrub{})
}

type commandVolumeScrub struct {
	env               *CommandEnv
	volumeServerAddrs []pb.ServerAddress
	volumeIDs         []uint32
	mode              volume_server_pb.VolumeScrubMode
	grpcDialOption    grpc.DialOption
}

func (c *commandVolumeScrub) Name() string {
	return "volume.scrub"
}

func (c *commandVolumeScrub) Help() string {
	return `scrubs volume contents on volume servers.

	Supports either scrubbing only needle data, or deep scrubbing file contents as well.
	Scrubbing can be limited to specific volume IDs for specific volume servers.
	By default, all volume IDs across all servers are processed.
`
}

func (c *commandVolumeScrub) HasTag(CommandTag) bool {
	return false
}

func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
	// TODO: switch default mode to LOCAL, once implemented.
	mode := volScrubCommand.String("mode", "INDEX", "scrubbing mode (INDEX/FULL)")
	// TODO: add per-node parallelization

	if err = volScrubCommand.Parse(args); err != nil {
		return err
	}
	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	c.volumeServerAddrs = []pb.ServerAddress{}
	if *nodesStr != "" {
		for _, addr := range strings.Split(*nodesStr, ",") {
			c.volumeServerAddrs = append(c.volumeServerAddrs, pb.ServerAddress(addr))
		}
	} else {
		dns, err := collectDataNodes(commandEnv, 0)
		if err != nil {
			return err
		}
		for _, dn := range dns {
			c.volumeServerAddrs = append(c.volumeServerAddrs, pb.ServerAddress(dn.Address))
		}
	}

	c.volumeIDs = []uint32{}
	if *volumeIDsStr != "" {
		for _, vids := range strings.Split(*volumeIDsStr, ",") {
			vids = strings.TrimSpace(vids)
			if vids == "" {
				continue
			}
			if vid, err := strconv.ParseUint(vids, 10, 32); err == nil {
				c.volumeIDs = append(c.volumeIDs, uint32(vid))
			} else {
				return fmt.Errorf("invalid volume ID %q", vids)
			}
		}
	}

	switch strings.ToUpper(*mode) {
	case "INDEX":
		c.mode = volume_server_pb.VolumeScrubMode_INDEX
	case "FULL":
		c.mode = volume_server_pb.VolumeScrubMode_FULL
	default:
		return fmt.Errorf("unsupported scrubbing mode %q", *mode)
	}
	fmt.Fprintf(writer, "using %s mode\n", c.mode.String())

	c.env = commandEnv
	return c.scrubVolumes(writer)
}

func (c *commandVolumeScrub) scrubVolumes(writer io.Writer) error {
	var brokenVolumesStr []string
	var details []string
	var totalVolumes, brokenVolumes, totalFiles uint64

	for i, addr := range c.volumeServerAddrs {
		fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), i+1, len(c.volumeServerAddrs))
		err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
			res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
				Mode:      c.mode,
				VolumeIds: c.volumeIDs,
			})
			if err != nil {
				return err
			}

			totalVolumes += res.GetTotalVolumes()
			totalFiles += res.GetTotalFiles()
			brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
			for _, d := range res.GetDetails() {
				details = append(details, fmt.Sprintf("[%s] %s", addr, d))
			}
			for _, vid := range res.GetBrokenVolumeIds() {
				brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
			}

			return nil
		})
		if err != nil {
			return err
		}
	}

	fmt.Fprintf(writer, "Scrubbed %d files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))
	if brokenVolumes != 0 {
		fmt.Fprintf(writer, "\nGot scrub failures on %d volumes :(\n", brokenVolumes)
		fmt.Fprintf(writer, "Affected volumes: %s\n", strings.Join(brokenVolumesStr, ", "))
		if len(details) != 0 {
			fmt.Fprintf(writer, "Details:\n\t%s\n", strings.Join(details, "\n\t"))
		}
	}

	return nil
}
```
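
The `-volumeId` parsing loop is duplicated verbatim between `ec.scrub` and `volume.scrub`. A hypothetical refactoring both `Do` methods could share; the helper name `parseVolumeIDs` is invented here and is not part of this commit:

```
// parseVolumeIDs parses a comma-separated list of numeric volume IDs,
// skipping empty entries, as both scrub commands currently do inline.
func parseVolumeIDs(volumeIDsStr string) ([]uint32, error) {
	volumeIDs := []uint32{}
	if volumeIDsStr == "" {
		return volumeIDs, nil
	}
	for _, vids := range strings.Split(volumeIDsStr, ",") {
		vids = strings.TrimSpace(vids)
		if vids == "" {
			continue
		}
		vid, err := strconv.ParseUint(vids, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid volume ID %q", vids)
		}
		volumeIDs = append(volumeIDs, uint32(vid))
	}
	return volumeIDs, nil
}
```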