diff --git a/weed/server/volume_grpc_scrub.go b/weed/server/volume_grpc_scrub.go index dfb317da1..1e0b32dc5 100644 --- a/weed/server/volume_grpc_scrub.go +++ b/weed/server/volume_grpc_scrub.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) @@ -35,11 +34,12 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S var serrs []error switch m := req.GetMode(); m { case volume_server_pb.VolumeScrubMode_INDEX: - files, serrs = v.CheckIndex() + files, serrs = v.ScrubIndex() case volume_server_pb.VolumeScrubMode_LOCAL: - files, serrs = scrubVolumeLocal(ctx, v) + // LOCAL is equivalent to FULL for regular volumes + fallthrough case volume_server_pb.VolumeScrubMode_FULL: - files, serrs = scrubVolumeFull(ctx, v) + files, serrs = v.Scrub() default: return nil, fmt.Errorf("unsupported volume scrub mode %d", m) } @@ -63,14 +63,6 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S return res, nil } -func scrubVolumeLocal(ctx context.Context, v *storage.Volume) (int64, []error) { - return 0, []error{fmt.Errorf("scrubVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")} -} - -func scrubVolumeFull(ctx context.Context, v *storage.Volume) (int64, []error) { - return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")} -} - func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb.ScrubEcVolumeRequest) (*volume_server_pb.ScrubEcVolumeResponse, error) { vids := []needle.VolumeId{} if len(req.GetVolumeIds()) == 0 { diff --git a/weed/shell/command_volume_scrub.go b/weed/shell/command_volume_scrub.go index c6a24e13a..f5334798a 100644 --- a/weed/shell/command_volume_scrub.go +++ b/weed/shell/command_volume_scrub.go @@ -50,8 +50,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server : (optional)") volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)") - // TODO: switch default mode to LOCAL, once implemented. - mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)") + mode := volScrubCommand.String("mode", "full", "scrubbing mode (index/local/full)") maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") if err = volScrubCommand.Parse(args); err != nil { diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index 25eabb2b3..47918260e 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -55,7 +55,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio if n.Size != size { if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc() - glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) + glog.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) return ErrorSizeMismatch } stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc() diff --git a/weed/storage/test_files/bitrot_volume.dat b/weed/storage/test_files/bitrot_volume.dat new file mode 100644 index 000000000..5e92451a1 Binary files /dev/null and b/weed/storage/test_files/bitrot_volume.dat differ diff --git a/weed/storage/test_files/bitrot_volume.idx b/weed/storage/test_files/bitrot_volume.idx new file mode 100644 index 000000000..5aa300f13 Binary files /dev/null and b/weed/storage/test_files/bitrot_volume.idx differ diff --git a/weed/storage/test_files/healthy_volume.dat b/weed/storage/test_files/healthy_volume.dat new file mode 100644 index 000000000..dafe0da08 Binary files /dev/null and b/weed/storage/test_files/healthy_volume.dat differ diff --git a/weed/storage/test_files/healthy_volume.idx b/weed/storage/test_files/healthy_volume.idx new file mode 100644 index 000000000..5aa300f13 Binary files /dev/null and b/weed/storage/test_files/healthy_volume.idx differ diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index e178e6b1a..4ae83394c 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -10,30 +10,108 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" - . "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" ) -func (v *Volume) CheckIndex() (int64, []error) { - v.dataFileAccessLock.RLock() - defer v.dataFileAccessLock.RUnlock() - +// openIndex returns a file descriptor for the volume's index, and the index size in bytes. +func (v *Volume) openIndex() (*os.File, int64, error) { idxFileName := v.FileName(".idx") idxFile, err := os.OpenFile(idxFileName, os.O_RDONLY, 0644) if err != nil { - return 0, []error{fmt.Errorf("failed to open IDX file %s for volume %v: %v", idxFileName, v.Id, err)} + return nil, 0, fmt.Errorf("failed to open IDX file %s for volume %v: %v", idxFileName, v.Id, err) } - defer idxFile.Close() idxStat, err := idxFile.Stat() if err != nil { - return 0, []error{fmt.Errorf("failed to stat IDX file %s for volume %v: %v", idxFileName, v.Id, err)} + idxFile.Close() + return nil, 0, fmt.Errorf("failed to stat IDX file %s for volume %v: %v", idxFileName, v.Id, err) } if idxStat.Size() == 0 { - return 0, []error{fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName)} + idxFile.Close() + return nil, 0, fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName) + } + + return idxFile, idxStat.Size(), nil +} + +// ScrubIndex checks the volume's index for issues. +func (v *Volume) ScrubIndex() (int64, []error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + idxFile, idxFileSize, err := v.openIndex() + if err != nil { + return 0, []error{err} } + defer idxFile.Close() + + return idx.CheckIndexFile(idxFile, idxFileSize, v.Version()) +} + +// scrubVolumeData checks a volume content + index for issues. +func (v *Volume) scrubVolumeData(dataFile backend.BackendStorageFile, idxFile *os.File, idxFileSize int64) (int64, []error) { + // full scrubbing means also scrubbing the index + var count int64 + _, errs := idx.CheckIndexFile(idxFile, idxFileSize, v.Version()) + + // read and check every indexed needle + var totalRead int64 + version := v.Version() + err := idx.WalkIndexFile(idxFile, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error { + count++ + // compute the actual size of the needle in disk, including needle header, body and alignment padding. + actualSize := int64(needle.GetActualSize(size, version)) + + // TODO: Needle.ReadData() is currently broken for deleted files, which have a types.Size < 0. Fix + // so deleted needles get properly scrubbed as well. + // TODO: idx.WalkIndexFile() returns a size -1 (and actual size of 32 bytes) for deleted needles. We + // want to scrub deleted needles whenever possible. + if size.IsDeleted() { + totalRead += actualSize + return nil + } + + n := needle.Needle{} + if err := n.ReadData(dataFile, offset.ToActualOffset(), size, version); err != nil { + errs = append(errs, fmt.Errorf("needle %d on volume %d: %v", id, v.Id, err)) + } + + totalRead += actualSize + return nil + }) + if err != nil { + errs = append(errs, err) + } + + // check total volume file size + wantSize := totalRead + super_block.SuperBlockSize + dataSize, _, err := dataFile.GetStat() + if err != nil { + errs = append(errs, fmt.Errorf("failed to stat data file for volume %d: %v", v.Id, err)) + } else { + if dataSize < wantSize { + errs = append(errs, fmt.Errorf("data file for volume %d is smaller (%d) than the %d needles it contains (%d)", v.Id, dataSize, count, wantSize)) + } else if dataSize != wantSize { + errs = append(errs, fmt.Errorf("data file size for volume %d (%d) doesn't match the size for %d needles read (%d)", v.Id, dataSize, count, wantSize)) + } + } + + return count, errs +} + +// Scrub checks the entire volume content for issues. +func (v *Volume) Scrub() (int64, []error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + idxFile, idxFileSize, err := v.openIndex() + if err != nil { + return 0, []error{err} + } + defer idxFile.Close() - return idx.CheckIndexFile(idxFile, idxStat.Size(), v.Version()) + return v.scrubVolumeData(v.DataBackend, idxFile, idxFileSize) } func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { @@ -45,11 +123,11 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin return 0, nil } healthyIndexSize := indexSize - for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ { + for i := 1; i <= 10 && indexSize >= int64(i)*types.NeedleMapEntrySize; i++ { // check and fix last 10 entries - lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) + lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*types.NeedleMapEntrySize) if err == io.EOF { - healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize + healthyIndexSize = indexSize - int64(i)*types.NeedleMapEntrySize continue } if err != ErrorSizeMismatch { @@ -79,7 +157,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) ( } else { if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil { if err == ErrorSizeMismatch { - return verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset()+int64(MaxPossibleVolumeSize), key, size) + return verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset()+int64(types.MaxPossibleVolumeSize), key, size) } return lastAppendAtNs, err } @@ -89,7 +167,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) ( func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { if indexSize, err = util.GetFileSize(indexFile); err == nil { - if indexSize%NeedleMapEntrySize != 0 { + if indexSize%types.NeedleMapEntrySize != 0 { err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize) } } @@ -101,16 +179,16 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err err = fmt.Errorf("offset %d for index file is invalid", offset) return } - bytes = make([]byte, NeedleMapEntrySize) + bytes = make([]byte, types.NeedleMapEntrySize) var readCount int readCount, err = indexFile.ReadAt(bytes, offset) - if err == io.EOF && readCount == NeedleMapEntrySize { + if err == io.EOF && readCount == types.NeedleMapEntrySize { err = nil } return } -func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { +func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key types.NeedleId, size types.Size) (lastAppendAtNs uint64, err error) { n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) if err == io.EOF { return 0, err @@ -122,10 +200,10 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, return 0, ErrorSizeMismatch } if v == needle.Version3 { - bytes := make([]byte, TimestampSize) + bytes := make([]byte, types.TimestampSize) var readCount int - readCount, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) - if err == io.EOF && readCount == TimestampSize { + readCount, err = datFile.ReadAt(bytes, offset+types.NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err == io.EOF && readCount == types.TimestampSize { err = nil } if err == io.EOF { @@ -158,7 +236,7 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, return n.AppendAtNs, err } -func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) { +func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key types.NeedleId) (lastAppendAtNs uint64, err error) { n := new(needle.Needle) size := n.DiskSize(v) var fileSize int64 @@ -166,7 +244,7 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V if err != nil { return 0, fmt.Errorf("GetStat: %w", err) } - if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil { + if err = n.ReadData(datFile, fileSize-size, types.Size(0), v); err != nil { return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err) } if n.Id != key { diff --git a/weed/storage/volume_checking_test.go b/weed/storage/volume_checking_test.go new file mode 100644 index 000000000..2945823c8 --- /dev/null +++ b/weed/storage/volume_checking_test.go @@ -0,0 +1,80 @@ +package storage + +import ( + "fmt" + "os" + "reflect" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/backend" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" +) + +func TestScrubVolumeData(t *testing.T) { + testCases := []struct { + name string + dataPath string + indexPath string + version needle.Version + want int64 + wantErrs []error + }{ + { + name: "healthy volume", + dataPath: "./test_files/healthy_volume.dat", + indexPath: "./test_files/healthy_volume.idx", + version: needle.Version3, + want: 27, + wantErrs: []error{}, + }, + { + name: "bitrot volume", + dataPath: "./test_files/bitrot_volume.dat", + indexPath: "./test_files/bitrot_volume.idx", + version: needle.Version3, + want: 27, + wantErrs: []error{ + fmt.Errorf("needle 3 on volume 0: invalid CRC for needle 3 (got 0b243a0d, want 4af853fb), data on disk corrupted"), + fmt.Errorf("needle 48 on volume 0: invalid CRC for needle 30 (got 3c40e8d5, want 5077fea1), data on disk corrupted"), + fmt.Errorf("data file size for volume 0 (942864) doesn't match the size for 27 needles read (942856)"), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + datFile, err := os.OpenFile(tc.dataPath, os.O_RDONLY, 0) + if err != nil { + t.Fatalf("failed to open data file: %v", err) + } + defer datFile.Close() + + idxFile, err := os.OpenFile(tc.indexPath, os.O_RDONLY, 0) + if err != nil { + t.Fatalf("failed to open index file: %v", err) + } + defer idxFile.Close() + + idxStat, err := idxFile.Stat() + if err != nil { + t.Fatalf("failed to stat index file: %v", err) + } + + v := Volume{ + volumeInfo: &volume_server_pb.VolumeInfo{ + Version: uint32(tc.version), + }, + } + + got, gotErrs := v.scrubVolumeData(backend.NewDiskFile(datFile), idxFile, idxStat.Size()) + + if got != tc.want { + t.Errorf("expected %d files processed, got %d", tc.want, got) + } + if !reflect.DeepEqual(gotErrs, tc.wantErrs) { + t.Errorf("expected errors %v, got %v", tc.wantErrs, gotErrs) + } + }) + } +}