Browse Source

ReadAt may return io.EOF at end of file

related to https://github.com/seaweedfs/seaweedfs/issues/6219
pull/6267/head
chrislu 1 month ago
parent
commit
c9f3448692
  1. 6
      weed/mount/page_writer/page_chunk_swapfile.go
  2. 7
      weed/storage/backend/disk_file.go
  3. 7
      weed/storage/erasure_coding/ec_shard.go
  4. 6
      weed/storage/erasure_coding/ec_volume.go
  5. 10
      weed/storage/needle/needle_read.go
  6. 6
      weed/storage/needle/needle_read_page.go
  7. 6
      weed/storage/needle_map_metric.go
  8. 8
      weed/storage/store_ec.go
  9. 8
      weed/storage/super_block/super_block_read.go.go
  10. 12
      weed/storage/volume_checking.go
  11. 18
      weed/util/chunk_cache/chunk_cache_on_disk.go

6
weed/mount/page_writer/page_chunk_swapfile.go

@@ -4,6 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"os"
"sync"
)
@@ -130,7 +131,10 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop in
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
if n, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
if err == io.EOF && n == int(logicStop-logicStart) {
err = nil
}
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
}

7
weed/storage/backend/disk_file.go

@@ -3,6 +3,7 @@ package backend
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"io"
"os"
"runtime"
"time"
@@ -43,7 +44,11 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
if df.File == nil {
return 0, os.ErrClosed
}
return df.File.ReadAt(p, off)
n, err = df.File.ReadAt(p, off)
if err == io.EOF && n == len(p) {
err = nil
}
return
}
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {

7
weed/storage/erasure_coding/ec_shard.go

@@ -2,6 +2,7 @@ package erasure_coding
import (
"fmt"
"io"
"os"
"path"
"strconv"
@@ -93,6 +94,10 @@ func (shard *EcVolumeShard) Destroy() {
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
return shard.ecdFile.ReadAt(buf, offset)
n, err := shard.ecdFile.ReadAt(buf, offset)
if err == io.EOF && n == len(buf) {
err = nil
}
return n, err
}

6
weed/storage/erasure_coding/ec_volume.go

@@ -255,8 +255,10 @@ func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId t
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
if n != types.NeedleMapEntrySize {
return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
}
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {

10
weed/storage/needle/needle_read.go

@@ -196,6 +196,9 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6
var count int
count, err = r.ReadAt(bytes, offset)
if err == io.EOF && count == NeedleHeaderSize {
err = nil
}
if count <= 0 || err != nil {
return nil, bytes, 0, err
}
@@ -230,7 +233,12 @@ func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, o
return nil, nil
}
bytes = make([]byte, bodyLength)
if _, err = r.ReadAt(bytes, offset); err != nil {
readCount, err := r.ReadAt(bytes, offset)
if err == io.EOF && int64(readCount) == bodyLength {
err = nil
}
if err != nil {
glog.Errorf("%s read %d bodyLength %d offset %d: %v", r.Name(), readCount, bodyLength, offset, err)
return
}

6
weed/storage/needle/needle_read_page.go

@@ -21,6 +21,9 @@ func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64
startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
count, err = r.ReadAt(data[:sizeToRead], startOffset)
if err == io.EOF && int64(count) == sizeToRead {
err = nil
}
if err != nil {
fileSize, _, _ := r.GetStat()
glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
@@ -40,6 +43,9 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
count, err := r.ReadAt(bytes, offset)
if err == io.EOF && count == NeedleHeaderSize+DataSizeSize {
err = nil
}
if count != NeedleHeaderSize+DataSizeSize || err != nil {
return err
}

6
weed/storage/needle_map_metric.go

@@ -2,6 +2,7 @@ package storage
import (
"fmt"
"io"
"os"
"sync/atomic"
@@ -152,8 +153,11 @@ func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key
remainingCount := entryCount - nextBatchSize
for remainingCount >= 0 {
_, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
n, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
// glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
if e == io.EOF && n == int(NeedleMapEntrySize*nextBatchSize) {
e = nil
}
if e != nil {
return e
}

8
weed/storage/store_ec.go

@@ -201,9 +201,11 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
if _, err = shard.ReadAt(data, actualOffset); err != nil {
glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
return
if n, err = shard.ReadAt(data, actualOffset); err != nil {
if n != interval.Size {
glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
return
}
}
} else {
ecVolume.ShardLocationsLock.RLock()

8
weed/storage/super_block/super_block_read.go.go

@@ -15,9 +15,11 @@ import (
func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
header := make([]byte, SuperBlockSize)
if _, e := datBackend.ReadAt(header, 0); e != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
return
if n, e := datBackend.ReadAt(header, 0); e != nil {
if n != SuperBlockSize {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
return
}
}
superBlock.Version = needle.Version(header[0])

12
weed/storage/volume_checking.go

@@ -80,7 +80,11 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
bytes = make([]byte, NeedleMapEntrySize)
_, err = indexFile.ReadAt(bytes, offset)
var readCount int
readCount, err = indexFile.ReadAt(bytes, offset)
if err == io.EOF && readCount == NeedleMapEntrySize {
err = nil
}
return
}
@@ -97,7 +101,11 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
}
if v == needle.Version3 {
bytes := make([]byte, TimestampSize)
_, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
var readCount int
readCount, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
if err == io.EOF && readCount == TimestampSize {
err = nil
}
if err == io.EOF {
return 0, err
}

18
weed/util/chunk_cache/chunk_cache_on_disk.go

@@ -110,8 +110,10 @@ func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
}
data := make([]byte, nv.Size)
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()); readErr != nil {
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr)
if readSize != int(nv.Size) {
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToActualOffset(), nv.Offset.ToActualOffset()+int64(nv.Size), readErr)
}
} else {
if readSize != int(nv.Size) {
return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
@@ -133,8 +135,10 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin
}
data := make([]byte, wanted)
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); readErr != nil {
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr)
if readSize != wanted {
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, readErr)
}
} else {
if readSize != wanted {
return nil, fmt.Errorf("read %d, expected %d", readSize, wanted)
@@ -155,8 +159,10 @@ func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, of
return 0, ErrorOutOfBounds
}
if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil {
return n, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err)
if n != wanted {
return n, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err)
}
} else {
if n != wanted {
return n, fmt.Errorf("read %d, expected %d", n, wanted)

Loading…
Cancel
Save