From 579ebbdf602c13c839f6e933163cf16f81d25bfb Mon Sep 17 00:00:00 2001 From: NyaMisty Date: Mon, 3 Jun 2024 05:25:42 +0800 Subject: [PATCH] Support concurrent volume.fsck & support disabling -cutoffTimeAgo to improve speed (#5636) --- weed/shell/command_volume_fsck.go | 51 ++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 8916e90bd..d85a9e13f 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + "golang.org/x/sync/errgroup" "io" "math" "net/http" @@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("read filer buckets path: %v", err) } - collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano() + var collectCutoffFromAtNs int64 = 0 + if cutoffTimeAgo.Seconds() != 0 { + collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano() + } var collectModifyFromAtNs int64 = 0 if modifyTimeAgo.Seconds() != 0 { collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano() } // collect each volume file ids - for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { - for volumeId, vinfo := range volumeIdToVInfo { - if len(c.volumeIds) > 0 { - if _, ok := c.volumeIds[volumeId]; !ok { + eg, gCtx := errgroup.WithContext(context.Background()) + _ = gCtx + for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo { + dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo + eg.Go(func() error { + for volumeId, vinfo := range volumeIdToVInfo { + if len(c.volumeIds) > 0 { + if _, ok := c.volumeIds[volumeId]; !ok { + delete(volumeIdToVInfo, volumeId) + continue + } + } + if *c.collection != "" && vinfo.collection != *c.collection { delete(volumeIdToVInfo, volumeId) continue } + err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) + if err != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + } } - if *c.collection != "" && vinfo.collection != *c.collection { - delete(volumeIdToVInfo, volumeId) - continue - } - err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) - if err != nil { - return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + if *c.verbose { + fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId])) } - } - if *c.verbose { - fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId])) - } + return nil + }) + } + err = eg.Wait() + if err != nil { + fmt.Fprintf(c.writer, "got error: %v", err) + return err } if *c.findMissingChunksInFiler { @@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId } buf.Write(resp.FileContent) } - if !vinfo.isReadOnly { + if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) { index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ @@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId if err != nil { return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) } - if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) { + if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { return true, nil } return false, nil