From 1478d7ea211f4b390d3906e8c773f26074900d1a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 31 Dec 2018 15:08:32 -0800 Subject: [PATCH] reduce file seek when writing --- weed/storage/needle_read_write.go | 30 ++++++++++++-------------- weed/storage/store.go | 2 +- weed/storage/volume_read_write.go | 34 +++++++----------------------- weed/storage/volume_vacuum.go | 6 +++--- weed/storage/volume_vacuum_test.go | 2 +- 5 files changed, 27 insertions(+), 47 deletions(-) diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go index 34cf71dc2..6441cb501 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -3,7 +3,6 @@ package storage import ( "errors" "fmt" - "io" "os" "github.com/chrislusf/seaweedfs/weed/glog" @@ -28,20 +27,19 @@ func (n *Needle) DiskSize(version Version) int64 { return getActualSize(n.Size, version) } -func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) { - if s, ok := w.(io.Seeker); ok { - if end, e := s.Seek(0, 1); e == nil { - defer func(s io.Seeker, off int64) { - if err != nil { - if _, e = s.Seek(off, 0); e != nil { - glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e) - } +func (n *Needle) Append(w *os.File, version Version) (offset Offset, size uint32, actualSize int64, err error) { + if end, e := w.Seek(0, 2); e == nil { + defer func(w *os.File, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) } - }(s, end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) - return - } + } + }(w, end) + offset = Offset(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return } switch version { case Version1: @@ -159,9 +157,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i _, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding]) } - return n.DataSize, getActualSize(n.Size, version), err + return offset, n.DataSize, getActualSize(n.Size, version), err } - return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) + return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) } func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) { diff --git a/weed/storage/store.go b/weed/storage/store.go index 8c0299545..7ae5dcee6 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -200,7 +200,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } // TODO: count needle size ahead if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { - size, err = v.writeNeedle(n) + _, size, err = v.writeNeedle(n) } else { err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize()) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index ae8dd1d40..e90e26144 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -74,7 +74,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { return } -func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { +func (v *Volume) writeNeedle(n *Needle) (offset Offset, size uint32, err error) { glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) @@ -87,32 +87,15 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { glog.V(4).Infof("needle is unchanged!") return } - var offset int64 - if offset, err = v.dataFile.Seek(0, 2); err != nil { - glog.V(0).Infof("failed to seek the end of file: %v", err) - return - } - - //ensure file writing starting from aligned positions - if offset%NeedlePaddingSize != 0 { - offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) - if offset, err = v.dataFile.Seek(offset, 0); err != nil { - glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) - return - } - } n.AppendAtNs = uint64(time.Now().UnixNano()) - if size, _, err = n.Append(v.dataFile, v.Version()); err != nil { - if e := v.dataFile.Truncate(offset); e != nil { - err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e) - } + if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil { return } nv, ok := v.nm.Get(n.Id) - if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { - if err = v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size); err != nil { + if !ok || Offset(nv.Offset)*NeedlePaddingSize < offset { + if err = v.nm.Put(n.Id, offset/NeedlePaddingSize, n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } @@ -133,16 +116,15 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok && nv.Size != TombstoneFileSize { size := nv.Size - offset, err := v.dataFile.Seek(0, 2) + n.Data = nil + n.AppendAtNs = uint64(time.Now().UnixNano()) + offset, _, _, err := n.Append(v.dataFile, v.Version()) if err != nil { return size, err } - if err := v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)); err != nil { + if err = v.nm.Delete(n.Id, offset/NeedlePaddingSize); err != nil { return size, err } - n.Data = nil - n.AppendAtNs = uint64(time.Now().UnixNano()) - _, _, err = n.Append(v.dataFile, v.Version()) return size, err } return 0, nil diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index fda568126..5e0b19b66 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -215,7 +215,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI fakeDelNeedle.Id = key fakeDelNeedle.Cookie = 0x12345678 fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano()) - _, _, err = fakeDelNeedle.Append(dst, v.Version()) + _, _, _, err = fakeDelNeedle.Append(dst, v.Version()) if err != nil { return fmt.Errorf("append deleted %d failed: %v", key, err) } @@ -269,7 +269,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, _, err := n.Append(dst, v.Version()); err != nil { + if _, _, _, err := n.Append(dst, v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } new_offset += n.DiskSize(version) @@ -329,7 +329,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, _, err = n.Append(dst, v.Version()); err != nil { + if _, _, _, err = n.Append(dst, v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } new_offset += n.DiskSize(v.Version()) diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index a5c2b2025..0bc24037d 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -122,7 +122,7 @@ func TestCompaction(t *testing.T) { } func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) - size, err := v.writeNeedle(n) + _, size, err := v.writeNeedle(n) if err != nil { t.Fatalf("write file %d: %v", i, err) }