Browse Source

Use filer-side copy for mounted whole-file copy_file_range (#8747)

* Optimize mounted whole-file copy_file_range

* Address mounted copy review feedback

* Harden mounted copy fast path

---------

Co-authored-by: Copilot <copilot@github.com>
pull/8711/merge
Chris Lu 11 hours ago
committed by GitHub
parent
commit
c31e6b4684
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 60
      weed/command/mount_std.go
  2. 17
      weed/filer/copy_params.go
  3. 55
      weed/mount/weedfs.go
  4. 355
      weed/mount/weedfs_file_copy_range.go
  5. 355
      weed/mount/weedfs_file_copy_range_test.go
  6. 4
      weed/server/filer_server.go
  7. 283
      weed/server/filer_server_handlers_copy.go
  8. 223
      weed/server/filer_server_handlers_copy_test.go

60
weed/command/mount_std.go

@ -312,35 +312,37 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
MountDirectory: dir,
FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
ConcurrentReaders: *option.concurrentReaders,
CacheDirForRead: *option.cacheDirForRead,
CacheSizeMBForRead: *option.cacheSizeMBForRead,
CacheDirForWrite: cacheDirForWrite,
CacheMetaTTlSec: *option.cacheMetaTtlSec,
DataCenter: *option.dataCenter,
Quota: int64(*option.collectionQuota) * 1024 * 1024,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
DisableXAttr: *option.disableXAttr,
IsMacOs: runtime.GOOS == "darwin",
MetadataFlushSeconds: *option.metadataFlushSeconds,
MountDirectory: dir,
FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
FilerSigningKey: security.SigningKey(util.GetViper().GetString("jwt.filer_signing.key")),
FilerSigningExpiresAfterSec: util.GetViper().GetInt("jwt.filer_signing.expires_after_seconds"),
FilerMountRootPath: mountRoot,
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
ConcurrentReaders: *option.concurrentReaders,
CacheDirForRead: *option.cacheDirForRead,
CacheSizeMBForRead: *option.cacheSizeMBForRead,
CacheDirForWrite: cacheDirForWrite,
CacheMetaTTlSec: *option.cacheMetaTtlSec,
DataCenter: *option.dataCenter,
Quota: int64(*option.collectionQuota) * 1024 * 1024,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
DisableXAttr: *option.disableXAttr,
IsMacOs: runtime.GOOS == "darwin",
MetadataFlushSeconds: *option.metadataFlushSeconds,
// RDMA acceleration options
RdmaEnabled: *option.rdmaEnabled,
RdmaSidecarAddr: *option.rdmaSidecarAddr,

17
weed/filer/copy_params.go

@ -0,0 +1,17 @@
package filer

// Query-parameter and response-header names shared by the filer's server-side
// copy handler and the mount client's copy_file_range fast path. Both sides
// must agree on these strings, so they live in one place.
const (
	// CopyQueryParamFrom names the source path for a filer-side copy.
	CopyQueryParamFrom = "cp.from"
	CopyQueryParamOverwrite = "overwrite"
	CopyQueryParamDataOnly = "dataOnly"
	// CopyQueryParamRequestID is an idempotency key so a retried copy request
	// can be recognized and not applied twice.
	CopyQueryParamRequestID = "copy.requestId"
	// The copy.src* / copy.dst* parameters carry preconditions (inode, mtime,
	// size) that the filer validates before performing the copy.
	CopyQueryParamSourceInode = "copy.srcInode"
	CopyQueryParamSourceMtime = "copy.srcMtime"
	CopyQueryParamSourceSize = "copy.srcSize"
	CopyQueryParamDestinationInode = "copy.dstInode"
	CopyQueryParamDestinationMtime = "copy.dstMtime"
	CopyQueryParamDestinationSize = "copy.dstSize"
	// Response headers: whether the copy was committed, and an echo of the
	// request id for correlation.
	CopyResponseHeaderCommitted = "X-SeaweedFS-Copy-Committed"
	CopyResponseHeaderRequestID = "X-SeaweedFS-Copy-Request-ID"
)

55
weed/mount/weedfs.go

@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
@ -30,27 +31,29 @@ import (
)
type Option struct {
filerIndex int32 // align memory for atomic read/write
FilerAddresses []pb.ServerAddress
MountDirectory string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
ConcurrentReaders int
CacheDirForRead string
CacheSizeMBForRead int64
CacheDirForWrite string
CacheMetaTTlSec int
DataCenter string
Umask os.FileMode
Quota int64
DisableXAttr bool
IsMacOs bool
filerIndex int32 // align memory for atomic read/write
FilerAddresses []pb.ServerAddress
MountDirectory string
GrpcDialOption grpc.DialOption
FilerSigningKey security.SigningKey
FilerSigningExpiresAfterSec int
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
ConcurrentReaders int
CacheDirForRead string
CacheSizeMBForRead int64
CacheDirForWrite string
CacheMetaTTlSec int
DataCenter string
Umask os.FileMode
Quota int64
DisableXAttr bool
IsMacOs bool
MountUid uint32
MountGid uint32
@ -173,11 +176,11 @@ func NewSeaweedFileSystem(option *Option) *WFS {
dhMap: NewDirectoryHandleToInode(),
filerClient: filerClient, // nil for proxy mode, initialized for direct access
pendingAsyncFlush: make(map[uint64]chan struct{}),
fhLockTable: util.NewLockTable[FileHandleId](),
refreshingDirs: make(map[util.FullPath]struct{}),
dirHotWindow: dirHotWindow,
dirHotThreshold: dirHotThreshold,
dirIdleEvict: dirIdleEvict,
fhLockTable: util.NewLockTable[FileHandleId](),
refreshingDirs: make(map[util.FullPath]struct{}),
dirHotWindow: dirHotWindow,
dirHotThreshold: dirHotThreshold,
dirIdleEvict: dirIdleEvict,
}
wfs.option.filerIndex = int32(rand.IntN(len(option.FilerAddresses)))

355
weed/mount/weedfs_file_copy_range.go

@ -1,15 +1,63 @@
package mount
import (
"bytes"
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"time"
"github.com/seaweedfs/go-fuse/v2/fuse"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
request_id "github.com/seaweedfs/seaweedfs/weed/util/request_id"
)
// serverSideWholeFileCopyOutcome classifies how far a filer-side copy request
// got, so the caller knows whether falling back to chunked copying is safe.
type serverSideWholeFileCopyOutcome uint8

const (
	// serverSideWholeFileCopyNotCommitted: the filer definitely did not apply
	// the copy; falling back to the chunked copy path is safe.
	serverSideWholeFileCopyNotCommitted serverSideWholeFileCopyOutcome = iota
	// serverSideWholeFileCopyCommitted: the filer applied the copy, even if
	// the local metadata refresh afterwards failed.
	serverSideWholeFileCopyCommitted
	// serverSideWholeFileCopyAmbiguous: unknown whether the copy was applied;
	// a local retry could duplicate data, so the caller must error out.
	serverSideWholeFileCopyAmbiguous
)

// wholeFileServerCopyRequest carries the source/destination paths plus the
// preconditions the filer validates before performing the copy.
type wholeFileServerCopyRequest struct {
	srcPath    util.FullPath
	dstPath    util.FullPath
	sourceSize int64  // whole-file length to copy and report back to FUSE
	srcInode   uint64 // precondition: source inode must still match
	srcMtime   int64  // precondition: source mtime must still match
	dstInode   uint64 // precondition: destination inode must still match
	dstMtime   int64  // precondition: destination mtime must still match
	dstSize    int64  // precondition: destination size (0 on the fast path)
	sourceMime string // expected mime, used for post-copy verification
	sourceMd5  []byte // expected md5, used for post-copy verification
	copyRequestID string // idempotency key for retried/ambiguous requests
}

// performServerSideWholeFileCopy is a package-level seam so tests can override
// the filer call without standing up an HTTP endpoint.
var performServerSideWholeFileCopy = func(cancel <-chan struct{}, wfs *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) {
	return wfs.copyEntryViaFiler(cancel, copyRequest)
}

// filerCopyRequestTimeout bounds the mount->filer POST so a stalled copy does
// not block copy_file_range workers indefinitely.
const filerCopyRequestTimeout = 60 * time.Second

// filerCopyReadbackTimeout gives the follow-up metadata reload a fresh deadline
// after the filer already accepted the copy request.
const filerCopyReadbackTimeout = 15 * time.Second
// CopyFileRange copies data from one file to another from and to specified offsets.
//
// See https://man7.org/linux/man-pages/man2/copy_file_range.2.html
@ -70,6 +118,10 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
in.OffOut, in.OffOut+in.Len,
)
if written, handled, status := wfs.tryServerSideWholeFileCopy(cancel, in, fhIn, fhOut); handled {
return written, status
}
// Concurrent copy operations could allocate too much memory, so we want to
// throttle our concurrency, scaling with the number of writers the mount
// was configured with.
@ -155,3 +207,306 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
written = uint32(totalCopied)
return written, fuse.OK
}
// tryServerSideWholeFileCopy attempts to satisfy a copy_file_range request via
// the filer-side whole-file copy fast path. handled=false means the caller
// should fall through to the chunked copy path; handled=true means
// (written, code) is the final answer for the FUSE request.
func (wfs *WFS) tryServerSideWholeFileCopy(cancel <-chan struct{}, in *fuse.CopyFileRangeIn, fhIn, fhOut *FileHandle) (written uint32, handled bool, code fuse.Status) {
	copyRequest, ok := wholeFileServerCopyCandidate(fhIn, fhOut, in)
	if !ok {
		// Not eligible for the fast path; use the regular chunked copy.
		return 0, false, fuse.OK
	}
	glog.V(1).Infof("CopyFileRange server-side copy %s => %s (%d bytes)", copyRequest.srcPath, copyRequest.dstPath, copyRequest.sourceSize)
	entry, outcome, err := performServerSideWholeFileCopy(cancel, wfs, copyRequest)
	switch outcome {
	case serverSideWholeFileCopyCommitted:
		// The filer applied the copy. Even if the follow-up local refresh
		// failed, report success and patch local state from what we know.
		if err != nil {
			glog.Warningf("CopyFileRange server-side copy %s => %s committed but local refresh failed: %v", copyRequest.srcPath, copyRequest.dstPath, err)
		} else {
			glog.V(1).Infof("CopyFileRange server-side copy %s => %s completed (%d bytes)", copyRequest.srcPath, copyRequest.dstPath, copyRequest.sourceSize)
		}
		wfs.applyServerSideWholeFileCopyResult(fhIn, fhOut, copyRequest.dstPath, entry, copyRequest.sourceSize)
		return uint32(copyRequest.sourceSize), true, fuse.OK
	case serverSideWholeFileCopyAmbiguous:
		// Unknown whether the copy happened; falling back to a local copy
		// could duplicate data, so surface an I/O error instead.
		glog.Warningf("CopyFileRange server-side copy %s => %s outcome ambiguous: %v", copyRequest.srcPath, copyRequest.dstPath, err)
		return 0, true, fuse.EIO
	default:
		// Definitely not committed; the chunked fallback is safe.
		glog.V(0).Infof("CopyFileRange server-side copy %s => %s fallback to chunk copy: %v", copyRequest.srcPath, copyRequest.dstPath, err)
		return 0, false, fuse.OK
	}
}
// applyServerSideWholeFileCopyResult updates the destination file handle and
// local caches after a committed server-side copy. When the filer readback
// failed (entry == nil), a best-effort local entry is synthesized from the
// source and destination handles instead.
func (wfs *WFS) applyServerSideWholeFileCopyResult(fhIn, fhOut *FileHandle, dstPath util.FullPath, entry *filer_pb.Entry, sourceSize int64) {
	if entry == nil {
		entry = synthesizeLocalEntryForServerSideWholeFileCopy(fhIn, fhOut, sourceSize)
	}
	if entry == nil {
		// Nothing to apply locally; subsequent lookups will refetch.
		glog.Warningf("CopyFileRange server-side copy %s left no local entry to apply", dstPath)
		return
	}
	fhOut.SetEntry(entry)
	fhOut.RememberPath(dstPath)
	if entry.Attributes != nil {
		fhOut.contentType = entry.Attributes.Mime
	}
	// The filer already holds the authoritative state, so the handle is clean.
	fhOut.dirtyMetadata = false
	wfs.updateServerSideWholeFileCopyMetaCache(dstPath, entry)
	wfs.invalidateCopyDestinationCache(fhOut.inode, dstPath)
}
// updateServerSideWholeFileCopyMetaCache folds the copied entry into the local
// meta cache as a synthetic metadata event. If applying the event fails, the
// parent directory is marked read-through so stale cached metadata is not
// served.
func (wfs *WFS) updateServerSideWholeFileCopyMetaCache(dstPath util.FullPath, entry *filer_pb.Entry) {
	if wfs.metaCache == nil || entry == nil {
		return
	}
	dir, _ := dstPath.DirAndName()
	event := metadataUpdateEvent(dir, entry)
	if applyErr := wfs.applyLocalMetadataEvent(context.Background(), event); applyErr != nil {
		glog.Warningf("CopyFileRange metadata update %s: %v", dstPath, applyErr)
		wfs.markDirectoryReadThrough(util.FullPath(dir))
	}
}
// synthesizeLocalEntryForServerSideWholeFileCopy builds a best-effort local
// view of the destination after a committed copy whose readback failed. It
// keeps the destination entry's own attributes, but takes content, chunks,
// mime, and md5 from the source handle, then stamps the copied size and a
// fresh mtime. Returns nil when the destination handle has no entry.
func synthesizeLocalEntryForServerSideWholeFileCopy(fhIn, fhOut *FileHandle, sourceSize int64) *filer_pb.Entry {
	dstEntry := fhOut.GetEntry().GetEntry()
	if dstEntry == nil {
		return nil
	}
	// Clone so we never mutate the handles' live entries in place.
	localEntry := proto.Clone(dstEntry).(*filer_pb.Entry)
	if localEntry.Attributes == nil {
		localEntry.Attributes = &filer_pb.FuseAttributes{}
	}
	if srcEntry := fhIn.GetEntry().GetEntry(); srcEntry != nil {
		srcEntryCopy := proto.Clone(srcEntry).(*filer_pb.Entry)
		localEntry.Content = srcEntryCopy.Content
		localEntry.Chunks = srcEntryCopy.Chunks
		if srcEntryCopy.Attributes != nil {
			localEntry.Attributes.Mime = srcEntryCopy.Attributes.Mime
			localEntry.Attributes.Md5 = srcEntryCopy.Attributes.Md5
		}
	}
	localEntry.Attributes.FileSize = uint64(sourceSize)
	localEntry.Attributes.Mtime = time.Now().Unix()
	return localEntry
}
// wholeFileServerCopyCandidate decides whether a copy_file_range request is
// eligible for the filer-side fast path. Eligibility requires: distinct,
// metadata-clean handles; zero offsets on both sides; a non-empty source whose
// size fits in uint32 and is fully covered by in.Len; an empty destination;
// distinct non-empty paths; and known inodes on both entries to serve as
// server-side preconditions. Each rejection is logged at V(4) for debugging.
func wholeFileServerCopyCandidate(fhIn, fhOut *FileHandle, in *fuse.CopyFileRangeIn) (copyRequest wholeFileServerCopyRequest, ok bool) {
	if fhIn == nil || fhOut == nil || in == nil {
		glog.V(4).Infof("server-side copy: skipped (nil handle or input)")
		return wholeFileServerCopyRequest{}, false
	}
	if fhIn.fh == fhOut.fh {
		glog.V(4).Infof("server-side copy: skipped (same file handle)")
		return wholeFileServerCopyRequest{}, false
	}
	if fhIn.dirtyMetadata || fhOut.dirtyMetadata {
		// Unflushed local changes would be lost or mis-ordered by a
		// server-side copy.
		glog.V(4).Infof("server-side copy: skipped (dirty metadata: in=%v out=%v)", fhIn.dirtyMetadata, fhOut.dirtyMetadata)
		return wholeFileServerCopyRequest{}, false
	}
	if in.OffIn != 0 || in.OffOut != 0 {
		glog.V(4).Infof("server-side copy: skipped (non-zero offsets: in=%d out=%d)", in.OffIn, in.OffOut)
		return wholeFileServerCopyRequest{}, false
	}
	srcEntry := fhIn.GetEntry()
	dstEntry := fhOut.GetEntry()
	if srcEntry == nil || dstEntry == nil {
		glog.V(4).Infof("server-side copy: skipped (nil entry: src=%v dst=%v)", srcEntry == nil, dstEntry == nil)
		return wholeFileServerCopyRequest{}, false
	}
	if srcEntry.IsDirectory || dstEntry.IsDirectory {
		glog.V(4).Infof("server-side copy: skipped (directory)")
		return wholeFileServerCopyRequest{}, false
	}
	srcPbEntry := srcEntry.GetEntry()
	dstPbEntry := dstEntry.GetEntry()
	if srcPbEntry == nil || dstPbEntry == nil || srcPbEntry.Attributes == nil || dstPbEntry.Attributes == nil {
		glog.V(4).Infof("server-side copy: skipped (missing entry attributes)")
		return wholeFileServerCopyRequest{}, false
	}
	sourceSize := int64(filer.FileSize(srcPbEntry))
	// go-fuse exposes CopyFileRange's return value as uint32, so the fast path
	// should only claim copies that can be reported without truncation.
	if sourceSize <= 0 || sourceSize > math.MaxUint32 || int64(in.Len) < sourceSize {
		glog.V(4).Infof("server-side copy: skipped (size mismatch: sourceSize=%d len=%d)", sourceSize, in.Len)
		return wholeFileServerCopyRequest{}, false
	}
	dstSize := int64(filer.FileSize(dstPbEntry))
	if dstSize != 0 || len(dstPbEntry.GetChunks()) > 0 || len(dstPbEntry.Content) > 0 {
		// Only a pristine destination can be replaced wholesale.
		glog.V(4).Infof("server-side copy: skipped (destination not empty)")
		return wholeFileServerCopyRequest{}, false
	}
	srcPath := fhIn.FullPath()
	dstPath := fhOut.FullPath()
	if srcPath == "" || dstPath == "" || srcPath == dstPath {
		glog.V(4).Infof("server-side copy: skipped (invalid paths: src=%q dst=%q)", srcPath, dstPath)
		return wholeFileServerCopyRequest{}, false
	}
	if srcPbEntry.Attributes.Inode == 0 || dstPbEntry.Attributes.Inode == 0 {
		// Without inodes the filer cannot enforce its preconditions.
		glog.V(4).Infof("server-side copy: skipped (missing inode preconditions: src=%d dst=%d)", srcPbEntry.Attributes.Inode, dstPbEntry.Attributes.Inode)
		return wholeFileServerCopyRequest{}, false
	}
	return wholeFileServerCopyRequest{
		srcPath:    srcPath,
		dstPath:    dstPath,
		sourceSize: sourceSize,
		srcInode:   srcPbEntry.Attributes.Inode,
		srcMtime:   srcPbEntry.Attributes.Mtime,
		dstInode:   dstPbEntry.Attributes.Inode,
		dstMtime:   dstPbEntry.Attributes.Mtime,
		dstSize:    dstSize,
		sourceMime: srcPbEntry.Attributes.Mime,
		// Copy the md5 so later mutation of the entry cannot alias it.
		sourceMd5:     append([]byte(nil), srcPbEntry.Attributes.Md5...),
		copyRequestID: request_id.New(),
	}, true
}
// copyEntryViaFiler POSTs a server-side copy request to the current filer and
// then reloads the destination entry. The returned outcome distinguishes
// not-committed (chunked fallback is safe), committed (success, possibly with
// a failed local refresh), and ambiguous (transport failure after dispatch).
func (wfs *WFS) copyEntryViaFiler(cancel <-chan struct{}, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) {
	baseCtx, baseCancel := context.WithCancel(context.Background())
	defer baseCancel()
	if cancel != nil {
		// Propagate FUSE interruption into the request context. The goroutine
		// exits when either the cancel channel fires or baseCtx is done.
		go func() {
			select {
			case <-cancel:
				baseCancel()
			case <-baseCtx.Done():
			}
		}()
	}
	postCtx, postCancel := context.WithTimeout(baseCtx, filerCopyRequestTimeout)
	defer postCancel()
	httpClient := util_http.GetGlobalHttpClient()
	if httpClient == nil {
		var err error
		httpClient, err = util_http.NewGlobalHttpClient()
		if err != nil {
			return nil, serverSideWholeFileCopyNotCommitted, fmt.Errorf("create filer copy http client: %w", err)
		}
	}
	// Build POST <filer>/<dstPath>?cp.from=<srcPath>&... carrying the
	// inode/mtime/size preconditions so the filer can reject a stale request.
	copyURL := &url.URL{
		Scheme: httpClient.GetHttpScheme(),
		Host:   wfs.getCurrentFiler().ToHttpAddress(),
		Path:   string(copyRequest.dstPath),
	}
	query := copyURL.Query()
	query.Set(filer.CopyQueryParamFrom, string(copyRequest.srcPath))
	query.Set(filer.CopyQueryParamOverwrite, "true")
	query.Set(filer.CopyQueryParamDataOnly, "true")
	query.Set(filer.CopyQueryParamRequestID, copyRequest.copyRequestID)
	query.Set(filer.CopyQueryParamSourceInode, fmt.Sprintf("%d", copyRequest.srcInode))
	query.Set(filer.CopyQueryParamSourceMtime, fmt.Sprintf("%d", copyRequest.srcMtime))
	query.Set(filer.CopyQueryParamSourceSize, fmt.Sprintf("%d", copyRequest.sourceSize))
	query.Set(filer.CopyQueryParamDestinationInode, fmt.Sprintf("%d", copyRequest.dstInode))
	query.Set(filer.CopyQueryParamDestinationMtime, fmt.Sprintf("%d", copyRequest.dstMtime))
	query.Set(filer.CopyQueryParamDestinationSize, fmt.Sprintf("%d", copyRequest.dstSize))
	copyURL.RawQuery = query.Encode()
	req, err := http.NewRequestWithContext(postCtx, http.MethodPost, copyURL.String(), nil)
	if err != nil {
		return nil, serverSideWholeFileCopyNotCommitted, fmt.Errorf("create filer copy request: %w", err)
	}
	if jwt := wfs.filerCopyJWT(); jwt != "" {
		req.Header.Set("Authorization", "Bearer "+string(jwt))
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		// The request may have been dispatched before the transport failed;
		// probe the destination to decide committed vs ambiguous.
		return wfs.confirmServerSideWholeFileCopyAfterAmbiguousRequest(baseCtx, copyRequest, fmt.Errorf("execute filer copy request: %w", err))
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return nil, serverSideWholeFileCopyNotCommitted, fmt.Errorf("filer copy %s => %s failed: status %d: %s", copyRequest.srcPath, copyRequest.dstPath, resp.StatusCode, string(body))
	}
	// The filer accepted the copy; reload the destination entry under a fresh,
	// shorter deadline so local metadata reflects the server-side result.
	readbackCtx, readbackCancel := context.WithTimeout(baseCtx, filerCopyReadbackTimeout)
	defer readbackCancel()
	entry, err := filer_pb.GetEntry(readbackCtx, wfs, copyRequest.dstPath)
	if err != nil {
		return nil, serverSideWholeFileCopyCommitted, fmt.Errorf("reload copied entry %s: %w", copyRequest.dstPath, err)
	}
	if entry == nil {
		return nil, serverSideWholeFileCopyCommitted, fmt.Errorf("reload copied entry %s: not found", copyRequest.dstPath)
	}
	if entry.Attributes != nil && wfs.option != nil && wfs.option.UidGidMapper != nil {
		// Map filer-side uid/gid into the local view, as normal lookups do.
		entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
	}
	return entry, serverSideWholeFileCopyCommitted, nil
}
// confirmServerSideWholeFileCopyAfterAmbiguousRequest is called when the HTTP
// request itself failed, so the copy may or may not have been applied. It
// reads the destination back and reports committed only when the entry matches
// the requested copy; every other case stays ambiguous (wrapping requestErr)
// so the caller does not risk duplicating data with a local retry.
func (wfs *WFS) confirmServerSideWholeFileCopyAfterAmbiguousRequest(baseCtx context.Context, copyRequest wholeFileServerCopyRequest, requestErr error) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) {
	readbackCtx, readbackCancel := context.WithTimeout(baseCtx, filerCopyReadbackTimeout)
	defer readbackCancel()
	entry, err := filer_pb.GetEntry(readbackCtx, wfs, copyRequest.dstPath)
	if err == nil && entry != nil && entryMatchesServerSideWholeFileCopy(copyRequest, entry) {
		// The destination looks exactly like the requested copy: committed.
		if entry.Attributes != nil && wfs.option != nil && wfs.option.UidGidMapper != nil {
			entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
		}
		return entry, serverSideWholeFileCopyCommitted, nil
	}
	if err != nil {
		return nil, serverSideWholeFileCopyAmbiguous, fmt.Errorf("%w; post-copy readback failed: %v", requestErr, err)
	}
	if entry == nil {
		return nil, serverSideWholeFileCopyAmbiguous, fmt.Errorf("%w; destination %s was not readable after the ambiguous request", requestErr, copyRequest.dstPath)
	}
	return nil, serverSideWholeFileCopyAmbiguous, fmt.Errorf("%w; destination %s did not match the requested copy after the ambiguous request", requestErr, copyRequest.dstPath)
}
// entryMatchesServerSideWholeFileCopy reports whether a freshly read
// destination entry is consistent with the copy we asked the filer to perform.
// The size must match exactly; inode, mime, and md5 are only compared when the
// request actually recorded them.
func entryMatchesServerSideWholeFileCopy(copyRequest wholeFileServerCopyRequest, entry *filer_pb.Entry) bool {
	if entry == nil {
		return false
	}
	attr := entry.Attributes
	if attr == nil {
		return false
	}
	inodeOk := copyRequest.dstInode == 0 || attr.Inode == copyRequest.dstInode
	sizeOk := attr.FileSize == uint64(copyRequest.sourceSize)
	mimeOk := copyRequest.sourceMime == "" || attr.Mime == copyRequest.sourceMime
	md5Ok := len(copyRequest.sourceMd5) == 0 || bytes.Equal(attr.Md5, copyRequest.sourceMd5)
	return inodeOk && sizeOk && mimeOk && md5Ok
}
// filerCopyJWT returns a signed JWT for authenticating filer HTTP requests,
// or the empty token when no filer signing key is configured on the mount.
func (wfs *WFS) filerCopyJWT() security.EncodedJwt {
	if wfs.option != nil && len(wfs.option.FilerSigningKey) > 0 {
		return security.GenJwtForFilerServer(wfs.option.FilerSigningKey, wfs.option.FilerSigningExpiresAfterSec)
	}
	return ""
}
// invalidateCopyDestinationCache asks the kernel to drop its cached view of
// the copy destination so subsequent reads observe the server-side result:
// the inode's data cache is invalidated (offset 0, length -1, i.e. the whole
// range per go-fuse), and the parent directory's entry for the name is
// invalidated when the parent inode is known. Failures are only logged.
func (wfs *WFS) invalidateCopyDestinationCache(inode uint64, fullPath util.FullPath) {
	if wfs.fuseServer != nil {
		if status := wfs.fuseServer.InodeNotify(inode, 0, -1); status != fuse.OK {
			glog.V(4).Infof("CopyFileRange invalidate inode %d: %v", inode, status)
		}
		dir, name := fullPath.DirAndName()
		if parentInode, found := wfs.inodeToPath.GetInode(util.FullPath(dir)); found {
			if status := wfs.fuseServer.EntryNotify(parentInode, name); status != fuse.OK {
				glog.V(4).Infof("CopyFileRange invalidate entry %s: %v", fullPath, status)
			}
		}
	}
}

355
weed/mount/weedfs_file_copy_range_test.go

@ -0,0 +1,355 @@
package mount
import (
"context"
"errors"
"path/filepath"
"testing"
"github.com/seaweedfs/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestWholeFileServerCopyCandidate verifies fast-path eligibility: a clean
// whole-file request qualifies and carries the expected paths/size/inodes,
// while dirty source metadata or an in.Len shorter than the source disqualify.
func TestWholeFileServerCopyCandidate(t *testing.T) {
	wfs := newCopyRangeTestWFS()
	srcPath := util.FullPath("/src.txt")
	dstPath := util.FullPath("/dst.txt")
	srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true)
	dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true)
	srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{
		Name: "src.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100644,
			FileSize: 5,
			Inode:    srcInode,
		},
		Content: []byte("hello"),
	})
	// Destination is empty (no size/content), which the fast path requires.
	dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{
		Name: "dst.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100644,
			Inode:    dstInode,
		},
	})
	srcHandle.RememberPath(srcPath)
	dstHandle.RememberPath(dstPath)
	// Len=8 exceeds the 5-byte source, so the whole file is covered.
	in := &fuse.CopyFileRangeIn{
		FhIn:   uint64(srcHandle.fh),
		FhOut:  uint64(dstHandle.fh),
		OffIn:  0,
		OffOut: 0,
		Len:    8,
	}
	copyRequest, ok := wholeFileServerCopyCandidate(srcHandle, dstHandle, in)
	if !ok {
		t.Fatal("expected whole-file server copy candidate")
	}
	if copyRequest.srcPath != srcPath {
		t.Fatalf("source path = %q, want %q", copyRequest.srcPath, srcPath)
	}
	if copyRequest.dstPath != dstPath {
		t.Fatalf("destination path = %q, want %q", copyRequest.dstPath, dstPath)
	}
	if copyRequest.sourceSize != 5 {
		t.Fatalf("source size = %d, want 5", copyRequest.sourceSize)
	}
	if copyRequest.srcInode == 0 || copyRequest.dstInode == 0 {
		t.Fatalf("expected inode preconditions, got src=%d dst=%d", copyRequest.srcInode, copyRequest.dstInode)
	}
	// Dirty metadata on either handle must disable the fast path.
	srcHandle.dirtyMetadata = true
	if _, ok := wholeFileServerCopyCandidate(srcHandle, dstHandle, in); ok {
		t.Fatal("dirty source handle should disable whole-file server copy")
	}
	srcHandle.dirtyMetadata = false
	// A request shorter than the source is not a whole-file copy.
	in.Len = 4
	if _, ok := wholeFileServerCopyCandidate(srcHandle, dstHandle, in); ok {
		t.Fatal("short copy request should disable whole-file server copy")
	}
}
// TestCopyFileRangeUsesServerSideWholeFileCopy stubs the
// performServerSideWholeFileCopy seam and checks that CopyFileRange takes the
// fast path, applies the returned entry to the destination handle, reports the
// source size as written, and leaves the handle metadata clean.
func TestCopyFileRangeUsesServerSideWholeFileCopy(t *testing.T) {
	wfs := newCopyRangeTestWFS()
	srcPath := util.FullPath("/src.txt")
	dstPath := util.FullPath("/dst.txt")
	srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true)
	dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true)
	srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{
		Name: "src.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100644,
			FileSize: 5,
			Inode:    srcInode,
		},
		Content: []byte("hello"),
	})
	dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{
		Name: "dst.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100644,
			Inode:    dstInode,
		},
	})
	srcHandle.RememberPath(srcPath)
	dstHandle.RememberPath(dstPath)
	// Swap in a stub for the filer call; restore it when the test ends.
	originalCopy := performServerSideWholeFileCopy
	defer func() {
		performServerSideWholeFileCopy = originalCopy
	}()
	var called bool
	performServerSideWholeFileCopy = func(cancel <-chan struct{}, gotWFS *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) {
		called = true
		if gotWFS != wfs {
			t.Fatalf("wfs = %p, want %p", gotWFS, wfs)
		}
		if copyRequest.srcPath != srcPath {
			t.Fatalf("source path = %q, want %q", copyRequest.srcPath, srcPath)
		}
		if copyRequest.dstPath != dstPath {
			t.Fatalf("destination path = %q, want %q", copyRequest.dstPath, dstPath)
		}
		// Simulate a committed copy with a successful readback.
		return &filer_pb.Entry{
			Name: "dst.txt",
			Attributes: &filer_pb.FuseAttributes{
				FileMode: 0100644,
				FileSize: 5,
				Mime:     "text/plain; charset=utf-8",
			},
			Content: []byte("hello"),
		}, serverSideWholeFileCopyCommitted, nil
	}
	written, status := wfs.CopyFileRange(make(chan struct{}), &fuse.CopyFileRangeIn{
		FhIn:   uint64(srcHandle.fh),
		FhOut:  uint64(dstHandle.fh),
		OffIn:  0,
		OffOut: 0,
		Len:    8,
	})
	if status != fuse.OK {
		t.Fatalf("CopyFileRange status = %v, want OK", status)
	}
	if written != 5 {
		t.Fatalf("CopyFileRange wrote %d bytes, want 5", written)
	}
	if !called {
		t.Fatal("expected server-side whole-file copy path to be used")
	}
	gotEntry := dstHandle.GetEntry().GetEntry()
	if gotEntry.Attributes == nil || gotEntry.Attributes.FileSize != 5 {
		t.Fatalf("destination size = %v, want 5", gotEntry.GetAttributes().GetFileSize())
	}
	if string(gotEntry.Content) != "hello" {
		t.Fatalf("destination content = %q, want %q", string(gotEntry.Content), "hello")
	}
	if dstHandle.dirtyMetadata {
		t.Fatal("server-side whole-file copy should leave destination handle clean")
	}
}
// TestCopyFileRangeDoesNotFallbackAfterCommittedServerCopyRefreshFailure
// covers the committed-but-readback-failed case: CopyFileRange must still
// report success, synthesize a local entry (source content/mime/md5 merged
// onto the destination's own mode), and update the meta cache — never falling
// back to the chunked copy path.
func TestCopyFileRangeDoesNotFallbackAfterCommittedServerCopyRefreshFailure(t *testing.T) {
	wfs := newCopyRangeTestWFSWithMetaCache(t)
	srcPath := util.FullPath("/src.txt")
	dstPath := util.FullPath("/dst.txt")
	srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true)
	dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true)
	srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{
		Name: "src.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100644,
			FileSize: 5,
			Mime:     "text/plain; charset=utf-8",
			Md5:      []byte("abcde"),
			Inode:    srcInode,
		},
		Content: []byte("hello"),
	})
	// Destination uses a distinct mode (0600) so the test can prove the
	// synthesized entry keeps the destination's own attributes.
	dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{
		Name: "dst.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100600,
			Inode:    dstInode,
		},
	})
	srcHandle.RememberPath(srcPath)
	dstHandle.RememberPath(dstPath)
	originalCopy := performServerSideWholeFileCopy
	defer func() {
		performServerSideWholeFileCopy = originalCopy
	}()
	// Committed outcome with a nil entry and an error simulates a copy that
	// succeeded on the filer but whose metadata readback failed.
	performServerSideWholeFileCopy = func(cancel <-chan struct{}, gotWFS *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) {
		if gotWFS != wfs || copyRequest.srcPath != srcPath || copyRequest.dstPath != dstPath {
			t.Fatalf("unexpected server-side copy call: wfs=%p src=%q dst=%q", gotWFS, copyRequest.srcPath, copyRequest.dstPath)
		}
		return nil, serverSideWholeFileCopyCommitted, errors.New("reload copied entry: transient filer read failure")
	}
	written, status := wfs.CopyFileRange(make(chan struct{}), &fuse.CopyFileRangeIn{
		FhIn:   uint64(srcHandle.fh),
		FhOut:  uint64(dstHandle.fh),
		OffIn:  0,
		OffOut: 0,
		Len:    8,
	})
	if status != fuse.OK {
		t.Fatalf("CopyFileRange status = %v, want OK", status)
	}
	if written != 5 {
		t.Fatalf("CopyFileRange wrote %d bytes, want 5", written)
	}
	if dstHandle.dirtyMetadata {
		t.Fatal("committed server-side copy should not fall back to dirty-page copy")
	}
	gotEntry := dstHandle.GetEntry().GetEntry()
	if gotEntry.GetAttributes().GetFileSize() != 5 {
		t.Fatalf("destination size = %d, want 5", gotEntry.GetAttributes().GetFileSize())
	}
	if gotEntry.GetAttributes().GetFileMode() != 0100600 {
		t.Fatalf("destination mode = %#o, want %#o", gotEntry.GetAttributes().GetFileMode(), uint32(0100600))
	}
	if string(gotEntry.GetContent()) != "hello" {
		t.Fatalf("destination content = %q, want %q", string(gotEntry.GetContent()), "hello")
	}
	// The synthesized entry must also have landed in the meta cache.
	cachedEntry, err := wfs.metaCache.FindEntry(context.Background(), dstPath)
	if err != nil {
		t.Fatalf("metaCache find entry: %v", err)
	}
	if cachedEntry.FileSize != 5 {
		t.Fatalf("metaCache destination size = %d, want 5", cachedEntry.FileSize)
	}
	if cachedEntry.Mime != "text/plain; charset=utf-8" {
		t.Fatalf("metaCache destination mime = %q, want %q", cachedEntry.Mime, "text/plain; charset=utf-8")
	}
}
// TestCopyFileRangeReturnsEIOForAmbiguousServerSideCopy checks the ambiguous
// outcome: CopyFileRange must return EIO with zero bytes written and must not
// fall back to the chunked copy or touch the destination entry.
func TestCopyFileRangeReturnsEIOForAmbiguousServerSideCopy(t *testing.T) {
	wfs := newCopyRangeTestWFS()
	srcPath := util.FullPath("/src.txt")
	dstPath := util.FullPath("/dst.txt")
	srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true)
	dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true)
	srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{
		Name: "src.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100644,
			FileSize: 5,
			Inode:    srcInode,
		},
		Content: []byte("hello"),
	})
	dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{
		Name: "dst.txt",
		Attributes: &filer_pb.FuseAttributes{
			FileMode: 0100600,
			Inode:    dstInode,
		},
	})
	srcHandle.RememberPath(srcPath)
	dstHandle.RememberPath(dstPath)
	originalCopy := performServerSideWholeFileCopy
	defer func() {
		performServerSideWholeFileCopy = originalCopy
	}()
	// Simulate a transport failure after dispatch: outcome is ambiguous.
	performServerSideWholeFileCopy = func(cancel <-chan struct{}, gotWFS *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) {
		if gotWFS != wfs || copyRequest.srcPath != srcPath || copyRequest.dstPath != dstPath {
			t.Fatalf("unexpected server-side copy call: wfs=%p src=%q dst=%q", gotWFS, copyRequest.srcPath, copyRequest.dstPath)
		}
		return nil, serverSideWholeFileCopyAmbiguous, errors.New("transport timeout after request dispatch")
	}
	written, status := wfs.CopyFileRange(make(chan struct{}), &fuse.CopyFileRangeIn{
		FhIn:   uint64(srcHandle.fh),
		FhOut:  uint64(dstHandle.fh),
		OffIn:  0,
		OffOut: 0,
		Len:    8,
	})
	if status != fuse.EIO {
		t.Fatalf("CopyFileRange status = %v, want EIO", status)
	}
	if written != 0 {
		t.Fatalf("CopyFileRange wrote %d bytes, want 0", written)
	}
	if dstHandle.dirtyMetadata {
		t.Fatal("ambiguous server-side copy should not fall back to dirty-page copy")
	}
	if dstHandle.GetEntry().GetEntry().GetAttributes().GetFileSize() != 0 {
		t.Fatalf("destination size = %d, want 0", dstHandle.GetEntry().GetEntry().GetAttributes().GetFileSize())
	}
}
// newCopyRangeTestWFS builds a minimal WFS for exercising CopyFileRange
// without a running filer: proxy-mode volume access, one fake filer address,
// and a small copy buffer pool.
func newCopyRangeTestWFS() *WFS {
	wfs := &WFS{
		option: &Option{
			ChunkSizeLimit:     1024,
			ConcurrentReaders:  1,
			VolumeServerAccess: "filerProxy",
			FilerAddresses:     []pb.ServerAddress{"127.0.0.1:8888"},
		},
		inodeToPath: NewInodeToPath(util.FullPath("/"), 0),
		fhMap:       NewFileHandleToInode(),
		fhLockTable: util.NewLockTable[FileHandleId](),
	}
	// Small buffers keep the chunked fallback path usable in tests.
	wfs.copyBufferPool.New = func() any {
		return make([]byte, 1024)
	}
	return wfs
}
// newCopyRangeTestWFSWithMetaCache augments newCopyRangeTestWFS with a real
// meta cache rooted in a temp dir so metadata-event application can be
// asserted. The cache is shut down via t.Cleanup.
func newCopyRangeTestWFSWithMetaCache(t *testing.T) *WFS {
	t.Helper()
	wfs := newCopyRangeTestWFS()
	root := util.FullPath("/")
	// Mark the root as cached so cache lookups are considered authoritative.
	wfs.inodeToPath.MarkChildrenCached(root)
	uidGidMapper, err := meta_cache.NewUidGidMapper("", "")
	if err != nil {
		t.Fatalf("create uid/gid mapper: %v", err)
	}
	wfs.metaCache = meta_cache.NewMetaCache(
		filepath.Join(t.TempDir(), "meta"),
		uidGidMapper,
		root,
		func(path util.FullPath) {
			wfs.inodeToPath.MarkChildrenCached(path)
		},
		func(path util.FullPath) bool {
			return wfs.inodeToPath.IsChildrenCached(path)
		},
		func(util.FullPath, *filer_pb.Entry) {},
		nil,
	)
	t.Cleanup(func() {
		wfs.metaCache.Shutdown()
	})
	return wfs
}

4
weed/server/filer_server.go

@ -115,6 +115,9 @@ type FilerServer struct {
// deduplicates concurrent remote object caching operations
remoteCacheGroup singleflight.Group
recentCopyRequestsMu sync.Mutex
recentCopyRequests map[string]recentCopyRequest
// credential manager for IAM operations
CredentialManager *credential.CredentialManager
}
@ -153,6 +156,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
knownListeners: make(map[int32]int32),
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
recentCopyRequests: make(map[string]recentCopyRequest),
CredentialManager: option.CredentialManager,
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)

283
weed/server/filer_server_handlers_copy.go

@ -6,6 +6,8 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@ -19,9 +21,28 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
// recentCopyRequestTTL bounds how long a completed copy request ID is
// remembered for idempotent retry detection.
const recentCopyRequestTTL = 2 * time.Minute

// copyRequestPreconditions carries the optional client-supplied preconditions
// for a filer-side copy. A nil pointer field means "no check requested".
type copyRequestPreconditions struct {
	requestID string  // idempotency key; empty when the client sent none
	srcInode  *uint64 // expected source inode
	srcMtime  *int64  // expected source mtime (unix seconds)
	srcSize   *int64  // expected source size in bytes
	dstInode  *uint64 // expected destination inode
	dstMtime  *int64  // expected destination mtime (unix seconds)
	dstSize   *int64  // expected destination size in bytes
}

// recentCopyRequest records the fingerprint of a completed copy so a retried
// request with the same ID can be acknowledged without copying again.
type recentCopyRequest struct {
	fingerprint string
	expiresAt   time.Time
}
func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
src := r.URL.Query().Get("cp.from")
src := r.URL.Query().Get(filer.CopyQueryParamFrom)
dst := r.URL.Path
overwrite := r.URL.Query().Get(filer.CopyQueryParamOverwrite) == "true"
dataOnly := r.URL.Query().Get(filer.CopyQueryParamDataOnly) == "true"
glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)
@ -84,15 +105,54 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.
finalDstPath = util.FullPath(newDir).Child(newName)
}
if srcPath == finalDstPath {
glog.V(2).InfofCtx(ctx, "FilerServer.copy rejected self-copy for %s", finalDstPath)
err = fmt.Errorf("source and destination are the same path: %s; choose a different destination file name or directory", finalDstPath)
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
preconditions, err := parseCopyRequestPreconditions(r.URL.Query())
if err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
copyFingerprint := preconditions.fingerprint(srcPath, finalDstPath)
if handled, recentErr := fs.handleRecentCopyRequest(preconditions.requestID, copyFingerprint); handled || recentErr != nil {
if recentErr != nil {
writeJsonError(w, r, http.StatusConflict, recentErr)
return
}
setCopyResponseHeaders(w, preconditions.requestID)
w.WriteHeader(http.StatusNoContent)
return
}
if err = validateCopySourcePreconditions(preconditions, srcEntry); err != nil {
writeJsonError(w, r, http.StatusPreconditionFailed, err)
return
}
// Check if destination file already exists
// TODO: add an overwrite parameter to allow overwriting
var existingDstEntry *filer.Entry
if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
} else if dstEntry != nil {
err = fmt.Errorf("destination file %s already exists", finalDstPath)
writeJsonError(w, r, http.StatusConflict, err)
existingDstEntry = dstEntry
if dstEntry.IsDirectory() {
err = fmt.Errorf("destination file %s is a directory", finalDstPath)
writeJsonError(w, r, http.StatusConflict, err)
return
}
if !overwrite {
err = fmt.Errorf("destination file %s already exists", finalDstPath)
writeJsonError(w, r, http.StatusConflict, err)
return
}
}
if err = validateCopyDestinationPreconditions(preconditions, existingDstEntry); err != nil {
writeJsonError(w, r, http.StatusPreconditionFailed, err)
return
}
@ -103,8 +163,13 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
if dataOnly && existingDstEntry != nil {
preserveDestinationMetadataForDataCopy(existingDstEntry, newEntry)
}
if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
// Pass o_excl = !overwrite so the default copy refuses to replace an
// existing destination, while overwrite=true updates the pre-created target.
if createErr := fs.filer.CreateEntry(ctx, newEntry, !overwrite, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
@ -112,11 +177,15 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.
glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)
fs.rememberRecentCopyRequest(preconditions.requestID, copyFingerprint)
setCopyResponseHeaders(w, preconditions.requestID)
w.WriteHeader(http.StatusNoContent)
}
// copyEntry creates a new entry with copied content and chunks
func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
now := time.Now()
// Create the base entry structure
// Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter
// This creates an independent copy rather than another hard link to the same content
@ -126,6 +195,8 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst
Attr: func(a filer.Attr) filer.Attr {
a.GroupNames = append([]string(nil), a.GroupNames...)
a.Md5 = append([]byte(nil), a.Md5...)
a.Crtime = now
a.Mtime = now
return a
}(srcEntry.Attr),
Quota: srcEntry.Quota,
@ -201,6 +272,208 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst
return newEntry, nil
}
// preserveDestinationMetadataForDataCopy restores the pre-existing destination
// entry's identity and metadata onto a freshly copied entry, so a data-only
// copy replaces file content while keeping the destination's ownership,
// xattrs, quota, remote linkage, and hard-link bookkeeping. Data attributes
// already on copiedEntry (size, mime, md5, chunks) are left untouched; only
// Mtime is refreshed to "now" because the content did change.
func preserveDestinationMetadataForDataCopy(dstEntry, copiedEntry *filer.Entry) {
	if dstEntry == nil || copiedEntry == nil {
		return
	}
	copiedEntry.Mode = dstEntry.Mode
	copiedEntry.Uid = dstEntry.Uid
	copiedEntry.Gid = dstEntry.Gid
	copiedEntry.TtlSec = dstEntry.TtlSec
	copiedEntry.UserName = dstEntry.UserName
	// Clone slices/maps so later mutation of dstEntry cannot leak into the copy.
	copiedEntry.GroupNames = append([]string(nil), dstEntry.GroupNames...)
	copiedEntry.SymlinkTarget = dstEntry.SymlinkTarget
	copiedEntry.Rdev = dstEntry.Rdev
	copiedEntry.Crtime = dstEntry.Crtime
	copiedEntry.Inode = dstEntry.Inode
	copiedEntry.Mtime = time.Now()
	copiedEntry.Extended = cloneEntryExtended(dstEntry.Extended)
	copiedEntry.Remote = cloneRemoteEntry(dstEntry.Remote)
	copiedEntry.Quota = dstEntry.Quota
	copiedEntry.WORMEnforcedAtTsNs = dstEntry.WORMEnforcedAtTsNs
	copiedEntry.HardLinkId = append(filer.HardLinkId(nil), dstEntry.HardLinkId...)
	copiedEntry.HardLinkCounter = dstEntry.HardLinkCounter
}
// cloneEntryExtended returns a deep copy of an extended-attribute map, so the
// caller can mutate the original without affecting the clone. A nil input
// stays nil (preserving the nil/empty distinction).
func cloneEntryExtended(extended map[string][]byte) map[string][]byte {
	if extended == nil {
		return nil
	}
	out := make(map[string][]byte, len(extended))
	for key, value := range extended {
		out[key] = append([]byte(nil), value...)
	}
	return out
}
// cloneRemoteEntry deep-copies a RemoteEntry via proto.Clone; nil stays nil.
func cloneRemoteEntry(remote *filer_pb.RemoteEntry) *filer_pb.RemoteEntry {
	if remote == nil {
		return nil
	}
	return proto.Clone(remote).(*filer_pb.RemoteEntry)
}
// parseCopyRequestPreconditions extracts the optional copy preconditions
// (request ID plus expected source/destination inode, mtime, and size) from
// the request query string. Absent parameters stay nil; the first malformed
// numeric value aborts parsing with an error naming the parameter, and a zero
// struct is returned.
func parseCopyRequestPreconditions(values url.Values) (copyRequestPreconditions, error) {
	p := copyRequestPreconditions{requestID: values.Get(filer.CopyQueryParamRequestID)}

	// Both helpers short-circuit once an error is recorded, preserving the
	// first-error-wins behavior across the parameter sequence below.
	var firstErr error
	u64 := func(key string) *uint64 {
		if firstErr != nil {
			return nil
		}
		value, err := parseOptionalUint64(values, key)
		if err != nil {
			firstErr = err
		}
		return value
	}
	i64 := func(key string) *int64 {
		if firstErr != nil {
			return nil
		}
		value, err := parseOptionalInt64(values, key)
		if err != nil {
			firstErr = err
		}
		return value
	}

	p.srcInode = u64(filer.CopyQueryParamSourceInode)
	p.srcMtime = i64(filer.CopyQueryParamSourceMtime)
	p.srcSize = i64(filer.CopyQueryParamSourceSize)
	p.dstInode = u64(filer.CopyQueryParamDestinationInode)
	p.dstMtime = i64(filer.CopyQueryParamDestinationMtime)
	p.dstSize = i64(filer.CopyQueryParamDestinationSize)

	if firstErr != nil {
		return copyRequestPreconditions{}, firstErr
	}
	return p, nil
}
func parseOptionalUint64(values url.Values, key string) (*uint64, error) {
raw := values.Get(key)
if raw == "" {
return nil, nil
}
value, err := strconv.ParseUint(raw, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid %s: %w", key, err)
}
return &value, nil
}
func parseOptionalInt64(values url.Values, key string) (*int64, error) {
raw := values.Get(key)
if raw == "" {
return nil, nil
}
value, err := strconv.ParseInt(raw, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid %s: %w", key, err)
}
return &value, nil
}
// validateCopySourcePreconditions checks the source entry against any
// client-supplied inode/mtime/size expectations; a nil pointer field means
// that check was not requested. A descriptive error is returned on mismatch
// so the handler can answer 412 Precondition Failed.
func validateCopySourcePreconditions(preconditions copyRequestPreconditions, srcEntry *filer.Entry) error {
	if srcEntry == nil {
		return fmt.Errorf("source entry disappeared")
	}
	if preconditions.srcInode != nil && srcEntry.Inode != *preconditions.srcInode {
		return fmt.Errorf("source inode changed from %d to %d", *preconditions.srcInode, srcEntry.Inode)
	}
	if preconditions.srcMtime != nil && srcEntry.Mtime.Unix() != *preconditions.srcMtime {
		return fmt.Errorf("source mtime changed from %d to %d", *preconditions.srcMtime, srcEntry.Mtime.Unix())
	}
	if preconditions.srcSize != nil && int64(srcEntry.Size()) != *preconditions.srcSize {
		return fmt.Errorf("source size changed from %d to %d", *preconditions.srcSize, srcEntry.Size())
	}
	return nil
}
// validateCopyDestinationPreconditions checks the existing destination entry
// against any client-supplied inode/mtime/size expectations. When no
// destination checks were requested the function is a no-op; otherwise the
// destination entry must still exist and match each requested value.
func validateCopyDestinationPreconditions(preconditions copyRequestPreconditions, dstEntry *filer.Entry) error {
	if preconditions.dstInode == nil && preconditions.dstMtime == nil && preconditions.dstSize == nil {
		return nil
	}
	if dstEntry == nil {
		return fmt.Errorf("destination entry disappeared")
	}
	if preconditions.dstInode != nil && dstEntry.Inode != *preconditions.dstInode {
		return fmt.Errorf("destination inode changed from %d to %d", *preconditions.dstInode, dstEntry.Inode)
	}
	if preconditions.dstMtime != nil && dstEntry.Mtime.Unix() != *preconditions.dstMtime {
		return fmt.Errorf("destination mtime changed from %d to %d", *preconditions.dstMtime, dstEntry.Mtime.Unix())
	}
	if preconditions.dstSize != nil && int64(dstEntry.Size()) != *preconditions.dstSize {
		return fmt.Errorf("destination size changed from %d to %d", *preconditions.dstSize, dstEntry.Size())
	}
	return nil
}
// fingerprint builds a stable identity string for a copy request from its
// paths, preconditions, and request ID. It lets the server distinguish an
// identical retry of a request ID (idempotent replay) from a reuse of that
// ID for a different copy (conflict). Absent optional values serialize as
// the empty string between '|' separators.
func (preconditions copyRequestPreconditions) fingerprint(srcPath, dstPath util.FullPath) string {
	return fmt.Sprintf(
		"%s|%s|%s|%s|%s|%s|%s|%s|%s",
		srcPath,
		dstPath,
		formatOptionalUint64(preconditions.srcInode),
		formatOptionalInt64(preconditions.srcMtime),
		formatOptionalInt64(preconditions.srcSize),
		formatOptionalUint64(preconditions.dstInode),
		formatOptionalInt64(preconditions.dstMtime),
		formatOptionalInt64(preconditions.dstSize),
		preconditions.requestID,
	)
}
// formatOptionalUint64 renders an optional uint64 as base-10 text, with nil
// mapping to the empty string (used for fingerprint serialization).
func formatOptionalUint64(value *uint64) string {
	if value != nil {
		return strconv.FormatUint(*value, 10)
	}
	return ""
}
// formatOptionalInt64 renders an optional int64 as base-10 text, with nil
// mapping to the empty string (used for fingerprint serialization).
func formatOptionalInt64(value *int64) string {
	if value != nil {
		return strconv.FormatInt(*value, 10)
	}
	return ""
}
// handleRecentCopyRequest reports whether a copy with this request ID already
// completed. It returns (true, nil) for an identical retry — the caller
// should answer success without copying again — (false, error) when the ID
// was reused with a different fingerprint, and (false, nil) when the ID is
// empty or unknown. Expired records are swept opportunistically while the
// lock is held, keeping the map bounded without a background janitor.
func (fs *FilerServer) handleRecentCopyRequest(requestID, fingerprint string) (bool, error) {
	if requestID == "" {
		return false, nil
	}
	fs.recentCopyRequestsMu.Lock()
	defer fs.recentCopyRequestsMu.Unlock()
	now := time.Now()
	// Lazy expiry sweep: drop any record past its TTL before the lookup.
	for id, request := range fs.recentCopyRequests {
		if now.After(request.expiresAt) {
			delete(fs.recentCopyRequests, id)
		}
	}
	request, found := fs.recentCopyRequests[requestID]
	if !found {
		return false, nil
	}
	if request.fingerprint != fingerprint {
		return false, fmt.Errorf("copy request id %q was already used for a different copy", requestID)
	}
	return true, nil
}
// rememberRecentCopyRequest records a completed copy under its request ID so
// an identical retry within recentCopyRequestTTL can be deduplicated. Copies
// without a request ID are not tracked.
func (fs *FilerServer) rememberRecentCopyRequest(requestID, fingerprint string) {
	if requestID == "" {
		return
	}
	fs.recentCopyRequestsMu.Lock()
	defer fs.recentCopyRequestsMu.Unlock()
	fs.recentCopyRequests[requestID] = recentCopyRequest{
		fingerprint: fingerprint,
		expiresAt:   time.Now().Add(recentCopyRequestTTL),
	}
}
// setCopyResponseHeaders marks the response as a committed copy and echoes
// the client's request ID (when one was supplied) for correlation.
func setCopyResponseHeaders(w http.ResponseWriter, requestID string) {
	w.Header().Set(filer.CopyResponseHeaderCommitted, "true")
	if requestID != "" {
		w.Header().Set(filer.CopyResponseHeaderRequestID, requestID)
	}
}
// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
if len(srcChunks) == 0 {

223
weed/server/filer_server_handlers_copy_test.go

@ -0,0 +1,223 @@
package weed_server
import (
"context"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestCopyEntryRefreshesDestinationTimestamps verifies that copyEntry stamps
// the copied entry with fresh Crtime/Mtime rather than inheriting the
// source's timestamps.
func TestCopyEntryRefreshesDestinationTimestamps(t *testing.T) {
	fs := &FilerServer{}
	oldTime := time.Unix(123, 0)
	srcEntry := &filer.Entry{
		FullPath: util.FullPath("/src.txt"),
		Attr: filer.Attr{
			Mtime:  oldTime,
			Crtime: oldTime,
		},
		Content: []byte("hello"),
	}
	// Bracket the call with a tolerance window so the "now" timestamps can be
	// range-checked without flakiness.
	before := time.Now().Add(-time.Second)
	copied, err := fs.copyEntry(context.Background(), srcEntry, util.FullPath("/dst.txt"), nil)
	after := time.Now().Add(time.Second)
	if err != nil {
		t.Fatalf("copyEntry: %v", err)
	}
	if copied.Crtime.Before(before) || copied.Crtime.After(after) {
		t.Fatalf("copied Crtime = %v, want between %v and %v", copied.Crtime, before, after)
	}
	if copied.Mtime.Before(before) || copied.Mtime.After(after) {
		t.Fatalf("copied Mtime = %v, want between %v and %v", copied.Mtime, before, after)
	}
	if copied.Crtime.Equal(oldTime) || copied.Mtime.Equal(oldTime) {
		t.Fatalf("destination timestamps should differ from source timestamps: src=%v copied=(%v,%v)", oldTime, copied.Crtime, copied.Mtime)
	}
}
// TestPreserveDestinationMetadataForDataCopy verifies that a data-only copy
// keeps the destination's identity and metadata (mode, ownership, xattrs,
// remote linkage, quota, hard links), refreshes only Mtime, leaves the copied
// data attributes intact, and deep-clones reference-typed fields so later
// mutation of the destination entry cannot leak into the copy.
func TestPreserveDestinationMetadataForDataCopy(t *testing.T) {
	dstTime := time.Unix(100, 0)
	srcTime := time.Unix(200, 0)
	// Pre-existing destination entry whose metadata must survive the copy.
	dstEntry := &filer.Entry{
		FullPath: util.FullPath("/dst.txt"),
		Attr: filer.Attr{
			Mtime:         dstTime,
			Crtime:        dstTime,
			Mode:          0100600,
			Uid:           101,
			Gid:           202,
			TtlSec:        17,
			UserName:      "dst-user",
			GroupNames:    []string{"dst-group"},
			SymlinkTarget: "",
			Rdev:          9,
			Inode:         1234,
		},
		Extended: map[string][]byte{
			"user.color": []byte("blue"),
		},
		Remote: &filer_pb.RemoteEntry{
			StorageName: "remote-store",
			RemoteETag:  "remote-etag",
			RemoteSize:  7,
		},
		Quota:              77,
		WORMEnforcedAtTsNs: 88,
		HardLinkId:         filer.HardLinkId([]byte("hard-link")),
		HardLinkCounter:    3,
	}
	// Freshly copied entry carrying the source's data and metadata.
	copiedEntry := &filer.Entry{
		FullPath: util.FullPath("/dst.txt"),
		Attr: filer.Attr{
			Mtime:      srcTime,
			Crtime:     srcTime,
			Mode:       0100644,
			Uid:        11,
			Gid:        22,
			Mime:       "text/plain",
			Md5:        []byte("source-md5"),
			FileSize:   5,
			UserName:   "src-user",
			GroupNames: []string{"src-group"},
		},
		Content: []byte("hello"),
		Extended: map[string][]byte{
			"user.color": []byte("red"),
		},
		Quota: 5,
	}
	// Tolerance window for the refreshed Mtime.
	before := time.Now().Add(-time.Second)
	preserveDestinationMetadataForDataCopy(dstEntry, copiedEntry)
	after := time.Now().Add(time.Second)
	if copiedEntry.Mode != dstEntry.Mode || copiedEntry.Uid != dstEntry.Uid || copiedEntry.Gid != dstEntry.Gid {
		t.Fatalf("destination ownership/mode not preserved: got mode=%#o uid=%d gid=%d", copiedEntry.Mode, copiedEntry.Uid, copiedEntry.Gid)
	}
	if copiedEntry.Crtime != dstEntry.Crtime || copiedEntry.Inode != dstEntry.Inode {
		t.Fatalf("destination identity not preserved: got crtime=%v inode=%d", copiedEntry.Crtime, copiedEntry.Inode)
	}
	if copiedEntry.Mtime.Before(before) || copiedEntry.Mtime.After(after) {
		t.Fatalf("destination mtime = %v, want between %v and %v", copiedEntry.Mtime, before, after)
	}
	if copiedEntry.FileSize != 5 || copiedEntry.Mime != "text/plain" || string(copiedEntry.Md5) != "source-md5" {
		t.Fatalf("copied data attributes changed unexpectedly: size=%d mime=%q md5=%q", copiedEntry.FileSize, copiedEntry.Mime, string(copiedEntry.Md5))
	}
	if string(copiedEntry.Extended["user.color"]) != "blue" {
		t.Fatalf("destination xattrs not preserved: got %q", string(copiedEntry.Extended["user.color"]))
	}
	if copiedEntry.Remote == nil || copiedEntry.Remote.StorageName != "remote-store" {
		t.Fatalf("destination remote metadata not preserved: %+v", copiedEntry.Remote)
	}
	if copiedEntry.Quota != 77 || copiedEntry.WORMEnforcedAtTsNs != 88 {
		t.Fatalf("destination quota/WORM not preserved: quota=%d worm=%d", copiedEntry.Quota, copiedEntry.WORMEnforcedAtTsNs)
	}
	if string(copiedEntry.HardLinkId) != "hard-link" || copiedEntry.HardLinkCounter != 3 {
		t.Fatalf("destination hard-link metadata not preserved: id=%q count=%d", string(copiedEntry.HardLinkId), copiedEntry.HardLinkCounter)
	}
	// Mutate the destination after the copy; the preserved fields must be
	// deep clones, so the copied entry should be unaffected.
	dstEntry.GroupNames[0] = "mutated"
	dstEntry.Extended["user.color"][0] = 'g'
	dstEntry.Remote.StorageName = "mutated-remote"
	if copiedEntry.GroupNames[0] != "dst-group" {
		t.Fatalf("group names should be cloned, got %q", copiedEntry.GroupNames[0])
	}
	if string(copiedEntry.Extended["user.color"]) != "blue" {
		t.Fatalf("extended metadata should be cloned, got %q", string(copiedEntry.Extended["user.color"]))
	}
	if copiedEntry.Remote.StorageName != "remote-store" {
		t.Fatalf("remote metadata should be cloned, got %q", copiedEntry.Remote.StorageName)
	}
}
// TestValidateCopySourcePreconditions verifies that a source entry matching
// all requested inode/mtime/size preconditions passes, and that a size
// mismatch is rejected.
func TestValidateCopySourcePreconditions(t *testing.T) {
	srcInode := uint64(101)
	srcMtime := int64(200)
	srcSize := int64(5)
	preconditions := copyRequestPreconditions{
		srcInode: &srcInode,
		srcMtime: &srcMtime,
		srcSize:  &srcSize,
	}
	// Entry constructed to match every precondition exactly.
	srcEntry := &filer.Entry{
		FullPath: util.FullPath("/src.txt"),
		Attr: filer.Attr{
			Inode:    srcInode,
			Mtime:    time.Unix(srcMtime, 0),
			FileSize: uint64(srcSize),
		},
		Content: []byte("hello"),
	}
	if err := validateCopySourcePreconditions(preconditions, srcEntry); err != nil {
		t.Fatalf("validate source preconditions: %v", err)
	}
	// A differing expected size must trigger a precondition failure.
	changedSize := int64(6)
	preconditions.srcSize = &changedSize
	if err := validateCopySourcePreconditions(preconditions, srcEntry); err == nil {
		t.Fatal("expected source size mismatch to fail")
	}
}
// TestValidateCopyDestinationPreconditions verifies that a matching
// destination entry passes the requested preconditions and that a missing
// destination fails when destination checks were requested.
func TestValidateCopyDestinationPreconditions(t *testing.T) {
	dstInode := uint64(202)
	dstMtime := int64(300)
	dstSize := int64(0)
	preconditions := copyRequestPreconditions{
		dstInode: &dstInode,
		dstMtime: &dstMtime,
		dstSize:  &dstSize,
	}
	// Entry constructed to match every destination precondition exactly.
	dstEntry := &filer.Entry{
		FullPath: util.FullPath("/dst.txt"),
		Attr: filer.Attr{
			Inode:    dstInode,
			Mtime:    time.Unix(dstMtime, 0),
			FileSize: uint64(dstSize),
		},
	}
	if err := validateCopyDestinationPreconditions(preconditions, dstEntry); err != nil {
		t.Fatalf("validate destination preconditions: %v", err)
	}
	// With checks requested, a vanished destination must be an error.
	if err := validateCopyDestinationPreconditions(preconditions, nil); err == nil {
		t.Fatal("expected missing destination to fail")
	}
}
// TestRecentCopyRequestDeduplicatesByRequestID verifies that a remembered
// request ID with the same fingerprint is treated as an idempotent replay,
// while the same ID with a different fingerprint is rejected.
func TestRecentCopyRequestDeduplicatesByRequestID(t *testing.T) {
	fs := &FilerServer{
		recentCopyRequests: make(map[string]recentCopyRequest),
	}
	requestID := "copy-req"
	fingerprint := "src|dst|1"
	fs.rememberRecentCopyRequest(requestID, fingerprint)
	// Identical retry: handled without error.
	handled, err := fs.handleRecentCopyRequest(requestID, fingerprint)
	if err != nil {
		t.Fatalf("handle recent copy request: %v", err)
	}
	if !handled {
		t.Fatal("expected recent copy request to be recognized")
	}
	// Same ID, different copy parameters: must conflict, not replay.
	handled, err = fs.handleRecentCopyRequest(requestID, "different")
	if err == nil {
		t.Fatal("expected reused request id with different fingerprint to fail")
	}
	if handled {
		t.Fatal("reused request id with different fingerprint should not be treated as handled")
	}
}
Loading…
Cancel
Save