diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 6e3a813cf..72babdb6d 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -13,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" @@ -26,6 +27,7 @@ import ( "os" "path" "path/filepath" + "strings" "sync" "time" ) @@ -35,8 +37,7 @@ func init() { } const ( - readbufferSize = 16 - verifyProbeBlobSize = 16 + readbufferSize = 16 ) type commandVolumeFsck struct { @@ -88,7 +89,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, " delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks") - c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server") + c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server") if err = fsckCommand.Parse(args); err != nil { return nil @@ -263,7 +264,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn serverReplicas := make(map[uint32][]pb.ServerAddress) for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo { - inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, vinfo.collection) + inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } @@ -533,36 +534,35 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) { } } -func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, collection string) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { +func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { - db := needle_map.NewMemDb() - defer db.Close() + volumeFileIdDb := needle_map.NewMemDb() + defer volumeFileIdDb.Close() - if err = db.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil { + if err = volumeFileIdDb.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil { err = fmt.Errorf("failed to LoadFromIdx %+v", err) return } - volumeAddr := pb.NewServerAddressWithGrpcPort(dataNodeId, 0) - if err = c.readFilerFileIdFile(volumeId, func(nId types.NeedleId, itemPath util.FullPath) { + if err = c.readFilerFileIdFile(volumeId, func(filerNeedleId types.NeedleId, itemPath util.FullPath) { inUseCount++ if *c.verifyNeedle { - if v, ok := db.Get(nId); ok && v.Size.IsValid() { - newSize := types.Size(verifyProbeBlobSize) - if v.Size > newSize { - v.Size = newSize - } - if _, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, volumeAddr, volumeId, *v); err != nil { - fmt.Fprintf(c.writer, "failed to read file %s NeedleBlob %+v: %+v", itemPath, nId, err) - if *c.forcePurging { - return + if needleValue, ok := volumeFileIdDb.Get(filerNeedleId); ok && !needleValue.Size.IsDeleted() { + if _, err := readNeedleStatus(c.env.option.GrpcDialOption, vinfo.server, volumeId, *needleValue); err != nil { + // files may be deleted during copying filesIds + if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) { + fmt.Fprintf(c.writer, "failed to read %d:%s needle status of file %s: %+v\n", + volumeId, filerNeedleId.String(), itemPath, err) + if *c.forcePurging { + return + } } } } } - if err = db.Delete(nId); err != nil && *c.verbose { - fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, nId, err) + if err = volumeFileIdDb.Delete(filerNeedleId); err != nil && *c.verbose { + fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, filerNeedleId, err) } }); err != nil { err = fmt.Errorf("failed to readFilerFileIdFile %+v", err) @@ -570,8 +570,8 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri } var orphanFileCount uint64 - if err = db.AscendingVisit(func(n needle_map.NeedleValue) error { - if !n.Size.IsValid() { + if err = volumeFileIdDb.AscendingVisit(func(n needle_map.NeedleValue) error { + if n.Size.IsDeleted() { return nil } orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 66fdcb6bd..af6888458 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -169,3 +169,18 @@ func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddres ) return } + +func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ + VolumeId: volumeId, + NeedleId: uint64(needleValue.Key), + }); err != nil { + return err + } + return nil + }, + ) + return +}