From af3300e063cc922f554b5fed873458bfd5b74fce Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 11 Aug 2025 16:40:46 -0700
Subject: [PATCH] filer: server side copying (#7121)

* copy

* address comments

* remove unused functions, reuse http clients

* address hardlink, checking existing directory

* destination is directory

* check for the key's existence in the map first before accessing its members

* address comments

* deep copy remote entry

* address comments

* copying chunks in parallel

* handle manifest chunks

* address comments

* errgroup

* there could be large chunks

* address comments

* address comments
---
 weed/server/filer_server_handlers_copy.go  | 550 +++++++++++++++++++++
 weed/server/filer_server_handlers_write.go |   2 +
 2 files changed, 552 insertions(+)
 create mode 100644 weed/server/filer_server_handlers_copy.go

diff --git a/weed/server/filer_server_handlers_copy.go b/weed/server/filer_server_handlers_copy.go
new file mode 100644
index 000000000..063d30439
--- /dev/null
+++ b/weed/server/filer_server_handlers_copy.go
@@ -0,0 +1,550 @@
+package weed_server
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+	src := r.URL.Query().Get("cp.from")
+	dst := r.URL.Path
+
+	glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)
+
+	var err error
+	if src, err = clearName(src); err != nil {
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+	if dst, err = clearName(dst); err != nil {
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+	src = strings.TrimRight(src, "/")
+	if src == "" {
+		err = fmt.Errorf("invalid source '/'")
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	srcPath := util.FullPath(src)
+	dstPath := util.FullPath(dst)
+	if dstPath.IsLongerFileName(so.MaxFileNameLength) {
+		err = fmt.Errorf("dst name too long")
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
+	if err != nil {
+		err = fmt.Errorf("failed to get src entry '%s': %w", src, err)
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	glog.V(1).InfofCtx(ctx, "FilerServer.copy source entry: content_len=%d, chunks_len=%d", len(srcEntry.Content), len(srcEntry.GetChunks()))
+
+	// Check if source is a directory - currently not supported for recursive copying
+	if srcEntry.IsDirectory() {
+		err = fmt.Errorf("copy: directory copying not yet supported for '%s'", src)
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	_, oldName := srcPath.DirAndName()
+	finalDstPath := dstPath
+
+	// Check if destination is a directory
+	dstPathEntry, findErr := fs.filer.FindEntry(ctx, dstPath)
+	if findErr != nil && findErr != filer_pb.ErrNotFound {
+		err = fmt.Errorf("failed to check destination path %s: %w", dstPath, findErr)
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+		return
+	}
+
+	if findErr == nil && dstPathEntry.IsDirectory() {
+		finalDstPath = dstPath.Child(oldName)
+	} else {
+		newDir, newName := dstPath.DirAndName()
+		newName = util.Nvl(newName, oldName)
+		finalDstPath = util.FullPath(newDir).Child(newName)
+	}
+
+	// Check if destination file already exists
+	// TODO: add an overwrite parameter to allow overwriting
+	if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
+		err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+		return
+	} else if dstEntry != nil {
+		err = fmt.Errorf("destination file %s already exists", finalDstPath)
+		writeJsonError(w, r, http.StatusConflict, err)
+		return
+	}
+
+	// Copy the file content and chunks
+	newEntry, err := fs.copyEntry(ctx, srcEntry, finalDstPath, so)
+	if err != nil {
+		err = fmt.Errorf("failed to copy entry from '%s' to '%s': %w", src, dst, err)
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+		return
+	}
+
+	if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
+		err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+		return
+	}
+
+	glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)
+
+	w.WriteHeader(http.StatusNoContent)
+}
+
+// copyEntry creates a new entry with copied content and chunks
+func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
+	// Create the base entry structure
+	// Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter
+	// This creates an independent copy rather than another hard link to the same content
+	newEntry := &filer.Entry{
+		FullPath: dstPath,
+		// Deep copy Attr field to ensure slice independence (GroupNames, Md5)
+		Attr: func(a filer.Attr) filer.Attr {
+			a.GroupNames = append([]string(nil), a.GroupNames...)
+			a.Md5 = append([]byte(nil), a.Md5...)
+			return a
+		}(srcEntry.Attr),
+		Quota: srcEntry.Quota,
+		// Intentionally NOT copying HardLinkId and HardLinkCounter to create independent copy
+	}
+
+	// Deep copy Extended fields to ensure independence
+	if srcEntry.Extended != nil {
+		newEntry.Extended = make(map[string][]byte, len(srcEntry.Extended))
+		for k, v := range srcEntry.Extended {
+			newEntry.Extended[k] = append([]byte(nil), v...)
+		}
+	}
+
+	// Deep copy Remote field to ensure independence
+	if srcEntry.Remote != nil {
+		newEntry.Remote = &filer_pb.RemoteEntry{
+			StorageName:       srcEntry.Remote.StorageName,
+			LastLocalSyncTsNs: srcEntry.Remote.LastLocalSyncTsNs,
+			RemoteETag:        srcEntry.Remote.RemoteETag,
+			RemoteMtime:       srcEntry.Remote.RemoteMtime,
+			RemoteSize:        srcEntry.Remote.RemoteSize,
+		}
+	}
+
+	// Log if we're copying a hard link so we can track this behavior
+	if len(srcEntry.HardLinkId) > 0 {
+		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copying hard link %s (nlink=%d) as independent file", srcEntry.FullPath, srcEntry.HardLinkCounter)
+	}
+
+	// Handle small files stored in Content field
+	if len(srcEntry.Content) > 0 {
+		// For small files, just copy the content directly
+		newEntry.Content = make([]byte, len(srcEntry.Content))
+		copy(newEntry.Content, srcEntry.Content)
+		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied content directly, size=%d", len(newEntry.Content))
+		return newEntry, nil
+	}
+
+	// Handle files stored as chunks (including resolved hard link content)
+	if len(srcEntry.GetChunks()) > 0 {
+		srcChunks := srcEntry.GetChunks()
+
+		// Check if any chunks are manifest chunks - these require special handling
+		if filer.HasChunkManifest(srcChunks) {
+			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
+			newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so)
+			if err != nil {
+				return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
+			}
+			newEntry.Chunks = newChunks
+			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
+		} else {
+			// Regular chunks without manifest - copy directly
+			newChunks, err := fs.copyChunks(ctx, srcChunks, so)
+			if err != nil {
+				return nil, fmt.Errorf("failed to copy chunks: %w", err)
+			}
+			newEntry.Chunks = newChunks
+			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied regular chunks, count=%d", len(newChunks))
+		}
+		return newEntry, nil
+	}
+
+	// Empty file case (or hard link with no content - should not happen if hard link was properly resolved)
+	if len(srcEntry.HardLinkId) > 0 {
+		glog.WarningfCtx(ctx, "FilerServer.copyEntry: hard link %s appears to have no content - this may indicate an issue with hard link resolution", srcEntry.FullPath)
+	}
+	glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: empty file, no content or chunks to copy")
+	return newEntry, nil
+}
+
+// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
+func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
+	if len(srcChunks) == 0 {
+		return nil, nil
+	}
+
+	// Create HTTP client once for reuse across all chunk copies
+	client := &http.Client{Timeout: 60 * time.Second}
+
+	// Optimize: Batch volume lookup for all chunks to reduce RPC calls
+	volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup volume locations: %w", err)
+	}
+
+	// Parallel chunk copying with concurrency control using errgroup
+	const maxConcurrentChunks = 8 // Match SeaweedFS standard for parallel operations
+
+	// Pre-allocate result slice to maintain order
+	newChunks := make([]*filer_pb.FileChunk, len(srcChunks))
+
+	// Use errgroup for cleaner concurrency management
+	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(maxConcurrentChunks) // Limit concurrent goroutines
+
+	// Validate that all chunk locations are available before starting any concurrent work
+	for _, chunk := range srcChunks {
+		volumeId := chunk.Fid.VolumeId
+		locations, ok := volumeLocationsMap[volumeId]
+		if !ok || len(locations) == 0 {
+			return nil, fmt.Errorf("no locations found for volume %d", volumeId)
+		}
+	}
+
+	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: starting parallel copy of %d chunks with max concurrency %d", len(srcChunks), maxConcurrentChunks)
+
+	// Launch goroutines for each chunk
+	for i, srcChunk := range srcChunks {
+		// Capture loop variables for goroutine closure
+		chunkIndex := i
+		chunk := srcChunk
+		chunkLocations := volumeLocationsMap[srcChunk.Fid.VolumeId]
+
+		g.Go(func() error {
+			glog.V(3).InfofCtx(gCtx, "FilerServer.copyChunks: copying chunk %d/%d, size=%d", chunkIndex+1, len(srcChunks), chunk.Size)
+
+			// Use streaming copy to avoid loading entire chunk into memory
+			newChunk, err := fs.streamCopyChunk(gCtx, chunk, so, client, chunkLocations)
+			if err != nil {
+				return fmt.Errorf("failed to copy chunk %d (%s): %w", chunkIndex+1, chunk.GetFileIdString(), err)
+			}
+
+			// Store result at correct index to maintain order
+			newChunks[chunkIndex] = newChunk
+
+			glog.V(4).InfofCtx(gCtx, "FilerServer.copyChunks: successfully copied chunk %d/%d", chunkIndex+1, len(srcChunks))
+			return nil
+		})
+	}
+
+	// Wait for all chunks to complete and return first error (if any)
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+
+	// Verify all chunks were copied (shouldn't happen if no errors, but safety check)
+	for i, chunk := range newChunks {
+		if chunk == nil {
+			return nil, fmt.Errorf("chunk %d was not copied (internal error)", i)
+		}
+	}
+
+	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: successfully completed parallel copy of %d chunks", len(srcChunks))
+	return newChunks, nil
+}
+
+// copyChunksWithManifest handles copying chunks that include manifest chunks
+func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
+	if len(srcChunks) == 0 {
+		return nil, nil
+	}
+
+	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing %d chunks (some are manifests)", len(srcChunks))
+
+	// Separate manifest chunks from regular data chunks
+	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(srcChunks)
+
+	var newChunks []*filer_pb.FileChunk
+
+	// First, copy all non-manifest chunks directly
+	if len(nonManifestChunks) > 0 {
+		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
+		newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so)
+		if err != nil {
+			return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
+		}
+		newChunks = append(newChunks, newNonManifestChunks...)
+	}
+
+	// Process each manifest chunk separately
+	for i, manifestChunk := range manifestChunks {
+		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing manifest chunk %d/%d", i+1, len(manifestChunks))
+
+		// Resolve the manifest chunk to get the actual data chunks it references
+		lookupFileIdFn := func(ctx context.Context, fileId string) (urls []string, err error) {
+			return fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
+		}
+
+		resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFileIdFn, manifestChunk)
+		if err != nil {
+			return nil, fmt.Errorf("failed to resolve manifest chunk %s: %w", manifestChunk.GetFileIdString(), err)
+		}
+
+		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: resolved manifest chunk %s to %d data chunks",
+			manifestChunk.GetFileIdString(), len(resolvedChunks))
+
+		// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
+		newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so)
+		if err != nil {
+			return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
+		}
+
+		// Create a new manifest chunk that references the copied data chunks
+		newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
+		}
+
+		newChunks = append(newChunks, newManifestChunk)
+
+		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: created new manifest chunk %s for %d resolved chunks",
+			newManifestChunk.GetFileIdString(), len(newResolvedChunks))
+	}
+
+	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: completed copying %d total chunks (%d manifest, %d regular)",
+		len(newChunks), len(manifestChunks), len(nonManifestChunks))
+
+	return newChunks, nil
+}
+
+// createManifestChunk creates a new manifest chunk that references the provided data chunks
+func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption) (*filer_pb.FileChunk, error) {
+	// Create the manifest data structure
+	filer_pb.BeforeEntrySerialization(dataChunks)
+
+	manifestData := &filer_pb.FileChunkManifest{
+		Chunks: dataChunks,
+	}
+
+	// Serialize the manifest
+	data, err := proto.Marshal(manifestData)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal manifest: %w", err)
+	}
+
+	// Create HTTP client once for reuse
+	client := &http.Client{Timeout: 60 * time.Second}
+
+	// Save the manifest data as a new chunk
+	saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+		// Assign a new file ID
+		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
+		if assignErr != nil {
+			return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
+		}
+
+		// Upload the manifest data
+		err = fs.uploadData(ctx, reader, urlLocation, string(auth), client)
+		if err != nil {
+			return nil, fmt.Errorf("failed to upload manifest data: %w", err)
+		}
+
+		// Create the chunk metadata
+		chunk = &filer_pb.FileChunk{
+			FileId: fileId,
+			Offset: offset,
+			Size:   uint64(len(data)),
+		}
+		return chunk, nil
+	}
+
+	manifestChunk, err := saveFunc(bytes.NewReader(data), "", originalManifest.Offset, 0)
+	if err != nil {
+		return nil, fmt.Errorf("failed to save manifest chunk: %w", err)
+	}
+
+	// Set manifest-specific properties
+	manifestChunk.IsChunkManifest = true
+	manifestChunk.Size = originalManifest.Size
+
+	return manifestChunk, nil
+}
+
+// uploadData uploads data to a volume server
+func (fs *FilerServer) uploadData(ctx context.Context, reader io.Reader, urlLocation, auth string, client *http.Client) error {
+	req, err := http.NewRequestWithContext(ctx, "PUT", urlLocation, reader)
+	if err != nil {
+		return fmt.Errorf("failed to create upload request: %w", err)
+	}
+
+	if auth != "" {
+		req.Header.Set("Authorization", "Bearer "+auth)
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return fmt.Errorf("failed to upload data: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
+		body, readErr := io.ReadAll(resp.Body)
+		if readErr != nil {
+			return fmt.Errorf("upload failed with status %d, and failed to read response: %w", resp.StatusCode, readErr)
+		}
+		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
+	}
+
+	return nil
+}
+
+// batchLookupVolumeLocations performs a single batched lookup for all unique volume IDs in the chunks
+func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []*filer_pb.FileChunk) (map[uint32][]operation.Location, error) {
+	// Collect unique volume IDs and their string representations to avoid repeated conversions
+	volumeIdMap := make(map[uint32]string)
+	for _, chunk := range chunks {
+		vid := chunk.Fid.VolumeId
+		if _, found := volumeIdMap[vid]; !found {
+			volumeIdMap[vid] = fmt.Sprintf("%d", vid)
+		}
+	}
+
+	if len(volumeIdMap) == 0 {
+		return make(map[uint32][]operation.Location), nil
+	}
+
+	// Convert to slice of strings for the lookup call
+	volumeIdStrs := make([]string, 0, len(volumeIdMap))
+	for _, vidStr := range volumeIdMap {
+		volumeIdStrs = append(volumeIdStrs, vidStr)
+	}
+
+	// Perform single batched lookup
+	lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster, fs.grpcDialOption, volumeIdStrs)
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup volumes: %w", err)
+	}
+
+	// Convert result to map of volumeId -> locations
+	volumeLocationsMap := make(map[uint32][]operation.Location)
+	for volumeId, volumeIdStr := range volumeIdMap {
+		if volumeLocations, ok := lookupResult[volumeIdStr]; ok && len(volumeLocations.Locations) > 0 {
+			volumeLocationsMap[volumeId] = volumeLocations.Locations
+		}
+	}
+
+	return volumeLocationsMap, nil
+}
+
+// streamCopyChunk copies a chunk using streaming to minimize memory usage
+func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
+	// Assign a new file ID for destination
+	fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
+	if err != nil {
+		return nil, fmt.Errorf("failed to assign new file ID: %w", err)
+	}
+
+	// Try all available locations for source chunk until one succeeds
+	fileIdString := srcChunk.GetFileIdString()
+	var lastErr error
+
+	for i, location := range locations {
+		srcUrl := fmt.Sprintf("http://%s/%s", location.Url, fileIdString)
+		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: attempting streaming copy from %s to %s (attempt %d/%d)", srcUrl, urlLocation, i+1, len(locations))
+
+		// Perform streaming copy using HTTP client
+		err := fs.performStreamCopy(ctx, srcUrl, urlLocation, string(auth), srcChunk.Size, client)
+		if err != nil {
+			lastErr = err
+			glog.V(2).InfofCtx(ctx, "FilerServer.streamCopyChunk: failed streaming copy from %s: %v", srcUrl, err)
+			continue
+		}
+
+		// Success - create chunk metadata
+		newChunk := &filer_pb.FileChunk{
+			FileId: fileId,
+			Offset: srcChunk.Offset,
+			Size:   srcChunk.Size,
+			ETag:   srcChunk.ETag,
+		}
+
+		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: successfully streamed %d bytes", srcChunk.Size)
+		return newChunk, nil
+	}
+
+	// All locations failed
+	return nil, fmt.Errorf("failed to stream copy chunk from any location: %w", lastErr)
+}
+
+// performStreamCopy performs the actual streaming copy from source URL to destination URL
+func (fs *FilerServer) performStreamCopy(ctx context.Context, srcUrl, dstUrl, auth string, expectedSize uint64, client *http.Client) error {
+	// Create HTTP request to read from source
+	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
+	if err != nil {
+		return fmt.Errorf("failed to create source request: %w", err)
+	}
+
+	// Perform source request
+	resp, err := client.Do(req)
+	if err != nil {
+		return fmt.Errorf("failed to read from source: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("source returned status %d", resp.StatusCode)
+	}
+
+	// Create HTTP request to write to destination
+	dstReq, err := http.NewRequestWithContext(ctx, "PUT", dstUrl, resp.Body)
+	if err != nil {
+		return fmt.Errorf("failed to create destination request: %w", err)
+	}
+	dstReq.ContentLength = int64(expectedSize)
+
+	// Set authorization header if provided
+	if auth != "" {
+		dstReq.Header.Set("Authorization", "Bearer "+auth)
+	}
+	dstReq.Header.Set("Content-Type", "application/octet-stream")
+
+	// Perform destination request
+	dstResp, err := client.Do(dstReq)
+	if err != nil {
+		return fmt.Errorf("failed to write to destination: %w", err)
+	}
+	defer dstResp.Body.Close()
+
+	if dstResp.StatusCode != http.StatusCreated && dstResp.StatusCode != http.StatusOK {
+		// Read error response body for more details
+		body, readErr := io.ReadAll(dstResp.Body)
+		if readErr != nil {
+			return fmt.Errorf("destination returned status %d, and failed to read body: %w", dstResp.StatusCode, readErr)
+		}
+		return fmt.Errorf("destination returned status %d: %s", dstResp.StatusCode, string(body))
+	}
+
+	glog.V(4).InfofCtx(ctx, "FilerServer.performStreamCopy: successfully streamed data from %s to %s", srcUrl, dstUrl)
+	return nil
+}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index cdbac0abb..923f2c0eb 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -116,6 +116,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
 
 	if query.Has("mv.from") {
 		fs.move(ctx, w, r, so)
+	} else if query.Has("cp.from") {
+		fs.copy(ctx, w, r, so)
 	} else {
 		fs.autoChunk(ctx, w, r, contentLength, so)
 	}