Browse Source

refactoring

pull/6844/head
chrislu 1 day ago
parent
commit
7439af0eca
  1. 51
      weed/storage/needle/needle_write.go
  2. 25
      weed/storage/needle/needle_write_v1.go
  3. 25
      weed/storage/needle/needle_write_v2.go
  4. 25
      weed/storage/needle/needle_write_v3.go

51
weed/storage/needle/needle_write.go

@ -1,23 +1,44 @@
package needle package needle
import ( import (
"bytes"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
) )
func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) { func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
end, _, e := w.GetStat()
if e != nil {
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
offset = uint64(end)
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
return
}
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer func() {
if err != nil {
if te := w.Truncate(end); te != nil {
// handle error or log
}
}
buffer_pool.SyncPoolPutBuffer(bytesBuffer)
}()
switch version { switch version {
case Version1: case Version1:
return writeNeedleV1(w, n)
return writeNeedleV1(w, n, offset, bytesBuffer)
case Version2: case Version2:
return writeNeedleV2(w, n)
return writeNeedleV2(w, n, offset, bytesBuffer)
case Version3: case Version3:
return writeNeedleV3(w, n)
return writeNeedleV3(w, n, offset, bytesBuffer)
default: default:
err = fmt.Errorf("unsupported version: %d", version) err = fmt.Errorf("unsupported version: %d", version)
return return
@ -52,3 +73,27 @@ func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size,
return return
} }
// prepareNeedleWrite encapsulates the common beginning logic for all versioned writeNeedle functions.
func prepareNeedleWrite(w backend.BackendStorageFile, n *Needle) (offset uint64, bytesBuffer *bytes.Buffer, cleanup func(err error), err error) {
end, _, e := w.GetStat()
if e != nil {
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
offset = uint64(end)
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
return
}
bytesBuffer = buffer_pool.SyncPoolGetBuffer()
cleanup = func(err error) {
if err != nil {
if te := w.Truncate(end); te != nil {
// handle error or log
}
}
buffer_pool.SyncPoolPutBuffer(bytesBuffer)
}
return
}

25
weed/storage/needle/needle_write_v1.go

@ -1,36 +1,15 @@
package needle package needle
import ( import (
"bytes"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
) )
func writeNeedleV1(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.BackendStorageFile, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
// handle error
}
}
}(w, end)
offset = uint64(end)
} else {
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
return
}
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
func writeNeedleV1(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) {
bytesBuffer.Reset() bytesBuffer.Reset()
header := make([]byte, NeedleHeaderSize) header := make([]byte, NeedleHeaderSize)
CookieToBytes(header[0:CookieSize], n.Cookie) CookieToBytes(header[0:CookieSize], n.Cookie)

25
weed/storage/needle/needle_write_v2.go

@ -1,37 +1,16 @@
package needle package needle
import ( import (
"bytes"
"fmt" "fmt"
"math" "math"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
) )
func writeNeedleV2(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.BackendStorageFile, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
// handle error
}
}
}(w, end)
offset = uint64(end)
} else {
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
return
}
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
func writeNeedleV2(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) {
bytesBuffer.Reset() bytesBuffer.Reset()
header := make([]byte, NeedleHeaderSize+TimestampSize) header := make([]byte, NeedleHeaderSize+TimestampSize)
CookieToBytes(header[0:CookieSize], n.Cookie) CookieToBytes(header[0:CookieSize], n.Cookie)

25
weed/storage/needle/needle_write_v3.go

@ -1,37 +1,16 @@
package needle package needle
import ( import (
"bytes"
"fmt" "fmt"
"math" "math"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
) )
func writeNeedleV3(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.BackendStorageFile, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
// handle error
}
}
}(w, end)
offset = uint64(end)
} else {
err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
return
}
if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
return
}
bytesBuffer := buffer_pool.SyncPoolGetBuffer()
defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
func writeNeedleV3(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) {
bytesBuffer.Reset() bytesBuffer.Reset()
header := make([]byte, NeedleHeaderSize+TimestampSize) header := make([]byte, NeedleHeaderSize+TimestampSize)
CookieToBytes(header[0:CookieSize], n.Cookie) CookieToBytes(header[0:CookieSize], n.Cookie)

Loading…
Cancel
Save