diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index afe651c4e..56342a0cb 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -92,7 +92,7 @@ func main() { header := superBlock.Bytes() - if n, e := datFile.WriteAt(header, 0); n == 0 || e != nil { + if n, e := datBackend.WriteAt(header, 0); n == 0 || e != nil { glog.Fatalf("cannot write super block: %v", e) } diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 2b04c8df2..498963c31 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -1,6 +1,8 @@ package backend import ( + "github.com/chrislusf/seaweedfs/weed/glog" + . "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "time" ) @@ -12,12 +14,25 @@ var ( type DiskFile struct { File *os.File fullFilePath string + fileSize int64 + modTime time.Time } func NewDiskFile(f *os.File) *DiskFile { + stat, err := f.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", f.Name(), err) + } + offset := stat.Size() + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + } + return &DiskFile{ fullFilePath: f.Name(), File: f, + fileSize: offset, + modTime: stat.ModTime(), } } @@ -26,11 +41,28 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) { } func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { - return df.File.WriteAt(p, off) + n, err = df.File.WriteAt(p, off) + if err == nil { + waterMark := off + int64(n) + if waterMark > df.fileSize { + df.fileSize = waterMark + df.modTime = time.Now() + } + } + return +} + +func (df *DiskFile) Append(p []byte) (n int, err error) { + return df.WriteAt(p, df.fileSize) } func (df *DiskFile) Truncate(off int64) error { - return df.File.Truncate(off) + err := df.File.Truncate(off) + if err == nil { + df.fileSize = off + df.modTime = time.Now() + } + return err } func (df *DiskFile) Close() error { @@ -38,11 +70,7 @@ func (df *DiskFile) Close() error { } func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) { - stat, e := df.File.Stat() - if e == nil { - return stat.Size(), stat.ModTime(), nil - } - return 0, time.Time{}, err + return df.fileSize, df.modTime, nil } func (df *DiskFile) Name() string { diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index e758a6fee..0f72bc0bb 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -161,7 +161,15 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi dataSize := GetActualSize(size, version) dataSlice = make([]byte, int(dataSize)) - _, err = r.ReadAt(dataSlice, offset) + var n int + n, err = r.ReadAt(dataSlice, offset) + if err != nil && int64(n) == dataSize { + err = nil + } + if err != nil { + fileSize, _, _ := r.GetStat() + println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) + } return dataSlice, err } diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go index 47582dd26..afcea5a05 100644 --- a/weed/storage/needle/needle_read_write_test.go +++ b/weed/storage/needle/needle_read_write_test.go @@ -48,7 +48,7 @@ func TestAppend(t *testing.T) { int64 : -9223372036854775808 to 9223372036854775807 */ - fileSize := int64(4294967295) + 10000 + fileSize := int64(4294967296) + 10000 tempFile.Truncate(fileSize) defer func() { tempFile.Close() diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 5b41286ea..d35391f66 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -1,7 +1,6 @@ package storage import ( - "fmt" "io" "os" "sync" @@ -41,6 +40,7 @@ type baseNeedleMapper struct { indexFile *os.File indexFileAccessLock sync.Mutex + indexFileOffset int64 } func (nm *baseNeedleMapper) IndexFileSize() uint64 { @@ -56,11 +56,10 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size nm.indexFileAccessLock.Lock() defer nm.indexFileAccessLock.Unlock() - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot seek end of indexfile %s: %v", - nm.indexFile.Name(), err) + written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset) + if err == nil { + nm.indexFileOffset += int64(written) } - _, err := nm.indexFile.Write(bytes) return err } diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 415cd14dd..9716e9729 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -31,6 +31,11 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option generateLevelDbFile(dbFileName, indexFile) glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) } + if stat, err := indexFile.Stat(); err != nil { + glog.Fatalf("stat file %s: %v", indexFile.Name(), err) + } else { + m.indexFileOffset = stat.Size() + } glog.V(1).Infof("Opening %s...", dbFileName) if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil { diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index d0891dc98..1b58708c6 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -19,6 +19,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { m: needle_map.NewCompactMap(), } nm.indexFile = file + stat, err := file.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", file.Name(), err) + } + nm.indexFileOffset = stat.Size() return nm } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index c17c9c937..0ee1e61c6 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI if err != nil { return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err) } - dst.Write(needleBytes) + dstDatBackend.Append(needleBytes) util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) } else { //deleted needle //fakeDelNeedle 's default Data field is nil