|
|
@ -393,8 +393,9 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI |
|
|
|
} |
|
|
|
buf.Write(resp.FileContent) |
|
|
|
} |
|
|
|
fileredBuf := filterDeletedNeedleFromIdx(buf.Bytes()) |
|
|
|
if vinfo.isReadOnly == false { |
|
|
|
index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { |
|
|
|
index, err := idx.FirstInvalidIndex(fileredBuf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { |
|
|
|
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ |
|
|
|
VolumeId: volumeId, |
|
|
|
NeedleId: uint64(key), |
|
|
@ -409,11 +410,11 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI |
|
|
|
if err != nil { |
|
|
|
fmt.Fprintf(writer, "Failed to search for last valid index on volume %d with error %v", volumeId, err) |
|
|
|
} else { |
|
|
|
buf.Truncate(index * types.NeedleMapEntrySize) |
|
|
|
fileredBuf.Truncate(index * types.NeedleMapEntrySize) |
|
|
|
} |
|
|
|
} |
|
|
|
idxFilename := getVolumeFileIdFile(tempFolder, dataNodeId, volumeId) |
|
|
|
err = writeToFile(buf.Bytes(), idxFilename) |
|
|
|
err = writeToFile(fileredBuf.Bytes(), idxFilename) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err) |
|
|
|
} |
|
|
@ -719,3 +720,14 @@ func writeToFile(bytes []byte, fileName string) error { |
|
|
|
dst.Write(bytes) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func filterDeletedNeedleFromIdx(arr []byte) bytes.Buffer { |
|
|
|
var filteredBuf bytes.Buffer |
|
|
|
for i := 0; i < len(arr); i += types.NeedleMapEntrySize { |
|
|
|
size := types.BytesToSize(arr[i+types.NeedleIdSize+types.OffsetSize : i+types.NeedleIdSize+types.OffsetSize+types.SizeSize]) |
|
|
|
if size > 0 { |
|
|
|
filteredBuf.Write(arr[i : i+types.NeedleIdSize+types.OffsetSize+types.SizeSize]) |
|
|
|
} |
|
|
|
} |
|
|
|
return filteredBuf |
|
|
|
} |