Browse Source

Support concurrent volume.fsck & support disabling -cutoffTimeAgo to improve speed (#5636)

pull/5638/head
NyaMisty 7 months ago
committed by GitHub
parent
commit
579ebbdf60
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 51
      weed/shell/command_volume_fsck.go

51
weed/shell/command_volume_fsck.go

@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/sync/errgroup"
"io" "io"
"math" "math"
"net/http" "net/http"
@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("read filer buckets path: %v", err) return fmt.Errorf("read filer buckets path: %v", err)
} }
collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano()
var collectCutoffFromAtNs int64 = 0
if cutoffTimeAgo.Seconds() != 0 {
collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
}
var collectModifyFromAtNs int64 = 0 var collectModifyFromAtNs int64 = 0
if modifyTimeAgo.Seconds() != 0 { if modifyTimeAgo.Seconds() != 0 {
collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano() collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
} }
// collect each volume file ids // collect each volume file ids
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
if len(c.volumeIds) > 0 {
if _, ok := c.volumeIds[volumeId]; !ok {
eg, gCtx := errgroup.WithContext(context.Background())
_ = gCtx
for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
eg.Go(func() error {
for volumeId, vinfo := range volumeIdToVInfo {
if len(c.volumeIds) > 0 {
if _, ok := c.volumeIds[volumeId]; !ok {
delete(volumeIdToVInfo, volumeId)
continue
}
}
if *c.collection != "" && vinfo.collection != *c.collection {
delete(volumeIdToVInfo, volumeId) delete(volumeIdToVInfo, volumeId)
continue continue
} }
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
} }
if *c.collection != "" && vinfo.collection != *c.collection {
delete(volumeIdToVInfo, volumeId)
continue
}
err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
if *c.verbose {
fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
} }
}
if *c.verbose {
fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
}
return nil
})
}
err = eg.Wait()
if err != nil {
fmt.Fprintf(c.writer, "got error: %v", err)
return err
} }
if *c.findMissingChunksInFiler { if *c.findMissingChunksInFiler {
@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
} }
buf.Write(resp.FileContent) buf.Write(resp.FileContent)
} }
if !vinfo.isReadOnly {
if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
index, err := idx.FirstInvalidIndex(buf.Bytes(), index, err := idx.FirstInvalidIndex(buf.Bytes(),
func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
if err != nil { if err != nil {
return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
} }
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) {
if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
return true, nil return true, nil
} }
return false, nil return false, nil

Loading…
Cancel
Save