Browse Source

Implement index (fast) scrubbing for regular/EC volumes. (#8207)

Implement index (fast) scrubbing for regular/EC volumes via `ScrubVolume()`/`ScrubEcVolume()`.

Also rearranges existing index test files for reuse across unit tests for different modules.
pull/8226/head
Lisandro Pin 3 weeks ago
committed by GitHub
parent
commit
f84b70c362
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 25
      weed/server/volume_grpc_scrub.go
  2. 11
      weed/storage/erasure_coding/ec_volume.go
  3. 2
      weed/storage/erasure_coding/ec_volume_test.go
  4. 103
      weed/storage/idx/check.go
  5. 108
      weed/storage/idx/check_test.go
  6. 0
      weed/storage/idx/test_files/389.ecx
  7. BIN
      weed/storage/idx/test_files/deleted_files.ecx
  8. BIN
      weed/storage/idx/test_files/deleted_files.idx
  9. BIN
      weed/storage/idx/test_files/deleted_files_bitrot.ecx
  10. BIN
      weed/storage/idx/test_files/simple_index.idx
  11. BIN
      weed/storage/idx/test_files/simple_index_bitrot.idx
  12. BIN
      weed/storage/idx/test_files/simple_index_truncated.idx
  13. 22
      weed/storage/volume_checking.go

25
weed/server/volume_grpc_scrub.go

@ -31,11 +31,11 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
return nil, fmt.Errorf("volume id %d not found", vid)
}
var files uint64
var files int64
var serrs []error
switch m := req.GetMode(); m {
case volume_server_pb.VolumeScrubMode_INDEX:
files, serrs = scrubVolumeIndex(ctx, v)
files, serrs = v.CheckIndex()
case volume_server_pb.VolumeScrubMode_FULL:
files, serrs = scrubVolumeFull(ctx, v)
default:
@ -43,7 +43,7 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
}
totalVolumes += 1
totalFiles += files
totalFiles += uint64(files)
if len(serrs) != 0 {
brokenVolumeIds = append(brokenVolumeIds, uint32(vid))
for _, err := range serrs {
@ -61,11 +61,7 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
return res, nil
}
func scrubVolumeIndex(ctx context.Context, v *storage.Volume) (uint64, []error) {
return 0, []error{fmt.Errorf("scrubVolumeIndex(): not implemented")}
}
func scrubVolumeFull(ctx context.Context, v *storage.Volume) (uint64, []error) {
func scrubVolumeFull(ctx context.Context, v *storage.Volume) (int64, []error) {
return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented")}
}
@ -91,12 +87,13 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
return nil, fmt.Errorf("EC volume id %d not found", vid)
}
var files uint64
var files int64
var shardInfos []*volume_server_pb.EcShardInfo
var serrs []error
switch m := req.GetMode(); m {
case volume_server_pb.VolumeScrubMode_INDEX:
files, shardInfos, serrs = scrubEcVolumeIndex(v)
// index scrubs do not verify individual EC shards
files, serrs = v.CheckIndex()
case volume_server_pb.VolumeScrubMode_FULL:
files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
default:
@ -104,7 +101,7 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
}
totalVolumes += 1
totalFiles += files
totalFiles += uint64(files)
if len(serrs) != 0 || len(shardInfos) != 0 {
brokenVolumeIds = append(brokenVolumeIds, uint32(vid))
brokenShardInfos = append(brokenShardInfos, shardInfos...)
@ -124,10 +121,6 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
return res, nil
}
func scrubEcVolumeIndex(ecv *erasure_coding.EcVolume) (uint64, []*volume_server_pb.EcShardInfo, []error) {
return 0, nil, []error{fmt.Errorf("scrubEcVolumeIndex(): not implemented")}
}
func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (uint64, []*volume_server_pb.EcShardInfo, []error) {
func scrubEcVolumeFull(ctx context.Context, ecv *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented")}
}

11
weed/storage/erasure_coding/ec_volume.go

@ -332,3 +332,14 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t
func (ev *EcVolume) IsTimeToDestroy() bool {
return ev.ExpireAtSec > 0 && time.Now().Unix() > (int64(ev.ExpireAtSec)+destroyDelaySeconds)
}
// CheckIndex verifies the integrity of the volume's ECX index file via
// idx.CheckIndexFile(). It returns the number of index entries processed
// and a slice of all errors found.
func (ev *EcVolume) CheckIndex() (int64, []error) {
	// Refuse to scrub when there is no usable ECX file to read from.
	switch {
	case ev.ecxFile == nil:
		return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)}
	case ev.ecxFileSize == 0:
		return 0, []error{fmt.Errorf("zero-size ECX file for EC volume %v", ev.VolumeId)}
	}
	return idx.CheckIndexFile(ev.ecxFile, ev.ecxFileSize, ev.Version)
}

2
weed/storage/erasure_coding/ec_volume_test.go

@ -13,7 +13,7 @@ import (
func TestPositioning(t *testing.T) {
ecxFile, err := os.OpenFile("389.ecx", os.O_RDONLY, 0)
ecxFile, err := os.OpenFile("../idx/test_files/389.ecx", os.O_RDONLY, 0)
if err != nil {
t.Errorf("failed to open ecx file: %v", err)
}

103
weed/storage/idx/check.go

@ -0,0 +1,103 @@
package idx
import (
"fmt"
"io"
"sort"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// indexEntry holds one parsed index-file record together with its ordinal
// position in the file, so entries can be sorted by disk location for
// overlap detection while still reporting their original entry number.
type indexEntry struct {
	index  int            // zero-based position of the entry within the index file
	id     types.NeedleId // needle identifier
	offset int64          // actual on-disk byte offset of the needle data
	size   types.Size     // recorded needle size
}

// Compare orders entries by offset first, then by size; it returns -1, 0
// or 1 when ie sorts before, equal to, or after other, respectively.
func (ie *indexEntry) Compare(other *indexEntry) int {
	if ie.offset != other.offset {
		if ie.offset < other.offset {
			return -1
		}
		return 1
	}
	switch {
	case ie.size < other.size:
		return -1
	case ie.size > other.size:
		return 1
	}
	return 0
}
// CheckIndexFile verifies the integrity of an IDX/ECX index file. Returns a count of processed file entries, and slice of found errors.
//
// Two classes of problems are detected: entries that fail to parse while
// walking the file, and pairs of needles whose on-disk byte ranges overlap.
// Finally, the entry count is cross-checked against the reported file size.
func CheckIndexFile(r io.ReaderAt, indexFileSize int64, version needle.Version) (int64, []error) {
	// Callers compare against an empty (but non-nil) slice on success, so
	// initialize errs as []error{} rather than leaving it nil.
	errs := []error{}

	entries := []*indexEntry{}
	var i int
	err := WalkIndexFile(r, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error {
		entries = append(entries, &indexEntry{
			index:  i,
			id:     id,
			offset: offset.ToActualOffset(),
			size:   size,
		})
		i++
		return nil
	})
	if err != nil {
		errs = append(errs, err)
	}

	// Use a stable sort so entries with identical offset and size keep their
	// original file order; sort.Slice is unstable, which would make the order
	// of tied entries — and thus the overlap error messages — unspecified.
	sort.SliceStable(entries, func(i, j int) bool {
		return entries[i].Compare(entries[j]) < 0
	})

	for i, e := range entries {
		if i == 0 {
			// nothing to check for the first entry
			continue
		}
		// [start, end] is the inclusive on-disk byte range for this needle;
		// zero-size needles occupy a single point at their offset.
		start, end := e.offset, e.offset
		if size := needle.GetActualSize(e.size, version); size != 0 {
			end += size - 1
		}
		last := entries[i-1]
		lastStart, lastEnd := last.offset, last.offset
		if lastSize := needle.GetActualSize(last.size, version); lastSize != 0 {
			lastEnd += lastSize - 1
		}

		// check if needles overlap
		if start <= lastEnd {
			errs = append(errs, fmt.Errorf(
				"needle %d (#%d) at [%d-%d] overlaps needle %d at [%d-%d]",
				e.id, e.index+1,
				start, end,
				last.id,
				lastStart, lastEnd))
		}

		// The check below is intended to ensure all index entries are contiguous; unfortunately, Seaweed
		// can delete index entries for files while keeping their data, so volumes with deleted files
		// will fail this test :(
		// See https://github.com/seaweedfs/seaweedfs/issues/8204 for details.
		/*
			if e.offset != lastEnd + 1 {
				errs = append(errs, fmt.Errorf("offset %d for needle %d (#%d) doesn't match end of needle %d at %d", e.offset, e.id, e.index+1, last.id, lastEnd))
			}
		*/
	}

	// Each index entry is exactly NeedleMapEntrySize bytes, so a size
	// mismatch indicates a truncated or otherwise corrupted index file.
	count := int64(len(entries))
	if got, want := count*types.NeedleMapEntrySize, indexFileSize; got != want {
		errs = append(errs, fmt.Errorf("expected an index file of size %d, got %d", want, got))
	}
	return count, errs
}

108
weed/storage/idx/check_test.go

@ -0,0 +1,108 @@
package idx
import (
"fmt"
"os"
"reflect"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
// TestCheckIndexFile exercises CheckIndexFile against both healthy and
// deliberately damaged IDX/ECX fixtures under test_files/.
func TestCheckIndexFile(t *testing.T) {
	testCases := []struct {
		name      string
		indexPath string
		version   needle.Version
		want      int64
		wantErrs  []error
	}{
		{
			name:      "healthy index",
			indexPath: "./test_files/simple_index.idx",
			version:   needle.Version3,
			want:      161,
			wantErrs:  []error{},
		},
		{
			name:      "healthy index with deleted files",
			indexPath: "./test_files/deleted_files.idx",
			version:   needle.Version3,
			want:      230,
			wantErrs:  []error{},
		},
		{
			name:      "damaged index (bitrot)",
			indexPath: "./test_files/simple_index_bitrot.idx",
			version:   needle.Version3,
			want:      161,
			wantErrs: []error{
				fmt.Errorf("needle 3544668469065756977 (#2) at [6602459528-7427766999] overlaps needle 49 at [6602459528-7427766999]"),
				fmt.Errorf("expected an index file of size 2577, got 2576"),
			},
		},
		{
			name:      "damaged index (truncated)",
			indexPath: "./test_files/simple_index_truncated.idx",
			version:   needle.Version3,
			want:      158,
			wantErrs: []error{
				fmt.Errorf("expected an index file of size 2540, got 2528"),
			},
		},
		{
			name:      "healthy EC index",
			indexPath: "./test_files/389.ecx",
			version:   needle.Version3,
			want:      485098,
			wantErrs:  []error{},
		},
		{
			name:      "healthy EC index with deleted files",
			indexPath: "./test_files/deleted_files.ecx",
			version:   needle.Version3,
			want:      116,
			wantErrs:  []error{},
		},
		{
			name:      "damaged EC index (bitrot)",
			indexPath: "./test_files/deleted_files_bitrot.ecx",
			version:   needle.Version3,
			want:      116,
			wantErrs: []error{
				fmt.Errorf("needle 3223857 (#110) at [6602459528-7427767055] overlaps needle 12593 at [6601933184-7407907279]"),
				fmt.Errorf("needle 3544668469065757234 (#43) at [6737203600-7579354079] overlaps needle 3223857 at [6602459528-7427767055]"),
				fmt.Errorf("needle 3421236 (#112) at [7006693800-7899362591] overlaps needle 3544668469065757234 at [6737203600-7579354079]"),
				fmt.Errorf("needle 310 (#113) at [7276179888-8185702583] overlaps needle 3421236 at [7006693800-7899362591]"),
				fmt.Errorf("needle 7089336938131513954 (#52) at [13204919056-13205053935] overlaps needle 27410143614427489 at [13070174984-14703946887]"),
				fmt.Errorf("needle 25186 (#50) at [13204919056-14855533967] overlaps needle 7089336938131513954 at [13204919056-13205053935]"),
				fmt.Errorf("needle 7089336938131513954 (#51) at [13204919056-14855533967] overlaps needle 25186 at [13204919056-14855533967]"),
				fmt.Errorf("expected an index file of size 1857, got 1856"),
			},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// os.Open is equivalent to os.OpenFile(name, os.O_RDONLY, 0);
			// `f` also avoids shadowing the enclosing `idx` package name.
			f, err := os.Open(tc.indexPath)
			if err != nil {
				t.Fatalf("failed to open index file: %v", err)
			}
			defer f.Close()
			fi, err := f.Stat()
			if err != nil {
				t.Fatalf("failed to stat index file: %v", err)
			}
			got, gotErrs := CheckIndexFile(f, fi.Size(), tc.version)
			if got != tc.want {
				t.Errorf("expected %d files processed, got %d", tc.want, got)
			}
			if !reflect.DeepEqual(gotErrs, tc.wantErrs) {
				t.Errorf("expected errors %v, got %v", tc.wantErrs, gotErrs)
			}
		})
	}
}

0
weed/storage/erasure_coding/389.ecx → weed/storage/idx/test_files/389.ecx

BIN
weed/storage/idx/test_files/deleted_files.ecx

BIN
weed/storage/idx/test_files/deleted_files.idx

BIN
weed/storage/idx/test_files/deleted_files_bitrot.ecx

BIN
weed/storage/idx/test_files/simple_index.idx

BIN
weed/storage/idx/test_files/simple_index_bitrot.idx

BIN
weed/storage/idx/test_files/simple_index_truncated.idx

22
weed/storage/volume_checking.go

@ -14,6 +14,28 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
// CheckIndex verifies the integrity of the volume's IDX file via
// idx.CheckIndexFile(). It returns the number of index entries processed
// and a slice of all errors found.
func (v *Volume) CheckIndex() (int64, []error) {
	// Hold the read lock so the IDX file isn't swapped out (e.g. by
	// compaction) while we scrub it.
	v.dataFileAccessLock.RLock()
	defer v.dataFileAccessLock.RUnlock()

	idxFileName := v.FileName(".idx")
	// os.Open opens read-only; the original os.OpenFile(..., os.O_RDONLY, 0644)
	// permission argument was meaningless without O_CREATE.
	idxFile, err := os.Open(idxFileName)
	if err != nil {
		// %w wraps the underlying error so callers can use errors.Is/As.
		return 0, []error{fmt.Errorf("failed to open IDX file %s for volume %v: %w", idxFileName, v.Id, err)}
	}
	defer idxFile.Close()

	idxStat, err := idxFile.Stat()
	if err != nil {
		return 0, []error{fmt.Errorf("failed to stat IDX file %s for volume %v: %w", idxFileName, v.Id, err)}
	}
	if idxStat.Size() == 0 {
		// An empty index cannot be meaningfully scrubbed; report it as broken.
		return 0, []error{fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName)}
	}
	return idx.CheckIndexFile(idxFile, idxStat.Size(), v.Version())
}
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {

Loading…
Cancel
Save