From 7f1383a41ef8011224f15fd9252aa0677af62324 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 1 Apr 2022 14:45:41 +0500 Subject: [PATCH] findExtraChunksInVolumeServers in consideration of replication --- weed/shell/command_volume_fsck.go | 68 +++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 557c1e18e..1aa33e054 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -245,13 +245,32 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error { var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 - + volumeIdOrphanFileIds := make(map[uint32]map[string]bool) + isSeveralReplicas := make(map[uint32]bool) + isEcVolumeReplicas := make(map[uint32]bool) + isReadOnlyReplicas := make(map[uint32]bool) + serverReplicas := make(map[uint32][]pb.ServerAddress) for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo { inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, dataNodeId, volumeId, writer, verbose) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } + isSeveralReplicas[volumeId] = false + if _, found := volumeIdOrphanFileIds[volumeId]; !found { + volumeIdOrphanFileIds[volumeId] = make(map[string]bool) + } else { + isSeveralReplicas[volumeId] = true + } + for _, fid := range orphanFileIds { + if isSeveralReplicas[volumeId] { + if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found { + continue + } + } + volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId] + } + totalInUseCount += inUseCount totalOrphanChunkCount += uint64(len(orphanFileIds)) totalOrphanDataSize += orphanDataSize @@ -261,31 +280,48 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn fmt.Fprintf(writer, "%s\n", fid) } } + isEcVolumeReplicas[volumeId] = vinfo.isEcVolume + if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) { + isReadOnlyReplicas[volumeId] = vinfo.isReadOnly + } + serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server) + } - if applyPurging && len(orphanFileIds) > 0 { - if verbose { - fmt.Fprintf(writer, "purging process for volume %d", volumeId) - } - - if vinfo.isEcVolume { - fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId) - continue + for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds { + if !(applyPurging && len(orphanReplicaFileIds) > 0) { + continue + } + orphanFileIds := []string{} + for fid, foundInAllReplicas := range orphanReplicaFileIds { + if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) { + orphanFileIds = append(orphanFileIds, fid) } + } + if !(len(orphanFileIds) > 0) { + continue + } + if verbose { + fmt.Fprintf(writer, "purging process for volume %d", volumeId) + } + if isEcVolumeReplicas[volumeId] { + fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId) + continue + } + for _, server := range serverReplicas[volumeId] { needleVID := needle.VolumeId(volumeId) - if vinfo.isReadOnly { - err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true) + if isReadOnlyReplicas[volumeId] { + err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true) if err != nil { return fmt.Errorf("mark volume %d read/write: %v", volumeId, err) } - fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server) - defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false) - } - - fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server) + fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server) + defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false) + fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, server) + } if verbose { fmt.Fprintf(writer, "purging files from volume %d\n", volumeId) }