diff --git a/weed/server/volume_grpc_scrub.go b/weed/server/volume_grpc_scrub.go index 5899241da..9d8d42582 100644 --- a/weed/server/volume_grpc_scrub.go +++ b/weed/server/volume_grpc_scrub.go @@ -31,11 +31,11 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S return nil, fmt.Errorf("volume id %d not found", vid) } - var files uint64 + var files int64 var serrs []error switch m := req.GetMode(); m { case volume_server_pb.VolumeScrubMode_INDEX: - files, serrs = scrubVolumeIndex(ctx, v) + files, serrs = v.CheckIndex() case volume_server_pb.VolumeScrubMode_FULL: files, serrs = scrubVolumeFull(ctx, v) default: @@ -43,7 +43,7 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S } totalVolumes += 1 - totalFiles += files + totalFiles += uint64(files) if len(serrs) != 0 { brokenVolumeIds = append(brokenVolumeIds, uint32(vid)) for _, err := range serrs { @@ -61,11 +61,7 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S return res, nil } -func scrubVolumeIndex(ctx context.Context, v *storage.Volume) (uint64, []error) { - return 0, []error{fmt.Errorf("scrubVolumeIndex(): not implemented")} -} - -func scrubVolumeFull(ctx context.Context, v *storage.Volume) (uint64, []error) { +func scrubVolumeFull(ctx context.Context, v *storage.Volume) (int64, []error) { return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented")} } @@ -91,12 +87,13 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb return nil, fmt.Errorf("EC volume id %d not found", vid) } - var files uint64 + var files int64 var shardInfos []*volume_server_pb.EcShardInfo var serrs []error switch m := req.GetMode(); m { case volume_server_pb.VolumeScrubMode_INDEX: - files, shardInfos, serrs = scrubEcVolumeIndex(v) + // index scrubs only check the ECX index and do not verify individual EC shards, so shardInfos stays nil + files, serrs = v.CheckIndex() case volume_server_pb.VolumeScrubMode_FULL: files, shardInfos, serrs = scrubEcVolumeFull(ctx, 
v) default: @@ -104,7 +101,7 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb } totalVolumes += 1 - totalFiles += files + totalFiles += uint64(files) if len(serrs) != 0 || len(shardInfos) != 0 { brokenVolumeIds = append(brokenVolumeIds, uint32(vid)) brokenShardInfos = append(brokenShardInfos, shardInfos...) @@ -124,10 +121,6 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb return res, nil } -func scrubEcVolumeIndex(ecv *erasure_coding.EcVolume) (uint64, []*volume_server_pb.EcShardInfo, []error) { - return 0, nil, []error{fmt.Errorf("scrubEcVolumeIndex(): not implemented")} -} - -func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (uint64, []*volume_server_pb.EcShardInfo, []error) { +func scrubEcVolumeFull(ctx context.Context, ecv *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) { return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented")} } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index fb585e5ab..7581fb67f 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -332,3 +332,14 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t func (ev *EcVolume) IsTimeToDestroy() bool { return ev.ExpireAtSec > 0 && time.Now().Unix() > (int64(ev.ExpireAtSec)+destroyDelaySeconds) } + +func (ev *EcVolume) CheckIndex() (int64, []error) { + if ev.ecxFile == nil { + return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)} + } + if ev.ecxFileSize == 0 { + return 0, []error{fmt.Errorf("zero-size ECX file for EC volume %v", ev.VolumeId)} + } + + return idx.CheckIndexFile(ev.ecxFile, ev.ecxFileSize, ev.Version) +} diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go index 82df0b8b1..323972c1a 100644 --- 
a/weed/storage/erasure_coding/ec_volume_test.go +++ b/weed/storage/erasure_coding/ec_volume_test.go @@ -13,7 +13,7 @@ import ( func TestPositioning(t *testing.T) { - ecxFile, err := os.OpenFile("389.ecx", os.O_RDONLY, 0) + ecxFile, err := os.OpenFile("../idx/test_files/389.ecx", os.O_RDONLY, 0) if err != nil { t.Errorf("failed to open ecx file: %v", err) } diff --git a/weed/storage/idx/check.go b/weed/storage/idx/check.go new file mode 100644 index 000000000..c0ff3267d --- /dev/null +++ b/weed/storage/idx/check.go @@ -0,0 +1,103 @@ +package idx + +import ( + "fmt" + "io" + "sort" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +type indexEntry struct { + index int + id types.NeedleId + offset int64 + size types.Size +} + +func (ie *indexEntry) Compare(other *indexEntry) int { + if ie.offset < other.offset { + return -1 + } + if ie.offset > other.offset { + return 1 + } + if ie.size < other.size { + return -1 + } + if ie.size > other.size { + return 1 + } + return 0 +} + +// CheckIndexFile verifies the integrity of an IDX/ECX index file: it reads the entries with WalkIndexFile, sorts them by offset and then size, reports each entry whose byte range overlaps the entry sorted just before it, and checks that the entry count times types.NeedleMapEntrySize equals indexFileSize. A walk error is recorded and checking continues with the entries read so far. It returns the number of entries processed and a slice of the errors found; the slice is empty, not nil, when nothing is wrong. 
+func CheckIndexFile(r io.ReaderAt, indexFileSize int64, version needle.Version) (int64, []error) { + errs := []error{} + + entries := []*indexEntry{} + var i int + err := WalkIndexFile(r, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error { + entries = append(entries, &indexEntry{ + index: i, + id: id, + offset: offset.ToActualOffset(), + size: size, + }) + i++ + return nil + }) + if err != nil { + errs = append(errs, err) + } + + sort.Slice(entries, func(i, j int) bool { + return entries[i].Compare(entries[j]) < 0 + }) + + for i, e := range entries { + if i == 0 { + // the first sorted entry has no predecessor to compare against + continue + } + + start, end := e.offset, e.offset + if size := needle.GetActualSize(e.size, version); size != 0 { + end += size - 1 + } + + last := entries[i-1] + lastStart, lastEnd := last.offset, last.offset + if lastSize := needle.GetActualSize(last.size, version); lastSize != 0 { + lastEnd += lastSize - 1 + } + + // entries are sorted by offset, so this needle overlaps the previous one when it starts at or before the previous one's inclusive end + if start <= lastEnd { + errs = append(errs, fmt.Errorf( + "needle %d (#%d) at [%d-%d] overlaps needle %d at [%d-%d]", + e.id, e.index+1, + start, end, + last.id, + lastStart, lastEnd)) + } + + // The check below is intended to ensure all index entries are contiguous; unfortunately, Seaweed + // can delete index entries for files while keeping their data, so volumes with deleted files + // will fail this test :( + // See https://github.com/seaweedfs/seaweedfs/issues/8204 for details. 
+ /* + if e.offset != lastEnd + 1 { + errs = append(errs, fmt.Errorf("offset %d for needle %d (#%d) doesn't match end of needle %d at %d", e.offset, e.id, e.index+1, last.id, lastEnd)) + } + */ + } + + count := int64(len(entries)) + if got, want := count*types.NeedleMapEntrySize, indexFileSize; got != want { + errs = append(errs, fmt.Errorf("expected an index file of size %d, got %d", want, got)) + } + + return count, errs +} diff --git a/weed/storage/idx/check_test.go b/weed/storage/idx/check_test.go new file mode 100644 index 000000000..c397d8e56 --- /dev/null +++ b/weed/storage/idx/check_test.go @@ -0,0 +1,108 @@ +package idx + +import ( + "fmt" + "os" + "reflect" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle" +) + +func TestCheckIndexFile(t *testing.T) { + testCases := []struct { + name string + indexPath string + version needle.Version + want int64 + wantErrs []error + }{ + { + name: "healthy index", + indexPath: "./test_files/simple_index.idx", + version: needle.Version3, + want: 161, + wantErrs: []error{}, + }, + { + name: "healthy index with deleted files", + indexPath: "./test_files/deleted_files.idx", + version: needle.Version3, + want: 230, + wantErrs: []error{}, + }, + { + name: "damaged index (bitrot)", + indexPath: "./test_files/simple_index_bitrot.idx", + version: needle.Version3, + want: 161, + wantErrs: []error{ + fmt.Errorf("needle 3544668469065756977 (#2) at [6602459528-7427766999] overlaps needle 49 at [6602459528-7427766999]"), + fmt.Errorf("expected an index file of size 2577, got 2576"), + }, + }, + { + name: "damaged index (truncated)", + indexPath: "./test_files/simple_index_truncated.idx", + version: needle.Version3, + want: 158, + wantErrs: []error{ + fmt.Errorf("expected an index file of size 2540, got 2528"), + }, + }, + { + name: "healthy EC index", + indexPath: "./test_files/389.ecx", + version: needle.Version3, + want: 485098, + wantErrs: []error{}, + }, + { + name: "healthy EC index with deleted files", 
+ indexPath: "./test_files/deleted_files.ecx", + version: needle.Version3, + want: 116, + wantErrs: []error{}, + }, + { + name: "damaged EC index (bitrot)", + indexPath: "./test_files/deleted_files_bitrot.ecx", + version: needle.Version3, + want: 116, + wantErrs: []error{ + fmt.Errorf("needle 3223857 (#110) at [6602459528-7427767055] overlaps needle 12593 at [6601933184-7407907279]"), + fmt.Errorf("needle 3544668469065757234 (#43) at [6737203600-7579354079] overlaps needle 3223857 at [6602459528-7427767055]"), + fmt.Errorf("needle 3421236 (#112) at [7006693800-7899362591] overlaps needle 3544668469065757234 at [6737203600-7579354079]"), + fmt.Errorf("needle 310 (#113) at [7276179888-8185702583] overlaps needle 3421236 at [7006693800-7899362591]"), + fmt.Errorf("needle 7089336938131513954 (#52) at [13204919056-13205053935] overlaps needle 27410143614427489 at [13070174984-14703946887]"), + fmt.Errorf("needle 25186 (#50) at [13204919056-14855533967] overlaps needle 7089336938131513954 at [13204919056-13205053935]"), + fmt.Errorf("needle 7089336938131513954 (#51) at [13204919056-14855533967] overlaps needle 25186 at [13204919056-14855533967]"), + fmt.Errorf("expected an index file of size 1857, got 1856"), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + idx, err := os.OpenFile(tc.indexPath, os.O_RDONLY, 0) + if err != nil { + t.Fatalf("failed to open index file: %v", err) + } + defer idx.Close() + + idxStat, err := idx.Stat() + if err != nil { + t.Fatalf("failed to stat index file: %v", err) + } + + got, gotErrs := CheckIndexFile(idx, idxStat.Size(), tc.version) + + if got != tc.want { + t.Errorf("expected %d files processed, got %d", tc.want, got) + } + if !reflect.DeepEqual(gotErrs, tc.wantErrs) { + t.Errorf("expected errors %v, got %v", tc.wantErrs, gotErrs) + } + }) + } +} diff --git a/weed/storage/erasure_coding/389.ecx b/weed/storage/idx/test_files/389.ecx similarity index 100% rename from 
weed/storage/erasure_coding/389.ecx rename to weed/storage/idx/test_files/389.ecx diff --git a/weed/storage/idx/test_files/deleted_files.ecx b/weed/storage/idx/test_files/deleted_files.ecx new file mode 100644 index 000000000..e8b29d7a7 Binary files /dev/null and b/weed/storage/idx/test_files/deleted_files.ecx differ diff --git a/weed/storage/idx/test_files/deleted_files.idx b/weed/storage/idx/test_files/deleted_files.idx new file mode 100644 index 000000000..28e0122fe Binary files /dev/null and b/weed/storage/idx/test_files/deleted_files.idx differ diff --git a/weed/storage/idx/test_files/deleted_files_bitrot.ecx b/weed/storage/idx/test_files/deleted_files_bitrot.ecx new file mode 100644 index 000000000..ba8218cec Binary files /dev/null and b/weed/storage/idx/test_files/deleted_files_bitrot.ecx differ diff --git a/weed/storage/idx/test_files/simple_index.idx b/weed/storage/idx/test_files/simple_index.idx new file mode 100644 index 000000000..0b7a10295 Binary files /dev/null and b/weed/storage/idx/test_files/simple_index.idx differ diff --git a/weed/storage/idx/test_files/simple_index_bitrot.idx b/weed/storage/idx/test_files/simple_index_bitrot.idx new file mode 100644 index 000000000..4f821da89 Binary files /dev/null and b/weed/storage/idx/test_files/simple_index_bitrot.idx differ diff --git a/weed/storage/idx/test_files/simple_index_truncated.idx b/weed/storage/idx/test_files/simple_index_truncated.idx new file mode 100644 index 000000000..fcf238305 Binary files /dev/null and b/weed/storage/idx/test_files/simple_index_truncated.idx differ diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 7ac63d6f0..e178e6b1a 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -14,6 +14,28 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +func (v *Volume) CheckIndex() (int64, []error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + idxFileName := v.FileName(".idx") + idxFile, 
err := os.OpenFile(idxFileName, os.O_RDONLY, 0644) + if err != nil { + return 0, []error{fmt.Errorf("failed to open IDX file %s for volume %v: %v", idxFileName, v.Id, err)} + } + defer idxFile.Close() + + idxStat, err := idxFile.Stat() + if err != nil { + return 0, []error{fmt.Errorf("failed to stat IDX file %s for volume %v: %v", idxFileName, v.Id, err)} + } + if idxStat.Size() == 0 { + return 0, []error{fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName)} + } + + return idx.CheckIndexFile(idxFile, idxStat.Size(), v.Version()) +} + func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { var indexSize int64 if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {