|
@ -137,7 +137,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
return fmt.Errorf("read filer buckets path: %v", err) |
|
|
return fmt.Errorf("read filer buckets path: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
collectMtime := time.Now().UnixNano() |
|
|
|
|
|
|
|
|
collectCutoffFromAtNs := time.Now().UnixNano() |
|
|
// collect each volume file ids
|
|
|
// collect each volume file ids
|
|
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { |
|
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { |
|
|
for volumeId, vinfo := range volumeIdToVInfo { |
|
|
for volumeId, vinfo := range volumeIdToVInfo { |
|
@ -164,7 +164,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
|
|
|
|
|
|
if *c.findMissingChunksInFiler { |
|
|
if *c.findMissingChunksInFiler { |
|
|
// collect all filer file ids and paths
|
|
|
// collect all filer file ids and paths
|
|
|
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectMtime); err != nil { |
|
|
|
|
|
|
|
|
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectCutoffFromAtNs); err != nil { |
|
|
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) |
|
|
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) |
|
|
} |
|
|
} |
|
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { |
|
|
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { |
|
@ -175,7 +175,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
// collect all filer file ids
|
|
|
// collect all filer file ids
|
|
|
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, collectMtime); err != nil { |
|
|
|
|
|
|
|
|
if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0); err != nil { |
|
|
return fmt.Errorf("failed to collect file ids from filer: %v", err) |
|
|
return fmt.Errorf("failed to collect file ids from filer: %v", err) |
|
|
} |
|
|
} |
|
|
// volume file ids subtract filer file ids
|
|
|
// volume file ids subtract filer file ids
|
|
@ -187,7 +187,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, collectMtime int64) error { |
|
|
|
|
|
|
|
|
func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, cutoffFromAtNs int64) error { |
|
|
if *c.verbose { |
|
|
if *c.verbose { |
|
|
fmt.Fprintf(c.writer, "checking each file from filer path %s...\n", c.getCollectFilerFilePath()) |
|
|
fmt.Fprintf(c.writer, "checking each file from filer path %s...\n", c.getCollectFilerFilePath()) |
|
|
} |
|
|
} |
|
@ -222,7 +222,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m |
|
|
} |
|
|
} |
|
|
dataChunks = append(dataChunks, manifestChunks...) |
|
|
dataChunks = append(dataChunks, manifestChunks...) |
|
|
for _, chunk := range dataChunks { |
|
|
for _, chunk := range dataChunks { |
|
|
if chunk.ModifiedTsNs > collectMtime { |
|
|
|
|
|
|
|
|
if cutoffFromAtNs != 0 && chunk.ModifiedTsNs > cutoffFromAtNs { |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
outputChan <- &Item{ |
|
|
outputChan <- &Item{ |
|
|