diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index c0cdae7d3..5a10053d1 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -312,35 +312,37 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{ - MountDirectory: dir, - FilerAddresses: filerAddresses, - GrpcDialOption: grpcDialOption, - FilerMountRootPath: mountRoot, - Collection: *option.collection, - Replication: *option.replication, - TtlSec: int32(*option.ttlSec), - DiskType: types.ToDiskType(*option.diskType), - ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, - ConcurrentWriters: *option.concurrentWriters, - ConcurrentReaders: *option.concurrentReaders, - CacheDirForRead: *option.cacheDirForRead, - CacheSizeMBForRead: *option.cacheSizeMBForRead, - CacheDirForWrite: cacheDirForWrite, - CacheMetaTTlSec: *option.cacheMetaTtlSec, - DataCenter: *option.dataCenter, - Quota: int64(*option.collectionQuota) * 1024 * 1024, - MountUid: uid, - MountGid: gid, - MountMode: mountMode, - MountCtime: fileInfo.ModTime(), - MountMtime: time.Now(), - Umask: umask, - VolumeServerAccess: *mountOptions.volumeServerAccess, - Cipher: cipher, - UidGidMapper: uidGidMapper, - DisableXAttr: *option.disableXAttr, - IsMacOs: runtime.GOOS == "darwin", - MetadataFlushSeconds: *option.metadataFlushSeconds, + MountDirectory: dir, + FilerAddresses: filerAddresses, + GrpcDialOption: grpcDialOption, + FilerSigningKey: security.SigningKey(util.GetViper().GetString("jwt.filer_signing.key")), + FilerSigningExpiresAfterSec: util.GetViper().GetInt("jwt.filer_signing.expires_after_seconds"), + FilerMountRootPath: mountRoot, + Collection: *option.collection, + Replication: *option.replication, + TtlSec: int32(*option.ttlSec), + DiskType: types.ToDiskType(*option.diskType), + ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, + ConcurrentWriters: *option.concurrentWriters, + ConcurrentReaders: *option.concurrentReaders, + CacheDirForRead: *option.cacheDirForRead, + CacheSizeMBForRead: *option.cacheSizeMBForRead, + CacheDirForWrite: cacheDirForWrite, + CacheMetaTTlSec: *option.cacheMetaTtlSec, + DataCenter: *option.dataCenter, + Quota: int64(*option.collectionQuota) * 1024 * 1024, + MountUid: uid, + MountGid: gid, + MountMode: mountMode, + MountCtime: fileInfo.ModTime(), + MountMtime: time.Now(), + Umask: umask, + VolumeServerAccess: *mountOptions.volumeServerAccess, + Cipher: cipher, + UidGidMapper: uidGidMapper, + DisableXAttr: *option.disableXAttr, + IsMacOs: runtime.GOOS == "darwin", + MetadataFlushSeconds: *option.metadataFlushSeconds, // RDMA acceleration options RdmaEnabled: *option.rdmaEnabled, RdmaSidecarAddr: *option.rdmaSidecarAddr, diff --git a/weed/filer/copy_params.go b/weed/filer/copy_params.go new file mode 100644 index 000000000..1245e149f --- /dev/null +++ b/weed/filer/copy_params.go @@ -0,0 +1,17 @@ +package filer + +const ( + CopyQueryParamFrom = "cp.from" + CopyQueryParamOverwrite = "overwrite" + CopyQueryParamDataOnly = "dataOnly" + CopyQueryParamRequestID = "copy.requestId" + CopyQueryParamSourceInode = "copy.srcInode" + CopyQueryParamSourceMtime = "copy.srcMtime" + CopyQueryParamSourceSize = "copy.srcSize" + CopyQueryParamDestinationInode = "copy.dstInode" + CopyQueryParamDestinationMtime = "copy.dstMtime" + CopyQueryParamDestinationSize = "copy.dstSize" + + CopyResponseHeaderCommitted = "X-SeaweedFS-Copy-Committed" + CopyResponseHeaderRequestID = "X-SeaweedFS-Copy-Request-ID" +) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index e24a49aac..9145b28fe 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" @@ -30,27 +31,29 @@ import ( ) type Option struct { - filerIndex int32 // align memory for atomic read/write - FilerAddresses []pb.ServerAddress - MountDirectory string - GrpcDialOption grpc.DialOption - FilerMountRootPath string - Collection string - Replication string - TtlSec int32 - DiskType types.DiskType - ChunkSizeLimit int64 - ConcurrentWriters int - ConcurrentReaders int - CacheDirForRead string - CacheSizeMBForRead int64 - CacheDirForWrite string - CacheMetaTTlSec int - DataCenter string - Umask os.FileMode - Quota int64 - DisableXAttr bool - IsMacOs bool + filerIndex int32 // align memory for atomic read/write + FilerAddresses []pb.ServerAddress + MountDirectory string + GrpcDialOption grpc.DialOption + FilerSigningKey security.SigningKey + FilerSigningExpiresAfterSec int + FilerMountRootPath string + Collection string + Replication string + TtlSec int32 + DiskType types.DiskType + ChunkSizeLimit int64 + ConcurrentWriters int + ConcurrentReaders int + CacheDirForRead string + CacheSizeMBForRead int64 + CacheDirForWrite string + CacheMetaTTlSec int + DataCenter string + Umask os.FileMode + Quota int64 + DisableXAttr bool + IsMacOs bool MountUid uint32 MountGid uint32 @@ -173,11 +176,11 @@ func NewSeaweedFileSystem(option *Option) *WFS { dhMap: NewDirectoryHandleToInode(), filerClient: filerClient, // nil for proxy mode, initialized for direct access pendingAsyncFlush: make(map[uint64]chan struct{}), - fhLockTable: util.NewLockTable[FileHandleId](), - refreshingDirs: make(map[util.FullPath]struct{}), - dirHotWindow: dirHotWindow, - dirHotThreshold: dirHotThreshold, - dirIdleEvict: dirIdleEvict, + fhLockTable: util.NewLockTable[FileHandleId](), + refreshingDirs: make(map[util.FullPath]struct{}), + dirHotWindow: dirHotWindow, + dirHotThreshold: dirHotThreshold, + dirIdleEvict: dirIdleEvict, } wfs.option.filerIndex = int32(rand.IntN(len(option.FilerAddresses))) diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index 0b8f65589..1b7b8e407 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -1,15 +1,63 @@ package mount import ( + "bytes" + "context" + "fmt" + "io" + "math" "net/http" + "net/url" "time" "github.com/seaweedfs/go-fuse/v2/fuse" + "google.golang.org/protobuf/proto" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + request_id "github.com/seaweedfs/seaweedfs/weed/util/request_id" ) +type serverSideWholeFileCopyOutcome uint8 + +const ( + serverSideWholeFileCopyNotCommitted serverSideWholeFileCopyOutcome = iota + serverSideWholeFileCopyCommitted + serverSideWholeFileCopyAmbiguous +) + +type wholeFileServerCopyRequest struct { + srcPath util.FullPath + dstPath util.FullPath + sourceSize int64 + srcInode uint64 + srcMtime int64 + dstInode uint64 + dstMtime int64 + dstSize int64 + sourceMime string + sourceMd5 []byte + copyRequestID string +} + +// performServerSideWholeFileCopy is a package-level seam so tests can override +// the filer call without standing up an HTTP endpoint. +var performServerSideWholeFileCopy = func(cancel <-chan struct{}, wfs *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) { + return wfs.copyEntryViaFiler(cancel, copyRequest) +} + +// filerCopyRequestTimeout bounds the mount->filer POST so a stalled copy does +// not block copy_file_range workers indefinitely. +const filerCopyRequestTimeout = 60 * time.Second + +// filerCopyReadbackTimeout gives the follow-up metadata reload a fresh deadline +// after the filer already accepted the copy request. +const filerCopyReadbackTimeout = 15 * time.Second + // CopyFileRange copies data from one file to another from and to specified offsets. // // See https://man7.org/linux/man-pages/man2/copy_file_range.2.html @@ -70,6 +118,10 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) in.OffOut, in.OffOut+in.Len, ) + if written, handled, status := wfs.tryServerSideWholeFileCopy(cancel, in, fhIn, fhOut); handled { + return written, status + } + // Concurrent copy operations could allocate too much memory, so we want to // throttle our concurrency, scaling with the number of writers the mount // was configured with. @@ -155,3 +207,306 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) written = uint32(totalCopied) return written, fuse.OK } + +func (wfs *WFS) tryServerSideWholeFileCopy(cancel <-chan struct{}, in *fuse.CopyFileRangeIn, fhIn, fhOut *FileHandle) (written uint32, handled bool, code fuse.Status) { + copyRequest, ok := wholeFileServerCopyCandidate(fhIn, fhOut, in) + if !ok { + return 0, false, fuse.OK + } + + glog.V(1).Infof("CopyFileRange server-side copy %s => %s (%d bytes)", copyRequest.srcPath, copyRequest.dstPath, copyRequest.sourceSize) + + entry, outcome, err := performServerSideWholeFileCopy(cancel, wfs, copyRequest) + switch outcome { + case serverSideWholeFileCopyCommitted: + if err != nil { + glog.Warningf("CopyFileRange server-side copy %s => %s committed but local refresh failed: %v", copyRequest.srcPath, copyRequest.dstPath, err) + } else { + glog.V(1).Infof("CopyFileRange server-side copy %s => %s completed (%d bytes)", copyRequest.srcPath, copyRequest.dstPath, copyRequest.sourceSize) + } + wfs.applyServerSideWholeFileCopyResult(fhIn, fhOut, copyRequest.dstPath, entry, copyRequest.sourceSize) + return uint32(copyRequest.sourceSize), true, fuse.OK + case serverSideWholeFileCopyAmbiguous: + glog.Warningf("CopyFileRange server-side copy %s => %s outcome ambiguous: %v", copyRequest.srcPath, copyRequest.dstPath, err) + return 0, true, fuse.EIO + default: + glog.V(0).Infof("CopyFileRange server-side copy %s => %s fallback to chunk copy: %v", copyRequest.srcPath, copyRequest.dstPath, err) + return 0, false, fuse.OK + } +} + +func (wfs *WFS) applyServerSideWholeFileCopyResult(fhIn, fhOut *FileHandle, dstPath util.FullPath, entry *filer_pb.Entry, sourceSize int64) { + if entry == nil { + entry = synthesizeLocalEntryForServerSideWholeFileCopy(fhIn, fhOut, sourceSize) + } + if entry == nil { + glog.Warningf("CopyFileRange server-side copy %s left no local entry to apply", dstPath) + return + } + + fhOut.SetEntry(entry) + fhOut.RememberPath(dstPath) + if entry.Attributes != nil { + fhOut.contentType = entry.Attributes.Mime + } + fhOut.dirtyMetadata = false + wfs.updateServerSideWholeFileCopyMetaCache(dstPath, entry) + wfs.invalidateCopyDestinationCache(fhOut.inode, dstPath) +} + +func (wfs *WFS) updateServerSideWholeFileCopyMetaCache(dstPath util.FullPath, entry *filer_pb.Entry) { + if wfs.metaCache == nil || entry == nil { + return + } + + dir, _ := dstPath.DirAndName() + event := metadataUpdateEvent(dir, entry) + if applyErr := wfs.applyLocalMetadataEvent(context.Background(), event); applyErr != nil { + glog.Warningf("CopyFileRange metadata update %s: %v", dstPath, applyErr) + wfs.markDirectoryReadThrough(util.FullPath(dir)) + } +} + +func synthesizeLocalEntryForServerSideWholeFileCopy(fhIn, fhOut *FileHandle, sourceSize int64) *filer_pb.Entry { + dstEntry := fhOut.GetEntry().GetEntry() + if dstEntry == nil { + return nil + } + + localEntry := proto.Clone(dstEntry).(*filer_pb.Entry) + if localEntry.Attributes == nil { + localEntry.Attributes = &filer_pb.FuseAttributes{} + } + + if srcEntry := fhIn.GetEntry().GetEntry(); srcEntry != nil { + srcEntryCopy := proto.Clone(srcEntry).(*filer_pb.Entry) + localEntry.Content = srcEntryCopy.Content + localEntry.Chunks = srcEntryCopy.Chunks + if srcEntryCopy.Attributes != nil { + localEntry.Attributes.Mime = srcEntryCopy.Attributes.Mime + localEntry.Attributes.Md5 = srcEntryCopy.Attributes.Md5 + } + } + + localEntry.Attributes.FileSize = uint64(sourceSize) + localEntry.Attributes.Mtime = time.Now().Unix() + return localEntry +} + +func wholeFileServerCopyCandidate(fhIn, fhOut *FileHandle, in *fuse.CopyFileRangeIn) (copyRequest wholeFileServerCopyRequest, ok bool) { + if fhIn == nil || fhOut == nil || in == nil { + glog.V(4).Infof("server-side copy: skipped (nil handle or input)") + return wholeFileServerCopyRequest{}, false + } + if fhIn.fh == fhOut.fh { + glog.V(4).Infof("server-side copy: skipped (same file handle)") + return wholeFileServerCopyRequest{}, false + } + if fhIn.dirtyMetadata || fhOut.dirtyMetadata { + glog.V(4).Infof("server-side copy: skipped (dirty metadata: in=%v out=%v)", fhIn.dirtyMetadata, fhOut.dirtyMetadata) + return wholeFileServerCopyRequest{}, false + } + if in.OffIn != 0 || in.OffOut != 0 { + glog.V(4).Infof("server-side copy: skipped (non-zero offsets: in=%d out=%d)", in.OffIn, in.OffOut) + return wholeFileServerCopyRequest{}, false + } + + srcEntry := fhIn.GetEntry() + dstEntry := fhOut.GetEntry() + if srcEntry == nil || dstEntry == nil { + glog.V(4).Infof("server-side copy: skipped (nil entry: src=%v dst=%v)", srcEntry == nil, dstEntry == nil) + return wholeFileServerCopyRequest{}, false + } + if srcEntry.IsDirectory || dstEntry.IsDirectory { + glog.V(4).Infof("server-side copy: skipped (directory)") + return wholeFileServerCopyRequest{}, false + } + + srcPbEntry := srcEntry.GetEntry() + dstPbEntry := dstEntry.GetEntry() + if srcPbEntry == nil || dstPbEntry == nil || srcPbEntry.Attributes == nil || dstPbEntry.Attributes == nil { + glog.V(4).Infof("server-side copy: skipped (missing entry attributes)") + return wholeFileServerCopyRequest{}, false + } + + sourceSize := int64(filer.FileSize(srcPbEntry)) + // go-fuse exposes CopyFileRange's return value as uint32, so the fast path + // should only claim copies that can be reported without truncation. + if sourceSize <= 0 || sourceSize > math.MaxUint32 || int64(in.Len) < sourceSize { + glog.V(4).Infof("server-side copy: skipped (size mismatch: sourceSize=%d len=%d)", sourceSize, in.Len) + return wholeFileServerCopyRequest{}, false + } + + dstSize := int64(filer.FileSize(dstPbEntry)) + if dstSize != 0 || len(dstPbEntry.GetChunks()) > 0 || len(dstPbEntry.Content) > 0 { + glog.V(4).Infof("server-side copy: skipped (destination not empty)") + return wholeFileServerCopyRequest{}, false + } + + srcPath := fhIn.FullPath() + dstPath := fhOut.FullPath() + if srcPath == "" || dstPath == "" || srcPath == dstPath { + glog.V(4).Infof("server-side copy: skipped (invalid paths: src=%q dst=%q)", srcPath, dstPath) + return wholeFileServerCopyRequest{}, false + } + + if srcPbEntry.Attributes.Inode == 0 || dstPbEntry.Attributes.Inode == 0 { + glog.V(4).Infof("server-side copy: skipped (missing inode preconditions: src=%d dst=%d)", srcPbEntry.Attributes.Inode, dstPbEntry.Attributes.Inode) + return wholeFileServerCopyRequest{}, false + } + + return wholeFileServerCopyRequest{ + srcPath: srcPath, + dstPath: dstPath, + sourceSize: sourceSize, + srcInode: srcPbEntry.Attributes.Inode, + srcMtime: srcPbEntry.Attributes.Mtime, + dstInode: dstPbEntry.Attributes.Inode, + dstMtime: dstPbEntry.Attributes.Mtime, + dstSize: dstSize, + sourceMime: srcPbEntry.Attributes.Mime, + sourceMd5: append([]byte(nil), srcPbEntry.Attributes.Md5...), + copyRequestID: request_id.New(), + }, true +} + +func (wfs *WFS) copyEntryViaFiler(cancel <-chan struct{}, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) { + baseCtx, baseCancel := context.WithCancel(context.Background()) + defer baseCancel() + + if cancel != nil { + go func() { + select { + case <-cancel: + baseCancel() + case <-baseCtx.Done(): + } + }() + } + + postCtx, postCancel := context.WithTimeout(baseCtx, filerCopyRequestTimeout) + defer postCancel() + + httpClient := util_http.GetGlobalHttpClient() + if httpClient == nil { + var err error + httpClient, err = util_http.NewGlobalHttpClient() + if err != nil { + return nil, serverSideWholeFileCopyNotCommitted, fmt.Errorf("create filer copy http client: %w", err) + } + } + + copyURL := &url.URL{ + Scheme: httpClient.GetHttpScheme(), + Host: wfs.getCurrentFiler().ToHttpAddress(), + Path: string(copyRequest.dstPath), + } + query := copyURL.Query() + query.Set(filer.CopyQueryParamFrom, string(copyRequest.srcPath)) + query.Set(filer.CopyQueryParamOverwrite, "true") + query.Set(filer.CopyQueryParamDataOnly, "true") + query.Set(filer.CopyQueryParamRequestID, copyRequest.copyRequestID) + query.Set(filer.CopyQueryParamSourceInode, fmt.Sprintf("%d", copyRequest.srcInode)) + query.Set(filer.CopyQueryParamSourceMtime, fmt.Sprintf("%d", copyRequest.srcMtime)) + query.Set(filer.CopyQueryParamSourceSize, fmt.Sprintf("%d", copyRequest.sourceSize)) + query.Set(filer.CopyQueryParamDestinationInode, fmt.Sprintf("%d", copyRequest.dstInode)) + query.Set(filer.CopyQueryParamDestinationMtime, fmt.Sprintf("%d", copyRequest.dstMtime)) + query.Set(filer.CopyQueryParamDestinationSize, fmt.Sprintf("%d", copyRequest.dstSize)) + copyURL.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(postCtx, http.MethodPost, copyURL.String(), nil) + if err != nil { + return nil, serverSideWholeFileCopyNotCommitted, fmt.Errorf("create filer copy request: %w", err) + } + if jwt := wfs.filerCopyJWT(); jwt != "" { + req.Header.Set("Authorization", "Bearer "+string(jwt)) + } + + resp, err := httpClient.Do(req) + if err != nil { + return wfs.confirmServerSideWholeFileCopyAfterAmbiguousRequest(baseCtx, copyRequest, fmt.Errorf("execute filer copy request: %w", err)) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, serverSideWholeFileCopyNotCommitted, fmt.Errorf("filer copy %s => %s failed: status %d: %s", copyRequest.srcPath, copyRequest.dstPath, resp.StatusCode, string(body)) + } + + readbackCtx, readbackCancel := context.WithTimeout(baseCtx, filerCopyReadbackTimeout) + defer readbackCancel() + + entry, err := filer_pb.GetEntry(readbackCtx, wfs, copyRequest.dstPath) + if err != nil { + return nil, serverSideWholeFileCopyCommitted, fmt.Errorf("reload copied entry %s: %w", copyRequest.dstPath, err) + } + if entry == nil { + return nil, serverSideWholeFileCopyCommitted, fmt.Errorf("reload copied entry %s: not found", copyRequest.dstPath) + } + if entry.Attributes != nil && wfs.option != nil && wfs.option.UidGidMapper != nil { + entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid) + } + + return entry, serverSideWholeFileCopyCommitted, nil +} + +func (wfs *WFS) confirmServerSideWholeFileCopyAfterAmbiguousRequest(baseCtx context.Context, copyRequest wholeFileServerCopyRequest, requestErr error) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) { + readbackCtx, readbackCancel := context.WithTimeout(baseCtx, filerCopyReadbackTimeout) + defer readbackCancel() + + entry, err := filer_pb.GetEntry(readbackCtx, wfs, copyRequest.dstPath) + if err == nil && entry != nil && entryMatchesServerSideWholeFileCopy(copyRequest, entry) { + if entry.Attributes != nil && wfs.option != nil && wfs.option.UidGidMapper != nil { + entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid) + } + return entry, serverSideWholeFileCopyCommitted, nil + } + + if err != nil { + return nil, serverSideWholeFileCopyAmbiguous, fmt.Errorf("%w; post-copy readback failed: %v", requestErr, err) + } + if entry == nil { + return nil, serverSideWholeFileCopyAmbiguous, fmt.Errorf("%w; destination %s was not readable after the ambiguous request", requestErr, copyRequest.dstPath) + } + return nil, serverSideWholeFileCopyAmbiguous, fmt.Errorf("%w; destination %s did not match the requested copy after the ambiguous request", requestErr, copyRequest.dstPath) +} + +func entryMatchesServerSideWholeFileCopy(copyRequest wholeFileServerCopyRequest, entry *filer_pb.Entry) bool { + if entry == nil || entry.Attributes == nil { + return false + } + if copyRequest.dstInode != 0 && entry.Attributes.Inode != copyRequest.dstInode { + return false + } + if entry.Attributes.FileSize != uint64(copyRequest.sourceSize) { + return false + } + if copyRequest.sourceMime != "" && entry.Attributes.Mime != copyRequest.sourceMime { + return false + } + if len(copyRequest.sourceMd5) > 0 && !bytes.Equal(entry.Attributes.Md5, copyRequest.sourceMd5) { + return false + } + return true +} + +func (wfs *WFS) filerCopyJWT() security.EncodedJwt { + if wfs.option == nil || len(wfs.option.FilerSigningKey) == 0 { + return "" + } + return security.GenJwtForFilerServer(wfs.option.FilerSigningKey, wfs.option.FilerSigningExpiresAfterSec) +} + +func (wfs *WFS) invalidateCopyDestinationCache(inode uint64, fullPath util.FullPath) { + if wfs.fuseServer != nil { + if status := wfs.fuseServer.InodeNotify(inode, 0, -1); status != fuse.OK { + glog.V(4).Infof("CopyFileRange invalidate inode %d: %v", inode, status) + } + dir, name := fullPath.DirAndName() + if parentInode, found := wfs.inodeToPath.GetInode(util.FullPath(dir)); found { + if status := wfs.fuseServer.EntryNotify(parentInode, name); status != fuse.OK { + glog.V(4).Infof("CopyFileRange invalidate entry %s: %v", fullPath, status) + } + } + } +} diff --git a/weed/mount/weedfs_file_copy_range_test.go b/weed/mount/weedfs_file_copy_range_test.go new file mode 100644 index 000000000..a596eadf8 --- /dev/null +++ b/weed/mount/weedfs_file_copy_range_test.go @@ -0,0 +1,355 @@ +package mount + +import ( + "context" + "errors" + "path/filepath" + "testing" + + "github.com/seaweedfs/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func TestWholeFileServerCopyCandidate(t *testing.T) { + wfs := newCopyRangeTestWFS() + + srcPath := util.FullPath("/src.txt") + dstPath := util.FullPath("/dst.txt") + srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true) + dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true) + + srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{ + Name: "src.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + FileSize: 5, + Inode: srcInode, + }, + Content: []byte("hello"), + }) + dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{ + Name: "dst.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + Inode: dstInode, + }, + }) + + srcHandle.RememberPath(srcPath) + dstHandle.RememberPath(dstPath) + + in := &fuse.CopyFileRangeIn{ + FhIn: uint64(srcHandle.fh), + FhOut: uint64(dstHandle.fh), + OffIn: 0, + OffOut: 0, + Len: 8, + } + + copyRequest, ok := wholeFileServerCopyCandidate(srcHandle, dstHandle, in) + if !ok { + t.Fatal("expected whole-file server copy candidate") + } + if copyRequest.srcPath != srcPath { + t.Fatalf("source path = %q, want %q", copyRequest.srcPath, srcPath) + } + if copyRequest.dstPath != dstPath { + t.Fatalf("destination path = %q, want %q", copyRequest.dstPath, dstPath) + } + if copyRequest.sourceSize != 5 { + t.Fatalf("source size = %d, want 5", copyRequest.sourceSize) + } + if copyRequest.srcInode == 0 || copyRequest.dstInode == 0 { + t.Fatalf("expected inode preconditions, got src=%d dst=%d", copyRequest.srcInode, copyRequest.dstInode) + } + + srcHandle.dirtyMetadata = true + if _, ok := wholeFileServerCopyCandidate(srcHandle, dstHandle, in); ok { + t.Fatal("dirty source handle should disable whole-file server copy") + } + srcHandle.dirtyMetadata = false + + in.Len = 4 + if _, ok := wholeFileServerCopyCandidate(srcHandle, dstHandle, in); ok { + t.Fatal("short copy request should disable whole-file server copy") + } +} + +func TestCopyFileRangeUsesServerSideWholeFileCopy(t *testing.T) { + wfs := newCopyRangeTestWFS() + + srcPath := util.FullPath("/src.txt") + dstPath := util.FullPath("/dst.txt") + srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true) + dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true) + + srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{ + Name: "src.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + FileSize: 5, + Inode: srcInode, + }, + Content: []byte("hello"), + }) + dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{ + Name: "dst.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + Inode: dstInode, + }, + }) + + srcHandle.RememberPath(srcPath) + dstHandle.RememberPath(dstPath) + + originalCopy := performServerSideWholeFileCopy + defer func() { + performServerSideWholeFileCopy = originalCopy + }() + + var called bool + performServerSideWholeFileCopy = func(cancel <-chan struct{}, gotWFS *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) { + called = true + if gotWFS != wfs { + t.Fatalf("wfs = %p, want %p", gotWFS, wfs) + } + if copyRequest.srcPath != srcPath { + t.Fatalf("source path = %q, want %q", copyRequest.srcPath, srcPath) + } + if copyRequest.dstPath != dstPath { + t.Fatalf("destination path = %q, want %q", copyRequest.dstPath, dstPath) + } + return &filer_pb.Entry{ + Name: "dst.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + FileSize: 5, + Mime: "text/plain; charset=utf-8", + }, + Content: []byte("hello"), + }, serverSideWholeFileCopyCommitted, nil + } + + written, status := wfs.CopyFileRange(make(chan struct{}), &fuse.CopyFileRangeIn{ + FhIn: uint64(srcHandle.fh), + FhOut: uint64(dstHandle.fh), + OffIn: 0, + OffOut: 0, + Len: 8, + }) + if status != fuse.OK { + t.Fatalf("CopyFileRange status = %v, want OK", status) + } + if written != 5 { + t.Fatalf("CopyFileRange wrote %d bytes, want 5", written) + } + if !called { + t.Fatal("expected server-side whole-file copy path to be used") + } + + gotEntry := dstHandle.GetEntry().GetEntry() + if gotEntry.Attributes == nil || gotEntry.Attributes.FileSize != 5 { + t.Fatalf("destination size = %v, want 5", gotEntry.GetAttributes().GetFileSize()) + } + if string(gotEntry.Content) != "hello" { + t.Fatalf("destination content = %q, want %q", string(gotEntry.Content), "hello") + } + if dstHandle.dirtyMetadata { + t.Fatal("server-side whole-file copy should leave destination handle clean") + } +} + +func TestCopyFileRangeDoesNotFallbackAfterCommittedServerCopyRefreshFailure(t *testing.T) { + wfs := newCopyRangeTestWFSWithMetaCache(t) + + srcPath := util.FullPath("/src.txt") + dstPath := util.FullPath("/dst.txt") + srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true) + dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true) + + srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{ + Name: "src.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + FileSize: 5, + Mime: "text/plain; charset=utf-8", + Md5: []byte("abcde"), + Inode: srcInode, + }, + Content: []byte("hello"), + }) + dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{ + Name: "dst.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100600, + Inode: dstInode, + }, + }) + + srcHandle.RememberPath(srcPath) + dstHandle.RememberPath(dstPath) + + originalCopy := performServerSideWholeFileCopy + defer func() { + performServerSideWholeFileCopy = originalCopy + }() + + performServerSideWholeFileCopy = func(cancel <-chan struct{}, gotWFS *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) { + if gotWFS != wfs || copyRequest.srcPath != srcPath || copyRequest.dstPath != dstPath { + t.Fatalf("unexpected server-side copy call: wfs=%p src=%q dst=%q", gotWFS, copyRequest.srcPath, copyRequest.dstPath) + } + return nil, serverSideWholeFileCopyCommitted, errors.New("reload copied entry: transient filer read failure") + } + + written, status := wfs.CopyFileRange(make(chan struct{}), &fuse.CopyFileRangeIn{ + FhIn: uint64(srcHandle.fh), + FhOut: uint64(dstHandle.fh), + OffIn: 0, + OffOut: 0, + Len: 8, + }) + if status != fuse.OK { + t.Fatalf("CopyFileRange status = %v, want OK", status) + } + if written != 5 { + t.Fatalf("CopyFileRange wrote %d bytes, want 5", written) + } + if dstHandle.dirtyMetadata { + t.Fatal("committed server-side copy should not fall back to dirty-page copy") + } + + gotEntry := dstHandle.GetEntry().GetEntry() + if gotEntry.GetAttributes().GetFileSize() != 5 { + t.Fatalf("destination size = %d, want 5", gotEntry.GetAttributes().GetFileSize()) + } + if gotEntry.GetAttributes().GetFileMode() != 0100600 { + t.Fatalf("destination mode = %#o, want %#o", gotEntry.GetAttributes().GetFileMode(), uint32(0100600)) + } + if string(gotEntry.GetContent()) != "hello" { + t.Fatalf("destination content = %q, want %q", string(gotEntry.GetContent()), "hello") + } + + cachedEntry, err := wfs.metaCache.FindEntry(context.Background(), dstPath) + if err != nil { + t.Fatalf("metaCache find entry: %v", err) + } + if cachedEntry.FileSize != 5 { + t.Fatalf("metaCache destination size = %d, want 5", cachedEntry.FileSize) + } + if cachedEntry.Mime != "text/plain; charset=utf-8" { + t.Fatalf("metaCache destination mime = %q, want %q", cachedEntry.Mime, "text/plain; charset=utf-8") + } +} + +func TestCopyFileRangeReturnsEIOForAmbiguousServerSideCopy(t *testing.T) { + wfs := newCopyRangeTestWFS() + + srcPath := util.FullPath("/src.txt") + dstPath := util.FullPath("/dst.txt") + srcInode := wfs.inodeToPath.Lookup(srcPath, 1, false, false, 0, true) + dstInode := wfs.inodeToPath.Lookup(dstPath, 1, false, false, 0, true) + + srcHandle := wfs.fhMap.AcquireFileHandle(wfs, srcInode, &filer_pb.Entry{ + Name: "src.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100644, + FileSize: 5, + Inode: srcInode, + }, + Content: []byte("hello"), + }) + dstHandle := wfs.fhMap.AcquireFileHandle(wfs, dstInode, &filer_pb.Entry{ + Name: "dst.txt", + Attributes: &filer_pb.FuseAttributes{ + FileMode: 0100600, + Inode: dstInode, + }, + }) + + srcHandle.RememberPath(srcPath) + dstHandle.RememberPath(dstPath) + + originalCopy := performServerSideWholeFileCopy + defer func() { + performServerSideWholeFileCopy = originalCopy + }() + + performServerSideWholeFileCopy = func(cancel <-chan struct{}, gotWFS *WFS, copyRequest wholeFileServerCopyRequest) (*filer_pb.Entry, serverSideWholeFileCopyOutcome, error) { + if gotWFS != wfs || copyRequest.srcPath != srcPath || copyRequest.dstPath != dstPath { + t.Fatalf("unexpected server-side copy call: wfs=%p src=%q dst=%q", gotWFS, copyRequest.srcPath, copyRequest.dstPath) + } + return nil, serverSideWholeFileCopyAmbiguous, errors.New("transport timeout after request dispatch") + } + + written, status := wfs.CopyFileRange(make(chan struct{}), &fuse.CopyFileRangeIn{ + FhIn: uint64(srcHandle.fh), + FhOut: uint64(dstHandle.fh), + OffIn: 0, + OffOut: 0, + Len: 8, + }) + if status != fuse.EIO { + t.Fatalf("CopyFileRange status = %v, want EIO", status) + } + if written != 0 { + t.Fatalf("CopyFileRange wrote %d bytes, want 0", written) + } + if dstHandle.dirtyMetadata { + t.Fatal("ambiguous server-side copy should not fall back to dirty-page copy") + } + if dstHandle.GetEntry().GetEntry().GetAttributes().GetFileSize() != 0 { + t.Fatalf("destination size = %d, want 0", dstHandle.GetEntry().GetEntry().GetAttributes().GetFileSize()) + } +} + +func newCopyRangeTestWFS() *WFS { + wfs := &WFS{ + option: &Option{ + ChunkSizeLimit: 1024, + ConcurrentReaders: 1, + VolumeServerAccess: "filerProxy", + FilerAddresses: []pb.ServerAddress{"127.0.0.1:8888"}, + }, + inodeToPath: NewInodeToPath(util.FullPath("/"), 0), + fhMap: NewFileHandleToInode(), + fhLockTable: util.NewLockTable[FileHandleId](), + } + wfs.copyBufferPool.New = func() any { + return make([]byte, 1024) + } + return wfs +} + +func newCopyRangeTestWFSWithMetaCache(t *testing.T) *WFS { + t.Helper() + + wfs := newCopyRangeTestWFS() + root := util.FullPath("/") + wfs.inodeToPath.MarkChildrenCached(root) + uidGidMapper, err := meta_cache.NewUidGidMapper("", "") + if err != nil { + t.Fatalf("create uid/gid mapper: %v", err) + } + wfs.metaCache = meta_cache.NewMetaCache( + filepath.Join(t.TempDir(), "meta"), + uidGidMapper, + root, + func(path util.FullPath) { + wfs.inodeToPath.MarkChildrenCached(path) + }, + func(path util.FullPath) bool { + return wfs.inodeToPath.IsChildrenCached(path) + }, + func(util.FullPath, *filer_pb.Entry) {}, + nil, + ) + t.Cleanup(func() { + wfs.metaCache.Shutdown() + }) + + return wfs +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 757374ea1..e637b2c0d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -115,6 +115,9 @@ type FilerServer struct { // deduplicates concurrent remote object caching operations remoteCacheGroup singleflight.Group + recentCopyRequestsMu sync.Mutex + recentCopyRequests map[string]recentCopyRequest + // credential manager for IAM operations CredentialManager *credential.CredentialManager } @@ -153,6 +156,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), knownListeners: make(map[int32]int32), inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), + recentCopyRequests: make(map[string]recentCopyRequest), CredentialManager: option.CredentialManager, } fs.listenersCond = sync.NewCond(&fs.listenersLock) diff --git a/weed/server/filer_server_handlers_copy.go b/weed/server/filer_server_handlers_copy.go index 6320d62fb..50de72c48 100644 --- a/weed/server/filer_server_handlers_copy.go +++ b/weed/server/filer_server_handlers_copy.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "net/http" + "net/url" + "strconv" "strings" "time" @@ -19,9 +21,28 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +const recentCopyRequestTTL = 2 * time.Minute + +type copyRequestPreconditions struct { + requestID string + srcInode *uint64 + srcMtime *int64 + srcSize *int64 + dstInode *uint64 + dstMtime *int64 + dstSize *int64 +} + +type recentCopyRequest struct { + fingerprint string + expiresAt time.Time +} + func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) { - src := r.URL.Query().Get("cp.from") + src := r.URL.Query().Get(filer.CopyQueryParamFrom) dst := r.URL.Path + overwrite := r.URL.Query().Get(filer.CopyQueryParamOverwrite) == "true" + dataOnly := r.URL.Query().Get(filer.CopyQueryParamDataOnly) == "true" glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst) @@ -84,15 +105,54 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http. finalDstPath = util.FullPath(newDir).Child(newName) } + if srcPath == finalDstPath { + glog.V(2).InfofCtx(ctx, "FilerServer.copy rejected self-copy for %s", finalDstPath) + err = fmt.Errorf("source and destination are the same path: %s; choose a different destination file name or directory", finalDstPath) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + preconditions, err := parseCopyRequestPreconditions(r.URL.Query()) + if err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + copyFingerprint := preconditions.fingerprint(srcPath, finalDstPath) + if handled, recentErr := fs.handleRecentCopyRequest(preconditions.requestID, copyFingerprint); handled || recentErr != nil { + if recentErr != nil { + writeJsonError(w, r, http.StatusConflict, recentErr) + return + } + setCopyResponseHeaders(w, preconditions.requestID) + w.WriteHeader(http.StatusNoContent) + return + } + if err = validateCopySourcePreconditions(preconditions, srcEntry); err != nil { + writeJsonError(w, r, http.StatusPreconditionFailed, err) + return + } + // Check if destination file already exists - // TODO: add an overwrite parameter to allow overwriting + var existingDstEntry *filer.Entry if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound { err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err) writeJsonError(w, r, http.StatusInternalServerError, err) return } else if dstEntry != nil { - err = fmt.Errorf("destination file %s already exists", finalDstPath) - writeJsonError(w, r, http.StatusConflict, err) + existingDstEntry = dstEntry + if dstEntry.IsDirectory() { + err = fmt.Errorf("destination file %s is a directory", finalDstPath) + writeJsonError(w, r, http.StatusConflict, err) + return + } + if !overwrite { + err = fmt.Errorf("destination file %s already exists", finalDstPath) + writeJsonError(w, r, http.StatusConflict, err) + return + } + } + if err = validateCopyDestinationPreconditions(preconditions, existingDstEntry); err != nil { + writeJsonError(w, r, http.StatusPreconditionFailed, err) return } @@ -103,8 +163,13 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http. writeJsonError(w, r, http.StatusInternalServerError, err) return } + if dataOnly && existingDstEntry != nil { + preserveDestinationMetadataForDataCopy(existingDstEntry, newEntry) + } - if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil { + // Pass o_excl = !overwrite so the default copy refuses to replace an + // existing destination, while overwrite=true updates the pre-created target. + if createErr := fs.filer.CreateEntry(ctx, newEntry, !overwrite, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil { err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr) writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -112,11 +177,15 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http. glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath) + fs.rememberRecentCopyRequest(preconditions.requestID, copyFingerprint) + setCopyResponseHeaders(w, preconditions.requestID) w.WriteHeader(http.StatusNoContent) } // copyEntry creates a new entry with copied content and chunks func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) { + now := time.Now() + // Create the base entry structure // Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter // This creates an independent copy rather than another hard link to the same content @@ -126,6 +195,8 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst Attr: func(a filer.Attr) filer.Attr { a.GroupNames = append([]string(nil), a.GroupNames...) a.Md5 = append([]byte(nil), a.Md5...) + a.Crtime = now + a.Mtime = now return a }(srcEntry.Attr), Quota: srcEntry.Quota, @@ -201,6 +272,208 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst return newEntry, nil } +func preserveDestinationMetadataForDataCopy(dstEntry, copiedEntry *filer.Entry) { + if dstEntry == nil || copiedEntry == nil { + return + } + + copiedEntry.Mode = dstEntry.Mode + copiedEntry.Uid = dstEntry.Uid + copiedEntry.Gid = dstEntry.Gid + copiedEntry.TtlSec = dstEntry.TtlSec + copiedEntry.UserName = dstEntry.UserName + copiedEntry.GroupNames = append([]string(nil), dstEntry.GroupNames...) + copiedEntry.SymlinkTarget = dstEntry.SymlinkTarget + copiedEntry.Rdev = dstEntry.Rdev + copiedEntry.Crtime = dstEntry.Crtime + copiedEntry.Inode = dstEntry.Inode + copiedEntry.Mtime = time.Now() + copiedEntry.Extended = cloneEntryExtended(dstEntry.Extended) + copiedEntry.Remote = cloneRemoteEntry(dstEntry.Remote) + copiedEntry.Quota = dstEntry.Quota + copiedEntry.WORMEnforcedAtTsNs = dstEntry.WORMEnforcedAtTsNs + copiedEntry.HardLinkId = append(filer.HardLinkId(nil), dstEntry.HardLinkId...) + copiedEntry.HardLinkCounter = dstEntry.HardLinkCounter +} + +func cloneEntryExtended(extended map[string][]byte) map[string][]byte { + if extended == nil { + return nil + } + cloned := make(map[string][]byte, len(extended)) + for k, v := range extended { + cloned[k] = append([]byte(nil), v...) + } + return cloned +} + +func cloneRemoteEntry(remote *filer_pb.RemoteEntry) *filer_pb.RemoteEntry { + if remote == nil { + return nil + } + return proto.Clone(remote).(*filer_pb.RemoteEntry) +} + +func parseCopyRequestPreconditions(values url.Values) (copyRequestPreconditions, error) { + var preconditions copyRequestPreconditions + var err error + + preconditions.requestID = values.Get(filer.CopyQueryParamRequestID) + if preconditions.srcInode, err = parseOptionalUint64(values, filer.CopyQueryParamSourceInode); err != nil { + return copyRequestPreconditions{}, err + } + if preconditions.srcMtime, err = parseOptionalInt64(values, filer.CopyQueryParamSourceMtime); err != nil { + return copyRequestPreconditions{}, err + } + if preconditions.srcSize, err = parseOptionalInt64(values, filer.CopyQueryParamSourceSize); err != nil { + return copyRequestPreconditions{}, err + } + if preconditions.dstInode, err = parseOptionalUint64(values, filer.CopyQueryParamDestinationInode); err != nil { + return copyRequestPreconditions{}, err + } + if preconditions.dstMtime, err = parseOptionalInt64(values, filer.CopyQueryParamDestinationMtime); err != nil { + return copyRequestPreconditions{}, err + } + if preconditions.dstSize, err = parseOptionalInt64(values, filer.CopyQueryParamDestinationSize); err != nil { + return copyRequestPreconditions{}, err + } + + return preconditions, nil +} + +func parseOptionalUint64(values url.Values, key string) (*uint64, error) { + raw := values.Get(key) + if raw == "" { + return nil, nil + } + value, err := strconv.ParseUint(raw, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid %s: %w", key, err) + } + return &value, nil +} + +func parseOptionalInt64(values url.Values, key string) (*int64, error) { + raw := values.Get(key) + if raw == "" { + return nil, nil + } + value, err := strconv.ParseInt(raw, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid %s: %w", key, err) + } + return &value, nil +} + +func validateCopySourcePreconditions(preconditions copyRequestPreconditions, srcEntry *filer.Entry) error { + if srcEntry == nil { + return fmt.Errorf("source entry disappeared") + } + if preconditions.srcInode != nil && srcEntry.Inode != *preconditions.srcInode { + return fmt.Errorf("source inode changed from %d to %d", *preconditions.srcInode, srcEntry.Inode) + } + if preconditions.srcMtime != nil && srcEntry.Mtime.Unix() != *preconditions.srcMtime { + return fmt.Errorf("source mtime changed from %d to %d", *preconditions.srcMtime, srcEntry.Mtime.Unix()) + } + if preconditions.srcSize != nil && int64(srcEntry.Size()) != *preconditions.srcSize { + return fmt.Errorf("source size changed from %d to %d", *preconditions.srcSize, srcEntry.Size()) + } + return nil +} + +func validateCopyDestinationPreconditions(preconditions copyRequestPreconditions, dstEntry *filer.Entry) error { + if preconditions.dstInode == nil && preconditions.dstMtime == nil && preconditions.dstSize == nil { + return nil + } + if dstEntry == nil { + return fmt.Errorf("destination entry disappeared") + } + if preconditions.dstInode != nil && dstEntry.Inode != *preconditions.dstInode { + return fmt.Errorf("destination inode changed from %d to %d", *preconditions.dstInode, dstEntry.Inode) + } + if preconditions.dstMtime != nil && dstEntry.Mtime.Unix() != *preconditions.dstMtime { + return fmt.Errorf("destination mtime changed from %d to %d", *preconditions.dstMtime, dstEntry.Mtime.Unix()) + } + if preconditions.dstSize != nil && int64(dstEntry.Size()) != *preconditions.dstSize { + return fmt.Errorf("destination size changed from %d to %d", *preconditions.dstSize, dstEntry.Size()) + } + return nil +} + +func (preconditions copyRequestPreconditions) fingerprint(srcPath, dstPath util.FullPath) string { + return fmt.Sprintf( + "%s|%s|%s|%s|%s|%s|%s|%s|%s", + srcPath, + dstPath, + formatOptionalUint64(preconditions.srcInode), + formatOptionalInt64(preconditions.srcMtime), + formatOptionalInt64(preconditions.srcSize), + formatOptionalUint64(preconditions.dstInode), + formatOptionalInt64(preconditions.dstMtime), + formatOptionalInt64(preconditions.dstSize), + preconditions.requestID, + ) +} + +func formatOptionalUint64(value *uint64) string { + if value == nil { + return "" + } + return strconv.FormatUint(*value, 10) +} + +func formatOptionalInt64(value *int64) string { + if value == nil { + return "" + } + return strconv.FormatInt(*value, 10) +} + +func (fs *FilerServer) handleRecentCopyRequest(requestID, fingerprint string) (bool, error) { + if requestID == "" { + return false, nil + } + + fs.recentCopyRequestsMu.Lock() + defer fs.recentCopyRequestsMu.Unlock() + + now := time.Now() + for id, request := range fs.recentCopyRequests { + if now.After(request.expiresAt) { + delete(fs.recentCopyRequests, id) + } + } + + request, found := fs.recentCopyRequests[requestID] + if !found { + return false, nil + } + if request.fingerprint != fingerprint { + return false, fmt.Errorf("copy request id %q was already used for a different copy", requestID) + } + return true, nil +} + +func (fs *FilerServer) rememberRecentCopyRequest(requestID, fingerprint string) { + if requestID == "" { + return + } + + fs.recentCopyRequestsMu.Lock() + defer fs.recentCopyRequestsMu.Unlock() + fs.recentCopyRequests[requestID] = recentCopyRequest{ + fingerprint: fingerprint, + expiresAt: time.Now().Add(recentCopyRequestTTL), + } +} + +func setCopyResponseHeaders(w http.ResponseWriter, requestID string) { + w.Header().Set(filer.CopyResponseHeaderCommitted, "true") + if requestID != "" { + w.Header().Set(filer.CopyResponseHeaderRequestID, requestID) + } +} + // copyChunks creates new chunks by copying data from source chunks using parallel streaming approach func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) { if len(srcChunks) == 0 { diff --git a/weed/server/filer_server_handlers_copy_test.go b/weed/server/filer_server_handlers_copy_test.go new file mode 100644 index 000000000..9c0153be7 --- /dev/null +++ b/weed/server/filer_server_handlers_copy_test.go @@ -0,0 +1,223 @@ +package weed_server + +import ( + "context" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func TestCopyEntryRefreshesDestinationTimestamps(t *testing.T) { + fs := &FilerServer{} + + oldTime := time.Unix(123, 0) + srcEntry := &filer.Entry{ + FullPath: util.FullPath("/src.txt"), + Attr: filer.Attr{ + Mtime: oldTime, + Crtime: oldTime, + }, + Content: []byte("hello"), + } + + before := time.Now().Add(-time.Second) + copied, err := fs.copyEntry(context.Background(), srcEntry, util.FullPath("/dst.txt"), nil) + after := time.Now().Add(time.Second) + if err != nil { + t.Fatalf("copyEntry: %v", err) + } + + if copied.Crtime.Before(before) || copied.Crtime.After(after) { + t.Fatalf("copied Crtime = %v, want between %v and %v", copied.Crtime, before, after) + } + if copied.Mtime.Before(before) || copied.Mtime.After(after) { + t.Fatalf("copied Mtime = %v, want between %v and %v", copied.Mtime, before, after) + } + if copied.Crtime.Equal(oldTime) || copied.Mtime.Equal(oldTime) { + t.Fatalf("destination timestamps should differ from source timestamps: src=%v copied=(%v,%v)", oldTime, copied.Crtime, copied.Mtime) + } +} + +func TestPreserveDestinationMetadataForDataCopy(t *testing.T) { + dstTime := time.Unix(100, 0) + srcTime := time.Unix(200, 0) + + dstEntry := &filer.Entry{ + FullPath: util.FullPath("/dst.txt"), + Attr: filer.Attr{ + Mtime: dstTime, + Crtime: dstTime, + Mode: 0100600, + Uid: 101, + Gid: 202, + TtlSec: 17, + UserName: "dst-user", + GroupNames: []string{"dst-group"}, + SymlinkTarget: "", + Rdev: 9, + Inode: 1234, + }, + Extended: map[string][]byte{ + "user.color": []byte("blue"), + }, + Remote: &filer_pb.RemoteEntry{ + StorageName: "remote-store", + RemoteETag: "remote-etag", + RemoteSize: 7, + }, + Quota: 77, + WORMEnforcedAtTsNs: 88, + HardLinkId: filer.HardLinkId([]byte("hard-link")), + HardLinkCounter: 3, + } + copiedEntry := &filer.Entry{ + FullPath: util.FullPath("/dst.txt"), + Attr: filer.Attr{ + Mtime: srcTime, + Crtime: srcTime, + Mode: 0100644, + Uid: 11, + Gid: 22, + Mime: "text/plain", + Md5: []byte("source-md5"), + FileSize: 5, + UserName: "src-user", + GroupNames: []string{"src-group"}, + }, + Content: []byte("hello"), + Extended: map[string][]byte{ + "user.color": []byte("red"), + }, + Quota: 5, + } + + before := time.Now().Add(-time.Second) + preserveDestinationMetadataForDataCopy(dstEntry, copiedEntry) + after := time.Now().Add(time.Second) + + if copiedEntry.Mode != dstEntry.Mode || copiedEntry.Uid != dstEntry.Uid || copiedEntry.Gid != dstEntry.Gid { + t.Fatalf("destination ownership/mode not preserved: got mode=%#o uid=%d gid=%d", copiedEntry.Mode, copiedEntry.Uid, copiedEntry.Gid) + } + if copiedEntry.Crtime != dstEntry.Crtime || copiedEntry.Inode != dstEntry.Inode { + t.Fatalf("destination identity not preserved: got crtime=%v inode=%d", copiedEntry.Crtime, copiedEntry.Inode) + } + if copiedEntry.Mtime.Before(before) || copiedEntry.Mtime.After(after) { + t.Fatalf("destination mtime = %v, want between %v and %v", copiedEntry.Mtime, before, after) + } + if copiedEntry.FileSize != 5 || copiedEntry.Mime != "text/plain" || string(copiedEntry.Md5) != "source-md5" { + t.Fatalf("copied data attributes changed unexpectedly: size=%d mime=%q md5=%q", copiedEntry.FileSize, copiedEntry.Mime, string(copiedEntry.Md5)) + } + if string(copiedEntry.Extended["user.color"]) != "blue" { + t.Fatalf("destination xattrs not preserved: got %q", string(copiedEntry.Extended["user.color"])) + } + if copiedEntry.Remote == nil || copiedEntry.Remote.StorageName != "remote-store" { + t.Fatalf("destination remote metadata not preserved: %+v", copiedEntry.Remote) + } + if copiedEntry.Quota != 77 || copiedEntry.WORMEnforcedAtTsNs != 88 { + t.Fatalf("destination quota/WORM not preserved: quota=%d worm=%d", copiedEntry.Quota, copiedEntry.WORMEnforcedAtTsNs) + } + if string(copiedEntry.HardLinkId) != "hard-link" || copiedEntry.HardLinkCounter != 3 { + t.Fatalf("destination hard-link metadata not preserved: id=%q count=%d", string(copiedEntry.HardLinkId), copiedEntry.HardLinkCounter) + } + + dstEntry.GroupNames[0] = "mutated" + dstEntry.Extended["user.color"][0] = 'g' + dstEntry.Remote.StorageName = "mutated-remote" + if copiedEntry.GroupNames[0] != "dst-group" { + t.Fatalf("group names should be cloned, got %q", copiedEntry.GroupNames[0]) + } + if string(copiedEntry.Extended["user.color"]) != "blue" { + t.Fatalf("extended metadata should be cloned, got %q", string(copiedEntry.Extended["user.color"])) + } + if copiedEntry.Remote.StorageName != "remote-store" { + t.Fatalf("remote metadata should be cloned, got %q", copiedEntry.Remote.StorageName) + } +} + +func TestValidateCopySourcePreconditions(t *testing.T) { + srcInode := uint64(101) + srcMtime := int64(200) + srcSize := int64(5) + preconditions := copyRequestPreconditions{ + srcInode: &srcInode, + srcMtime: &srcMtime, + srcSize: &srcSize, + } + + srcEntry := &filer.Entry{ + FullPath: util.FullPath("/src.txt"), + Attr: filer.Attr{ + Inode: srcInode, + Mtime: time.Unix(srcMtime, 0), + FileSize: uint64(srcSize), + }, + Content: []byte("hello"), + } + + if err := validateCopySourcePreconditions(preconditions, srcEntry); err != nil { + t.Fatalf("validate source preconditions: %v", err) + } + + changedSize := int64(6) + preconditions.srcSize = &changedSize + if err := validateCopySourcePreconditions(preconditions, srcEntry); err == nil { + t.Fatal("expected source size mismatch to fail") + } +} + +func TestValidateCopyDestinationPreconditions(t *testing.T) { + dstInode := uint64(202) + dstMtime := int64(300) + dstSize := int64(0) + preconditions := copyRequestPreconditions{ + dstInode: &dstInode, + dstMtime: &dstMtime, + dstSize: &dstSize, + } + + dstEntry := &filer.Entry{ + FullPath: util.FullPath("/dst.txt"), + Attr: filer.Attr{ + Inode: dstInode, + Mtime: time.Unix(dstMtime, 0), + FileSize: uint64(dstSize), + }, + } + + if err := validateCopyDestinationPreconditions(preconditions, dstEntry); err != nil { + t.Fatalf("validate destination preconditions: %v", err) + } + + if err := validateCopyDestinationPreconditions(preconditions, nil); err == nil { + t.Fatal("expected missing destination to fail") + } +} + +func TestRecentCopyRequestDeduplicatesByRequestID(t *testing.T) { + fs := &FilerServer{ + recentCopyRequests: make(map[string]recentCopyRequest), + } + + requestID := "copy-req" + fingerprint := "src|dst|1" + fs.rememberRecentCopyRequest(requestID, fingerprint) + + handled, err := fs.handleRecentCopyRequest(requestID, fingerprint) + if err != nil { + t.Fatalf("handle recent copy request: %v", err) + } + if !handled { + t.Fatal("expected recent copy request to be recognized") + } + + handled, err = fs.handleRecentCopyRequest(requestID, "different") + if err == nil { + t.Fatal("expected reused request id with different fingerprint to fail") + } + if handled { + t.Fatal("reused request id with different fingerprint should not be treated as handled") + } +}