package needle import ( "bytes" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) { end, _, e := w.GetStat() if e != nil { err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) return } offset = uint64(end) if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) return } bytesBuffer := buffer_pool.SyncPoolGetBuffer() defer func() { if err != nil { if te := w.Truncate(end); te != nil { // handle error or log } } buffer_pool.SyncPoolPutBuffer(bytesBuffer) }() switch version { case Version1: return writeNeedleV1(w, n, offset, bytesBuffer) case Version2: return writeNeedleV2(w, n, offset, bytesBuffer) case Version3: return writeNeedleV3(w, n, offset, bytesBuffer) default: err = fmt.Errorf("unsupported version: %d", version) return } } func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) { if end, _, e := w.GetStat(); e == nil { defer func(w backend.BackendStorageFile, off int64) { if err != nil { if te := w.Truncate(end); te != nil { glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) } } }(w, end) offset = uint64(end) } else { err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) return } if version == Version3 { tsOffset := NeedleHeaderSize + size + NeedleChecksumSize util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs) } if err == nil { _, err = w.WriteAt(dataSlice, int64(offset)) } return } // prepareNeedleWrite encapsulates the common beginning logic for all versioned writeNeedle functions. func prepareNeedleWrite(w backend.BackendStorageFile, n *Needle) (offset uint64, bytesBuffer *bytes.Buffer, cleanup func(err error), err error) { end, _, e := w.GetStat() if e != nil { err = fmt.Errorf("Cannot Read Current Volume Position: %w", e) return } offset = uint64(end) if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 { err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) return } bytesBuffer = buffer_pool.SyncPoolGetBuffer() cleanup = func(err error) { if err != nil { if te := w.Truncate(end); te != nil { // handle error or log } } buffer_pool.SyncPoolPutBuffer(bytesBuffer) } return }