You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							547 lines
						
					
					
						
							20 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							547 lines
						
					
					
						
							20 KiB
						
					
					
				
								package weed_server
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"bytes"
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"io"
							 | 
						|
									"net/http"
							 | 
						|
									"strings"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"golang.org/x/sync/errgroup"
							 | 
						|
									"google.golang.org/protobuf/proto"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/filer"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/operation"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
							 | 
						|
									src := r.URL.Query().Get("cp.from")
							 | 
						|
									dst := r.URL.Path
							 | 
						|
								
							 | 
						|
									glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)
							 | 
						|
								
							 | 
						|
									var err error
							 | 
						|
									if src, err = clearName(src); err != nil {
							 | 
						|
										writeJsonError(w, r, http.StatusBadRequest, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
									if dst, err = clearName(dst); err != nil {
							 | 
						|
										writeJsonError(w, r, http.StatusBadRequest, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
									src = strings.TrimRight(src, "/")
							 | 
						|
									if src == "" {
							 | 
						|
										err = fmt.Errorf("invalid source '/'")
							 | 
						|
										writeJsonError(w, r, http.StatusBadRequest, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									srcPath := util.FullPath(src)
							 | 
						|
									dstPath := util.FullPath(dst)
							 | 
						|
									if dstPath.IsLongerFileName(so.MaxFileNameLength) {
							 | 
						|
										err = fmt.Errorf("dst name too long")
							 | 
						|
										writeJsonError(w, r, http.StatusBadRequest, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
							 | 
						|
									if err != nil {
							 | 
						|
										err = fmt.Errorf("failed to get src entry '%s': %w", src, err)
							 | 
						|
										writeJsonError(w, r, http.StatusBadRequest, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(1).InfofCtx(ctx, "FilerServer.copy source entry: content_len=%d, chunks_len=%d", len(srcEntry.Content), len(srcEntry.GetChunks()))
							 | 
						|
								
							 | 
						|
									// Check if source is a directory - currently not supported for recursive copying
							 | 
						|
									if srcEntry.IsDirectory() {
							 | 
						|
										err = fmt.Errorf("copy: directory copying not yet supported for '%s'", src)
							 | 
						|
										writeJsonError(w, r, http.StatusBadRequest, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									_, oldName := srcPath.DirAndName()
							 | 
						|
									finalDstPath := dstPath
							 | 
						|
								
							 | 
						|
									// Check if destination is a directory
							 | 
						|
									dstPathEntry, findErr := fs.filer.FindEntry(ctx, dstPath)
							 | 
						|
									if findErr != nil && findErr != filer_pb.ErrNotFound {
							 | 
						|
										err = fmt.Errorf("failed to check destination path %s: %w", dstPath, findErr)
							 | 
						|
										writeJsonError(w, r, http.StatusInternalServerError, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if findErr == nil && dstPathEntry.IsDirectory() {
							 | 
						|
										finalDstPath = dstPath.Child(oldName)
							 | 
						|
									} else {
							 | 
						|
										newDir, newName := dstPath.DirAndName()
							 | 
						|
										newName = util.Nvl(newName, oldName)
							 | 
						|
										finalDstPath = util.FullPath(newDir).Child(newName)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Check if destination file already exists
							 | 
						|
									// TODO: add an overwrite parameter to allow overwriting
							 | 
						|
									if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
							 | 
						|
										err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
							 | 
						|
										writeJsonError(w, r, http.StatusInternalServerError, err)
							 | 
						|
										return
							 | 
						|
									} else if dstEntry != nil {
							 | 
						|
										err = fmt.Errorf("destination file %s already exists", finalDstPath)
							 | 
						|
										writeJsonError(w, r, http.StatusConflict, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Copy the file content and chunks
							 | 
						|
									newEntry, err := fs.copyEntry(ctx, srcEntry, finalDstPath, so)
							 | 
						|
									if err != nil {
							 | 
						|
										err = fmt.Errorf("failed to copy entry from '%s' to '%s': %w", src, dst, err)
							 | 
						|
										writeJsonError(w, r, http.StatusInternalServerError, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
							 | 
						|
										err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
							 | 
						|
										writeJsonError(w, r, http.StatusInternalServerError, err)
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)
							 | 
						|
								
							 | 
						|
									w.WriteHeader(http.StatusNoContent)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// copyEntry creates a new entry with copied content and chunks
							 | 
						|
								func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
							 | 
						|
									// Create the base entry structure
							 | 
						|
									// Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter
							 | 
						|
									// This creates an independent copy rather than another hard link to the same content
							 | 
						|
									newEntry := &filer.Entry{
							 | 
						|
										FullPath: dstPath,
							 | 
						|
										// Deep copy Attr field to ensure slice independence (GroupNames, Md5)
							 | 
						|
										Attr: func(a filer.Attr) filer.Attr {
							 | 
						|
											a.GroupNames = append([]string(nil), a.GroupNames...)
							 | 
						|
											a.Md5 = append([]byte(nil), a.Md5...)
							 | 
						|
											return a
							 | 
						|
										}(srcEntry.Attr),
							 | 
						|
										Quota: srcEntry.Quota,
							 | 
						|
										// Intentionally NOT copying HardLinkId and HardLinkCounter to create independent copy
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Deep copy Extended fields to ensure independence
							 | 
						|
									if srcEntry.Extended != nil {
							 | 
						|
										newEntry.Extended = make(map[string][]byte, len(srcEntry.Extended))
							 | 
						|
										for k, v := range srcEntry.Extended {
							 | 
						|
											newEntry.Extended[k] = append([]byte(nil), v...)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Deep copy Remote field to ensure independence
							 | 
						|
									if srcEntry.Remote != nil {
							 | 
						|
										newEntry.Remote = &filer_pb.RemoteEntry{
							 | 
						|
											StorageName:       srcEntry.Remote.StorageName,
							 | 
						|
											LastLocalSyncTsNs: srcEntry.Remote.LastLocalSyncTsNs,
							 | 
						|
											RemoteETag:        srcEntry.Remote.RemoteETag,
							 | 
						|
											RemoteMtime:       srcEntry.Remote.RemoteMtime,
							 | 
						|
											RemoteSize:        srcEntry.Remote.RemoteSize,
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Log if we're copying a hard link so we can track this behavior
							 | 
						|
									if len(srcEntry.HardLinkId) > 0 {
							 | 
						|
										glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copying hard link %s (nlink=%d) as independent file", srcEntry.FullPath, srcEntry.HardLinkCounter)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Handle small files stored in Content field
							 | 
						|
									if len(srcEntry.Content) > 0 {
							 | 
						|
										// For small files, just copy the content directly
							 | 
						|
										newEntry.Content = make([]byte, len(srcEntry.Content))
							 | 
						|
										copy(newEntry.Content, srcEntry.Content)
							 | 
						|
										glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied content directly, size=%d", len(newEntry.Content))
							 | 
						|
										return newEntry, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Handle files stored as chunks (including resolved hard link content)
							 | 
						|
									if len(srcEntry.GetChunks()) > 0 {
							 | 
						|
										srcChunks := srcEntry.GetChunks()
							 | 
						|
								
							 | 
						|
										// Create HTTP client once for reuse across all chunk operations
							 | 
						|
										client := &http.Client{Timeout: 60 * time.Second}
							 | 
						|
								
							 | 
						|
										// Check if any chunks are manifest chunks - these require special handling
							 | 
						|
										if filer.HasChunkManifest(srcChunks) {
							 | 
						|
											glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
							 | 
						|
											newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so, client)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
							 | 
						|
											}
							 | 
						|
											newEntry.Chunks = newChunks
							 | 
						|
											glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
							 | 
						|
										} else {
							 | 
						|
											// Regular chunks without manifest - copy directly
							 | 
						|
											newChunks, err := fs.copyChunks(ctx, srcChunks, so, client)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, fmt.Errorf("failed to copy chunks: %w", err)
							 | 
						|
											}
							 | 
						|
											newEntry.Chunks = newChunks
							 | 
						|
											glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied regular chunks, count=%d", len(newChunks))
							 | 
						|
										}
							 | 
						|
										return newEntry, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Empty file case (or hard link with no content - should not happen if hard link was properly resolved)
							 | 
						|
									if len(srcEntry.HardLinkId) > 0 {
							 | 
						|
										glog.WarningfCtx(ctx, "FilerServer.copyEntry: hard link %s appears to have no content - this may indicate an issue with hard link resolution", srcEntry.FullPath)
							 | 
						|
									}
							 | 
						|
									glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: empty file, no content or chunks to copy")
							 | 
						|
									return newEntry, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
							 | 
						|
								func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
							 | 
						|
									if len(srcChunks) == 0 {
							 | 
						|
										return nil, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Optimize: Batch volume lookup for all chunks to reduce RPC calls
							 | 
						|
									volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to lookup volume locations: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Parallel chunk copying with concurrency control using errgroup
							 | 
						|
									const maxConcurrentChunks = 8 // Match SeaweedFS standard for parallel operations
							 | 
						|
								
							 | 
						|
									// Pre-allocate result slice to maintain order
							 | 
						|
									newChunks := make([]*filer_pb.FileChunk, len(srcChunks))
							 | 
						|
								
							 | 
						|
									// Use errgroup for cleaner concurrency management
							 | 
						|
									g, gCtx := errgroup.WithContext(ctx)
							 | 
						|
									g.SetLimit(maxConcurrentChunks) // Limit concurrent goroutines
							 | 
						|
								
							 | 
						|
									// Validate that all chunk locations are available before starting any concurrent work
							 | 
						|
									for _, chunk := range srcChunks {
							 | 
						|
										volumeId := chunk.Fid.VolumeId
							 | 
						|
										locations, ok := volumeLocationsMap[volumeId]
							 | 
						|
										if !ok || len(locations) == 0 {
							 | 
						|
											return nil, fmt.Errorf("no locations found for volume %d", volumeId)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: starting parallel copy of %d chunks with max concurrency %d", len(srcChunks), maxConcurrentChunks)
							 | 
						|
								
							 | 
						|
									// Launch goroutines for each chunk
							 | 
						|
									for i, srcChunk := range srcChunks {
							 | 
						|
										// Capture loop variables for goroutine closure
							 | 
						|
										chunkIndex := i
							 | 
						|
										chunk := srcChunk
							 | 
						|
										chunkLocations := volumeLocationsMap[srcChunk.Fid.VolumeId]
							 | 
						|
								
							 | 
						|
										g.Go(func() error {
							 | 
						|
											glog.V(3).InfofCtx(gCtx, "FilerServer.copyChunks: copying chunk %d/%d, size=%d", chunkIndex+1, len(srcChunks), chunk.Size)
							 | 
						|
								
							 | 
						|
											// Use streaming copy to avoid loading entire chunk into memory
							 | 
						|
											newChunk, err := fs.streamCopyChunk(gCtx, chunk, so, client, chunkLocations)
							 | 
						|
											if err != nil {
							 | 
						|
												return fmt.Errorf("failed to copy chunk %d (%s): %w", chunkIndex+1, chunk.GetFileIdString(), err)
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// Store result at correct index to maintain order
							 | 
						|
											newChunks[chunkIndex] = newChunk
							 | 
						|
								
							 | 
						|
											glog.V(4).InfofCtx(gCtx, "FilerServer.copyChunks: successfully copied chunk %d/%d", chunkIndex+1, len(srcChunks))
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Wait for all chunks to complete and return first error (if any)
							 | 
						|
									if err := g.Wait(); err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Verify all chunks were copied (shouldn't happen if no errors, but safety check)
							 | 
						|
									for i, chunk := range newChunks {
							 | 
						|
										if chunk == nil {
							 | 
						|
											return nil, fmt.Errorf("chunk %d was not copied (internal error)", i)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: successfully completed parallel copy of %d chunks", len(srcChunks))
							 | 
						|
									return newChunks, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// copyChunksWithManifest handles copying chunks that include manifest chunks
							 | 
						|
								func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
							 | 
						|
									if len(srcChunks) == 0 {
							 | 
						|
										return nil, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing %d chunks (some are manifests)", len(srcChunks))
							 | 
						|
								
							 | 
						|
									// Separate manifest chunks from regular data chunks
							 | 
						|
									manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(srcChunks)
							 | 
						|
								
							 | 
						|
									var newChunks []*filer_pb.FileChunk
							 | 
						|
								
							 | 
						|
									// First, copy all non-manifest chunks directly
							 | 
						|
									if len(nonManifestChunks) > 0 {
							 | 
						|
										glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
							 | 
						|
										newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so, client)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
							 | 
						|
										}
							 | 
						|
										newChunks = append(newChunks, newNonManifestChunks...)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Process each manifest chunk separately
							 | 
						|
									for i, manifestChunk := range manifestChunks {
							 | 
						|
										glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing manifest chunk %d/%d", i+1, len(manifestChunks))
							 | 
						|
								
							 | 
						|
										// Resolve the manifest chunk to get the actual data chunks it references
							 | 
						|
										lookupFileIdFn := func(ctx context.Context, fileId string) (urls []string, err error) {
							 | 
						|
											return fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFileIdFn, manifestChunk)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("failed to resolve manifest chunk %s: %w", manifestChunk.GetFileIdString(), err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: resolved manifest chunk %s to %d data chunks",
							 | 
						|
											manifestChunk.GetFileIdString(), len(resolvedChunks))
							 | 
						|
								
							 | 
						|
										// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
							 | 
						|
										newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so, client)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Create a new manifest chunk that references the copied data chunks
							 | 
						|
										newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so, client)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										newChunks = append(newChunks, newManifestChunk)
							 | 
						|
								
							 | 
						|
										glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: created new manifest chunk %s for %d resolved chunks",
							 | 
						|
											newManifestChunk.GetFileIdString(), len(newResolvedChunks))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: completed copying %d total chunks (%d manifest, %d regular)",
							 | 
						|
										len(newChunks), len(manifestChunks), len(nonManifestChunks))
							 | 
						|
								
							 | 
						|
									return newChunks, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createManifestChunk creates a new manifest chunk that references the provided data chunks
							 | 
						|
								func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) (*filer_pb.FileChunk, error) {
							 | 
						|
									// Create the manifest data structure
							 | 
						|
									filer_pb.BeforeEntrySerialization(dataChunks)
							 | 
						|
								
							 | 
						|
									manifestData := &filer_pb.FileChunkManifest{
							 | 
						|
										Chunks: dataChunks,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Serialize the manifest
							 | 
						|
									data, err := proto.Marshal(manifestData)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to marshal manifest: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Save the manifest data as a new chunk
							 | 
						|
									saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
							 | 
						|
										// Assign a new file ID
							 | 
						|
										fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
							 | 
						|
										if assignErr != nil {
							 | 
						|
											return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Upload the manifest data
							 | 
						|
										err = fs.uploadData(ctx, reader, urlLocation, string(auth), client)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("failed to upload manifest data: %w", err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Create the chunk metadata
							 | 
						|
										chunk = &filer_pb.FileChunk{
							 | 
						|
											FileId: fileId,
							 | 
						|
											Offset: offset,
							 | 
						|
											Size:   uint64(len(data)),
							 | 
						|
										}
							 | 
						|
										return chunk, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									manifestChunk, err := saveFunc(bytes.NewReader(data), "", originalManifest.Offset, 0)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to save manifest chunk: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Set manifest-specific properties
							 | 
						|
									manifestChunk.IsChunkManifest = true
							 | 
						|
									manifestChunk.Size = originalManifest.Size
							 | 
						|
								
							 | 
						|
									return manifestChunk, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// uploadData uploads data to a volume server
							 | 
						|
								func (fs *FilerServer) uploadData(ctx context.Context, reader io.Reader, urlLocation, auth string, client *http.Client) error {
							 | 
						|
									req, err := http.NewRequestWithContext(ctx, "PUT", urlLocation, reader)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create upload request: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if auth != "" {
							 | 
						|
										req.Header.Set("Authorization", "Bearer "+auth)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									resp, err := client.Do(req)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to upload data: %w", err)
							 | 
						|
									}
							 | 
						|
									defer resp.Body.Close()
							 | 
						|
								
							 | 
						|
									if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
							 | 
						|
										body, readErr := io.ReadAll(resp.Body)
							 | 
						|
										if readErr != nil {
							 | 
						|
											return fmt.Errorf("upload failed with status %d, and failed to read response: %w", resp.StatusCode, readErr)
							 | 
						|
										}
							 | 
						|
										return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// batchLookupVolumeLocations performs a single batched lookup for all unique volume IDs in the chunks
							 | 
						|
								func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []*filer_pb.FileChunk) (map[uint32][]operation.Location, error) {
							 | 
						|
									// Collect unique volume IDs and their string representations to avoid repeated conversions
							 | 
						|
									volumeIdMap := make(map[uint32]string)
							 | 
						|
									for _, chunk := range chunks {
							 | 
						|
										vid := chunk.Fid.VolumeId
							 | 
						|
										if _, found := volumeIdMap[vid]; !found {
							 | 
						|
											volumeIdMap[vid] = fmt.Sprintf("%d", vid)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if len(volumeIdMap) == 0 {
							 | 
						|
										return make(map[uint32][]operation.Location), nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Convert to slice of strings for the lookup call
							 | 
						|
									volumeIdStrs := make([]string, 0, len(volumeIdMap))
							 | 
						|
									for _, vidStr := range volumeIdMap {
							 | 
						|
										volumeIdStrs = append(volumeIdStrs, vidStr)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Perform single batched lookup
							 | 
						|
									lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster, fs.grpcDialOption, volumeIdStrs)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to lookup volumes: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Convert result to map of volumeId -> locations
							 | 
						|
									volumeLocationsMap := make(map[uint32][]operation.Location)
							 | 
						|
									for volumeId, volumeIdStr := range volumeIdMap {
							 | 
						|
										if volumeLocations, ok := lookupResult[volumeIdStr]; ok && len(volumeLocations.Locations) > 0 {
							 | 
						|
											volumeLocationsMap[volumeId] = volumeLocations.Locations
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return volumeLocationsMap, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// streamCopyChunk copies a chunk using streaming to minimize memory usage
							 | 
						|
								func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
							 | 
						|
									// Assign a new file ID for destination
							 | 
						|
									fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("failed to assign new file ID: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Try all available locations for source chunk until one succeeds
							 | 
						|
									fileIdString := srcChunk.GetFileIdString()
							 | 
						|
									var lastErr error
							 | 
						|
								
							 | 
						|
									for i, location := range locations {
							 | 
						|
										srcUrl := fmt.Sprintf("http://%s/%s", location.Url, fileIdString)
							 | 
						|
										glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: attempting streaming copy from %s to %s (attempt %d/%d)", srcUrl, urlLocation, i+1, len(locations))
							 | 
						|
								
							 | 
						|
										// Perform streaming copy using HTTP client
							 | 
						|
										err := fs.performStreamCopy(ctx, srcUrl, urlLocation, string(auth), srcChunk.Size, client)
							 | 
						|
										if err != nil {
							 | 
						|
											lastErr = err
							 | 
						|
											glog.V(2).InfofCtx(ctx, "FilerServer.streamCopyChunk: failed streaming copy from %s: %v", srcUrl, err)
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Success - create chunk metadata
							 | 
						|
										newChunk := &filer_pb.FileChunk{
							 | 
						|
											FileId: fileId,
							 | 
						|
											Offset: srcChunk.Offset,
							 | 
						|
											Size:   srcChunk.Size,
							 | 
						|
											ETag:   srcChunk.ETag,
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: successfully streamed %d bytes", srcChunk.Size)
							 | 
						|
										return newChunk, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// All locations failed
							 | 
						|
									return nil, fmt.Errorf("failed to stream copy chunk from any location: %w", lastErr)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// performStreamCopy performs the actual streaming copy from source URL to destination URL
							 | 
						|
								func (fs *FilerServer) performStreamCopy(ctx context.Context, srcUrl, dstUrl, auth string, expectedSize uint64, client *http.Client) error {
							 | 
						|
									// Create HTTP request to read from source
							 | 
						|
									req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create source request: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Perform source request
							 | 
						|
									resp, err := client.Do(req)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to read from source: %v", err)
							 | 
						|
									}
							 | 
						|
									defer resp.Body.Close()
							 | 
						|
								
							 | 
						|
									if resp.StatusCode != http.StatusOK {
							 | 
						|
										return fmt.Errorf("source returned status %d", resp.StatusCode)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create HTTP request to write to destination
							 | 
						|
									dstReq, err := http.NewRequestWithContext(ctx, "PUT", dstUrl, resp.Body)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create destination request: %v", err)
							 | 
						|
									}
							 | 
						|
									dstReq.ContentLength = int64(expectedSize)
							 | 
						|
								
							 | 
						|
									// Set authorization header if provided
							 | 
						|
									if auth != "" {
							 | 
						|
										dstReq.Header.Set("Authorization", "Bearer "+auth)
							 | 
						|
									}
							 | 
						|
									dstReq.Header.Set("Content-Type", "application/octet-stream")
							 | 
						|
								
							 | 
						|
									// Perform destination request
							 | 
						|
									dstResp, err := client.Do(dstReq)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to write to destination: %v", err)
							 | 
						|
									}
							 | 
						|
									defer dstResp.Body.Close()
							 | 
						|
								
							 | 
						|
									if dstResp.StatusCode != http.StatusCreated && dstResp.StatusCode != http.StatusOK {
							 | 
						|
										// Read error response body for more details
							 | 
						|
										body, readErr := io.ReadAll(dstResp.Body)
							 | 
						|
										if readErr != nil {
							 | 
						|
											return fmt.Errorf("destination returned status %d, and failed to read body: %w", dstResp.StatusCode, readErr)
							 | 
						|
										}
							 | 
						|
										return fmt.Errorf("destination returned status %d: %s", dstResp.StatusCode, string(body))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									glog.V(4).InfofCtx(ctx, "FilerServer.performStreamCopy: successfully streamed data from %s to %s", srcUrl, dstUrl)
							 | 
						|
									return nil
							 | 
						|
								}
							 |