diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index dd58175cf..1a70d6d7c 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -7,19 +7,6 @@ import ( "errors" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/operation" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/storage" - "github.com/seaweedfs/seaweedfs/weed/storage/idx" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" - "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util" - "golang.org/x/sync/errgroup" "io" "math" "net/http" @@ -31,7 +18,20 @@ import ( "strings" "sync" "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "golang.org/x/sync/errgroup" ) func init() { @@ -164,7 +164,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. delete(volumeIdToVInfo, volumeId) continue } - err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) + err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) } @@ -199,7 +199,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("failed to collect file ids from filer: %v", err) } // volume file ids subtract filer file ids - if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging); err != nil { + if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil { return fmt.Errorf("findExtraChunksInVolumeServers: %v", err) } } @@ -289,7 +289,7 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf return nil } -func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool) error { +func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error { var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 volumeIdOrphanFileIds := make(map[uint32]map[string]bool) @@ -299,7 +299,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn serverReplicas := make(map[uint32][]pb.ServerAddress) for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo { - inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo) + inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } @@ -395,7 +395,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn return nil } -func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error { +func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error { if *c.verbose { fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) @@ -432,29 +432,6 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId } buf.Write(resp.FileContent) } - if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) { - index, err := idx.FirstInvalidIndex(buf.Bytes(), - func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { - resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ - VolumeId: volumeId, - NeedleId: uint64(key), - Offset: offset.ToActualOffset(), - Size: int32(size), - }) - if err != nil { - return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) - } - if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { - return true, nil - } - return false, nil - }) - if err != nil { - fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err) - } else { - buf.Truncate(index * types.NeedleMapEntrySize) - } - } idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId) err = writeToFile(buf.Bytes(), idxFilename) if err != nil { @@ -570,7 +547,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) { } } -func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { +func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { volumeFileIdDb := needle_map.NewMemDb() defer volumeFileIdDb.Close() @@ -610,9 +587,30 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri if n.Size.IsDeleted() { return nil } - orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) - orphanFileCount++ - orphanDataSize += uint64(n.Size) + if cutoffFrom > 0 || modifyFrom > 0 { + return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, + func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ + VolumeId: volumeId, + NeedleId: types.NeedleIdToUint64(n.Key), + Offset: n.Offset.ToActualOffset(), + Size: int32(n.Size), + }) + if err != nil { + return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err) + } + if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { + orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) + orphanFileCount++ + orphanDataSize += uint64(n.Size) + } + return nil + }) + } else { + orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) + orphanFileCount++ + orphanDataSize += uint64(n.Size) + } return nil }); err != nil { err = fmt.Errorf("failed to AscendingVisit %+v", err)