From c9f344869246ca4d5c72f97f63226b41e0e9c190 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 21 Nov 2024 00:37:38 -0800 Subject: [PATCH] ReadAt may return io.EOF t end of file related to https://github.com/seaweedfs/seaweedfs/issues/6219 --- weed/mount/page_writer/page_chunk_swapfile.go | 6 +++++- weed/storage/backend/disk_file.go | 7 ++++++- weed/storage/erasure_coding/ec_shard.go | 7 ++++++- weed/storage/erasure_coding/ec_volume.go | 6 ++++-- weed/storage/needle/needle_read.go | 10 +++++++++- weed/storage/needle/needle_read_page.go | 6 ++++++ weed/storage/needle_map_metric.go | 6 +++++- weed/storage/store_ec.go | 8 +++++--- .../storage/super_block/super_block_read.go.go | 8 +++++--- weed/storage/volume_checking.go | 12 ++++++++++-- weed/util/chunk_cache/chunk_cache_on_disk.go | 18 ++++++++++++------ 11 files changed, 73 insertions(+), 21 deletions(-) diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 350821757..bdd941117 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -4,6 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" + "io" "os" "sync" ) @@ -130,7 +131,10 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) if logicStart < logicStop { actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + if n, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + if err == io.EOF && n == int(logicStop-logicStart) { + err = nil + } glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) break } diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 68ffbd7e7..070f79865 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -3,6 +3,7 @@ package backend import ( "github.com/seaweedfs/seaweedfs/weed/glog" . "github.com/seaweedfs/seaweedfs/weed/storage/types" + "io" "os" "runtime" "time" @@ -43,7 +44,11 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) { if df.File == nil { return 0, os.ErrClosed } - return df.File.ReadAt(p, off) + n, err = df.File.ReadAt(p, off) + if err == io.EOF && n == len(p) { + err = nil + } + return } func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 9fcb11525..0a7f4d09a 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -2,6 +2,7 @@ package erasure_coding import ( "fmt" + "io" "os" "path" "strconv" @@ -93,6 +94,10 @@ func (shard *EcVolumeShard) Destroy() { func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { - return shard.ecdFile.ReadAt(buf, offset) + n, err := shard.ecdFile.ReadAt(buf, offset) + if err == io.EOF && n == len(buf) { + err = nil + } + return n, err } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index d3a76b561..385d30abe 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -255,8 +255,10 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize for l < h { m := (l + h) / 2 - if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { - return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) + if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil { + if n != types.NeedleMapEntrySize { + return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err) + } } key, offset, size = idx.IdxFileEntry(buf) if key == needleId { diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index f8468e9e2..1907efad3 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -196,6 +196,9 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6 var count int count, err = r.ReadAt(bytes, offset) + if err == io.EOF && count == NeedleHeaderSize { + err = nil + } if count <= 0 || err != nil { return nil, bytes, 0, err } @@ -230,7 +233,12 @@ func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, o return nil, nil } bytes = make([]byte, bodyLength) - if _, err = r.ReadAt(bytes, offset); err != nil { + readCount, err := r.ReadAt(bytes, offset) + if err == io.EOF && int64(readCount) == bodyLength { + err = nil + } + if err != nil { + glog.Errorf("%s read %d bodyLength %d offset %d: %v", r.Name(), readCount, bodyLength, offset, err) return } diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go index 36ddc3320..4e1032de8 100644 --- a/weed/storage/needle/needle_read_page.go +++ b/weed/storage/needle/needle_read_page.go @@ -21,6 +21,9 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64 startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset count, err = r.ReadAt(data[:sizeToRead], startOffset) + if err == io.EOF && int64(count) == sizeToRead { + err = nil + } if err != nil { fileSize, _, _ := r.GetStat() glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err) @@ -40,6 +43,9 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size bytes := make([]byte, NeedleHeaderSize+DataSizeSize) count, err := r.ReadAt(bytes, offset) + if err == io.EOF && count == NeedleHeaderSize+DataSizeSize { + err = nil + } if count != NeedleHeaderSize+DataSizeSize || err != nil { return err } diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index 7669180ba..d6d0a8730 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "io" "os" "sync/atomic" @@ -152,8 +153,11 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key remainingCount := entryCount - nextBatchSize for remainingCount >= 0 { - _, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount) + n, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount) // glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e) + if e == io.EOF && n == int(NeedleMapEntrySize*nextBatchSize) { + e = nil + } if e != nil { return e } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 7f4d9797a..014a870db 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -201,9 +201,11 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { - if _, err = shard.ReadAt(data, actualOffset); err != nil { - glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err) - return + if n, err = shard.ReadAt(data, actualOffset); err != nil { + if n != interval.Size { + glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err) + return + } } } else { ecVolume.ShardLocationsLock.RLock() diff --git a/weed/storage/super_block/super_block_read.go.go b/weed/storage/super_block/super_block_read.go.go index fd1a874dd..dba1cba72 100644 --- a/weed/storage/super_block/super_block_read.go.go +++ b/weed/storage/super_block/super_block_read.go.go @@ -15,9 +15,11 @@ import ( func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) { header := make([]byte, SuperBlockSize) - if _, e := datBackend.ReadAt(header, 0); e != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e) - return + if n, e := datBackend.ReadAt(header, 0); e != nil { + if n != SuperBlockSize { + err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e) + return + } } superBlock.Version = needle.Version(header[0]) diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 9bd432f85..6d2335f70 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -80,7 +80,11 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } bytes = make([]byte, NeedleMapEntrySize) - _, err = indexFile.ReadAt(bytes, offset) + var readCount int + readCount, err = indexFile.ReadAt(bytes, offset) + if err == io.EOF && readCount == NeedleMapEntrySize { + err = nil + } return } @@ -97,7 +101,11 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, } if v == needle.Version3 { bytes := make([]byte, TimestampSize) - _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + var readCount int + readCount, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err == io.EOF && readCount == TimestampSize { + err = nil + } if err == io.EOF { return 0, err } diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index 3968f62e9..87f05d399 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -110,8 +110,10 @@ func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { } data := make([]byte, nv.Size) if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()); readErr != nil { - return nil, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr) + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr) + } } else { if readSize != int(nv.Size) { return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) @@ -133,8 +135,10 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin } data := make([]byte, wanted) if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); readErr != nil { - return nil, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr) + if readSize != wanted { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr) + } } else { if readSize != wanted { return nil, fmt.Errorf("read %d, expected %d", readSize, wanted) @@ -155,8 +159,10 @@ func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, of return 0, ErrorOutOfBounds } if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil { - return n, fmt.Errorf("read %s.dat [%d,%d): %v", - v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err) + if n != wanted { + return n, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err) + } } else { if n != wanted { return n, fmt.Errorf("read %d, expected %d", n, wanted)