|
|
@ -5,17 +5,6 @@ import ( |
|
|
|
"context" |
|
|
|
"flag" |
|
|
|
"fmt" |
|
|
|
"io" |
|
|
|
"io/ioutil" |
|
|
|
"math" |
|
|
|
"net/http" |
|
|
|
"net/url" |
|
|
|
"os" |
|
|
|
"path" |
|
|
|
"path/filepath" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/operation" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb" |
|
|
@ -26,6 +15,17 @@ import ( |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/needle_map" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/types" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
|
"io" |
|
|
|
"io/ioutil" |
|
|
|
"math" |
|
|
|
"net/http" |
|
|
|
"net/url" |
|
|
|
"os" |
|
|
|
"path" |
|
|
|
"path/filepath" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
func init() { |
|
|
@ -112,6 +112,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
|
return fmt.Errorf("read filer buckets path: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
collectMtime := time.Now().Unix() |
|
|
|
// collect each volume file ids
|
|
|
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { |
|
|
|
for volumeId, vinfo := range volumeIdToVInfo { |
|
|
@ -132,7 +133,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
|
|
|
|
|
if *findMissingChunksInFiler { |
|
|
|
// collect all filer file ids and paths
|
|
|
|
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent); err != nil { |
|
|
|
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent, collectMtime); err != nil { |
|
|
|
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) |
|
|
|
} |
|
|
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { |
|
|
@ -155,7 +156,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool) error { |
|
|
|
func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool, collectMtime int64) error { |
|
|
|
|
|
|
|
if verbose { |
|
|
|
fmt.Fprintf(writer, "checking each file from filer ...\n") |
|
|
@ -195,6 +196,9 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m |
|
|
|
} |
|
|
|
dataChunks = append(dataChunks, manifestChunks...) |
|
|
|
for _, chunk := range dataChunks { |
|
|
|
if chunk.Mtime > collectMtime { |
|
|
|
continue |
|
|
|
} |
|
|
|
outputChan <- &Item{ |
|
|
|
vid: chunk.Fid.VolumeId, |
|
|
|
fileKey: chunk.Fid.FileKey, |
|
|
@ -400,7 +404,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(dataNodeVolumeIdToVInfo map[stri |
|
|
|
func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, dataNodeId string, volumeId uint32, writer io.Writer, verbose bool, applyPurging bool) (err error) { |
|
|
|
|
|
|
|
if verbose { |
|
|
|
fmt.Fprintf(writer, "find missing file chunks in volume %d ...\n", volumeId) |
|
|
|
fmt.Fprintf(writer, "find missing file chunks in dataNodeId %s volume %d ...\n", dataNodeId, volumeId) |
|
|
|
} |
|
|
|
|
|
|
|
db := needle_map.NewMemDb() |
|
|
|