From 7439af0eca7cb0f3914dce970fa542a08e35269a Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 6 Jun 2025 01:35:48 -0700 Subject: [PATCH] refactoring --- weed/storage/needle/needle_write.go | 51 ++++++++++++++++++++++++-- weed/storage/needle/needle_write_v1.go | 25 +------------ weed/storage/needle/needle_write_v2.go | 25 +------------ weed/storage/needle/needle_write_v3.go | 25 +------------ 4 files changed, 54 insertions(+), 72 deletions(-) diff --git a/weed/storage/needle/needle_write.go b/weed/storage/needle/needle_write.go index 2364d2589..5fe9f6a79 100644 --- a/weed/storage/needle/needle_write.go +++ b/weed/storage/needle/needle_write.go @@ -1,23 +1,44 @@ package needle import ( + "bytes" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) { + end, _, e := w.GetStat() + if e != nil { + err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) + return + } + offset = uint64(end) + if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { + err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) + return + } + bytesBuffer := buffer_pool.SyncPoolGetBuffer() + defer func() { + if err != nil { + if te := w.Truncate(end); te != nil { + // handle error or log + } + } + buffer_pool.SyncPoolPutBuffer(bytesBuffer) + }() switch version { case Version1: - return writeNeedleV1(w, n) + return writeNeedleV1(w, n, offset, bytesBuffer) case Version2: - return writeNeedleV2(w, n) + return writeNeedleV2(w, n, offset, bytesBuffer) case Version3: - return writeNeedleV3(w, n) + return writeNeedleV3(w, n, offset, bytesBuffer) default: err = fmt.Errorf("unsupported version: %d", version) return @@ -52,3 +73,27 @@ func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, return } + +// prepareNeedleWrite encapsulates the common beginning logic for all versioned writeNeedle functions. +func prepareNeedleWrite(w backend.BackendStorageFile, n *Needle) (offset uint64, bytesBuffer *bytes.Buffer, cleanup func(err error), err error) { + end, _, e := w.GetStat() + if e != nil { + err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) + return + } + offset = uint64(end) + if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { + err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) + return + } + bytesBuffer = buffer_pool.SyncPoolGetBuffer() + cleanup = func(err error) { + if err != nil { + if te := w.Truncate(end); te != nil { + // handle error or log + } + } + buffer_pool.SyncPoolPutBuffer(bytesBuffer) + } + return +} diff --git a/weed/storage/needle/needle_write_v1.go b/weed/storage/needle/needle_write_v1.go index a29c925f8..db912ee0f 100644 --- a/weed/storage/needle/needle_write_v1.go +++ b/weed/storage/needle/needle_write_v1.go @@ -1,36 +1,15 @@ package needle import ( + "bytes" "fmt" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) -func writeNeedleV1(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) { - if end, _, e := w.GetStat(); e == nil { - defer func(w backend.BackendStorageFile, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - // handle error - } - } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) - return - } - if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { - err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) - return - } - - bytesBuffer := buffer_pool.SyncPoolGetBuffer() - defer buffer_pool.SyncPoolPutBuffer(bytesBuffer) - +func writeNeedleV1(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) { bytesBuffer.Reset() header := make([]byte, NeedleHeaderSize) CookieToBytes(header[0:CookieSize], n.Cookie) diff --git a/weed/storage/needle/needle_write_v2.go b/weed/storage/needle/needle_write_v2.go index fac50cb4f..d332bc1c2 100644 --- a/weed/storage/needle/needle_write_v2.go +++ b/weed/storage/needle/needle_write_v2.go @@ -1,37 +1,16 @@ package needle import ( + "bytes" "fmt" "math" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) -func writeNeedleV2(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) { - if end, _, e := w.GetStat(); e == nil { - defer func(w backend.BackendStorageFile, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - // handle error - } - } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) - return - } - if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { - err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) - return - } - - bytesBuffer := buffer_pool.SyncPoolGetBuffer() - defer buffer_pool.SyncPoolPutBuffer(bytesBuffer) - +func writeNeedleV2(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) { bytesBuffer.Reset() header := make([]byte, NeedleHeaderSize+TimestampSize) CookieToBytes(header[0:CookieSize], n.Cookie) diff --git a/weed/storage/needle/needle_write_v3.go b/weed/storage/needle/needle_write_v3.go index 5e3ecfb4a..6927a1431 100644 --- a/weed/storage/needle/needle_write_v3.go +++ b/weed/storage/needle/needle_write_v3.go @@ -1,37 +1,16 @@ package needle import ( + "bytes" "fmt" "math" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) -func writeNeedleV3(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) { - if end, _, e := w.GetStat(); e == nil { - defer func(w backend.BackendStorageFile, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - // handle error - } - } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) - return - } - if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { - err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) - return - } - - bytesBuffer := buffer_pool.SyncPoolGetBuffer() - defer buffer_pool.SyncPoolPutBuffer(bytesBuffer) - +func writeNeedleV3(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) { bytesBuffer.Reset() header := make([]byte, NeedleHeaderSize+TimestampSize) CookieToBytes(header[0:CookieSize], n.Cookie)