From bf8a9d2db138053951af646b87f613b7d96142ee Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 31 Oct 2022 08:32:46 +0500 Subject: [PATCH] [volume.check.disk] sync of deletions the fix (#3923) * sync of deletions the fix * avoid return if only partiallyDeletedNeedles * refactor sync deletions --- weed/shell/command_volume_check_disk.go | 58 +++++++++++++++++++------ weed/shell/command_volume_fsck.go | 2 +- weed/storage/needle_map/memdb.go | 6 ++- weed/storage/types/needle_id_type.go | 4 ++ 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 41c28b810..6a1634f8a 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc" "io" "math" + "net/http" "time" ) @@ -21,7 +22,8 @@ func init() { } type commandVolumeCheckDisk struct { - env *CommandEnv + env *CommandEnv + syncDeletions *bool } func (c *commandVolumeCheckDisk) Name() string { @@ -49,6 +51,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write verbose := fsckCommand.Bool("v", false, "verbose mode") volumeId := fsckCommand.Uint("volumeId", 0, "the volume id") applyChanges := fsckCommand.Bool("force", false, "apply the fix") + c.syncDeletions = fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { return nil @@ -145,13 +148,17 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m // find missing keys // hash join, can be more efficient var missingNeedles []needle_map.NeedleValue + var partiallyDeletedNeedles []needle_map.NeedleValue var counter int doCutoffOfLastNeedle := true - minuend.DescendingVisit(func(value 
needle_map.NeedleValue) error { + minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error { counter++ - if _, found := subtrahend.Get(value.Key); !found { + if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found { + if minuendValue.Size.IsDeleted() { + return nil + } if doCutoffOfLastNeedle { - if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, value); err == nil { + if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { // needles older than the cutoff time are not missing yet if needleMeta.AppendAtNs > cutoffFromAtNs { return nil @@ -159,16 +166,22 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m doCutoffOfLastNeedle = false } } - missingNeedles = append(missingNeedles, value) - } else if doCutoffOfLastNeedle { - doCutoffOfLastNeedle = false + missingNeedles = append(missingNeedles, minuendValue) + } else { + if minuendValue.Size.IsDeleted() && !subtrahendValue.Size.IsDeleted() { + partiallyDeletedNeedles = append(partiallyDeletedNeedles, minuendValue) + } + if doCutoffOfLastNeedle { + doCutoffOfLastNeedle = false + } } return nil }) - fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles)) + fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) - if counter == 0 || len(missingNeedles) == 0 { + if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { return false, nil } @@ -190,7 +203,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m } 
if verbose { - fmt.Fprintf(writer, "read %d,%x %s => %s \n", source.info.Id, needleValue.Key, source.location.dataNode.Id, target.location.dataNode.Id) + fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } hasChanges = true @@ -201,6 +214,27 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m } + if *c.syncDeletions && len(partiallyDeletedNeedles) > 0 { + var fidList []string + for _, needleValue := range partiallyDeletedNeedles { + fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) + if verbose { + fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + } + } + deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer( + pb.NewServerAddressFromDataNode(target.location.dataNode), + c.env.option.GrpcDialOption, fidList, false) + if deleteErr != nil { + return hasChanges, deleteErr + } + for _, deleteResult := range deleteResults { + if deleteResult.Status == http.StatusAccepted { + hasChanges = true + return + } + } + } return } @@ -245,9 +279,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect if verbose { fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) } - - return db.LoadFromReaderAt(bytes.NewReader(buf.Bytes())) - + return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 854929998..6e3a813cf 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -574,7 +574,7 @@ func (c *commandVolumeFsck) 
oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri if !n.Size.IsValid() { return nil } - orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String())) + orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) orphanFileCount++ orphanDataSize += uint64(n.Size) return nil diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index f2b161792..d3d47b605 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -145,8 +145,12 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) { + return cm.LoadFilterFromReaderAt(readerAt, true, true) +} + +func (cm *MemDb) LoadFilterFromReaderAt(readerAt io.ReaderAt, isFilterOffsetZero bool, isFilterDeleted bool) (ret error) { return idx.WalkIndexFile(readerAt, 0, func(key NeedleId, offset Offset, size Size) error { - if offset.IsZero() || size.IsDeleted() { + if (isFilterOffsetZero && offset.IsZero()) || (isFilterDeleted && size.IsDeleted()) { return cm.Delete(key) } return cm.Set(key, offset, size) diff --git a/weed/storage/types/needle_id_type.go b/weed/storage/types/needle_id_type.go index e65c862f2..1ea9b0a1a 100644 --- a/weed/storage/types/needle_id_type.go +++ b/weed/storage/types/needle_id_type.go @@ -34,6 +34,10 @@ func (k NeedleId) String() string { return strconv.FormatUint(uint64(k), 16) } +func (k NeedleId) FileId(volumeId uint32) string { + return fmt.Sprintf("%d,%s00000000", volumeId, k.String()) +} + func ParseNeedleId(idString string) (NeedleId, error) { key, err := strconv.ParseUint(idString, 16, 64) if err != nil {