From 8a48baa0563ac94b5e84b8b4fb2ffac34c8d8ff8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 29 May 2018 23:46:45 -0700 Subject: [PATCH] add single chunk file copying to new filer --- weed/command/filer_copy.go | 74 +++++++++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 8a754bb55..3b8193cd1 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -11,6 +11,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer" "github.com/chrislusf/seaweedfs/weed/security" + "path" + "net/http" ) var ( @@ -68,20 +70,20 @@ func runCopy(cmd *Command, args []string) bool { return false } filerDestination := args[len(args)-1] - fileOrDirs := args[0 : len(args)-1] + fileOrDirs := args[0: len(args)-1] filerUrl, err := url.Parse(filerDestination) if err != nil { fmt.Printf("The last argument should be a URL on filer: %v\n", err) return false } - path := filerUrl.Path - if !strings.HasSuffix(path, "/") { - path = path + "/" + urlPath := filerUrl.Path + if !strings.HasSuffix(urlPath, "/") { + urlPath = urlPath + "/" } for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, path) { + if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) { return false } } @@ -120,29 +122,67 @@ func doEachCopy(fileOrDir string, host string, path string) bool { } } - parts, err := operation.NewFileParts([]string{fileOrDir}) + // find the chunk count + chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkCount := 1 + if chunkSize > 0 && fi.Size() > chunkSize { + chunkCount = int(fi.Size()/chunkSize) + 1 + } + + // assign a volume + assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + Count: uint64(chunkCount), + Replication: *copy.replication, + Collection: *copy.collection, + Ttl: *copy.ttl, + }) if err != nil { - fmt.Printf("Failed to read file %s: %v\n", fileOrDir, err) + fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + } + + if chunkCount == 1 { + return uploadFileAsOne(host, path, assignResult, f, fi) } - results, err := operation.SubmitFiles(*copy.master, parts, - *copy.replication, *copy.collection, "", - *copy.ttl, *copy.maxMB, copy.secret) + return uploadFileInChunks(host, path, assignResult, f, chunkCount) +} + +func uploadFileAsOne(filerUrl string, urlPath string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool { + // upload the file content + ext := strings.ToLower(path.Ext(f.Name())) + head := make([]byte, 512) + f.Seek(0, 0) + n, err := f.Read(head) if err != nil { - fmt.Printf("Failed to submit file %s: %v\n", fileOrDir, err) + fmt.Printf("read head of %v: %v\n", f.Name(), err) + return false } + f.Seek(0, 0) + mimeType := http.DetectContentType(head[:n]) + isGzipped := ext == ".gz" + + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - if strings.HasSuffix(path, "/") { - path = path + fi.Name() + uploadResult, err := operation.Upload(targetUrl, f.Name(), f, isGzipped, mimeType, nil, "") + if err != nil { + fmt.Printf("upload data %v to %s: %v\n", f.Name(), targetUrl, err) + return false + } + if uploadResult.Error != "" { + fmt.Printf("upload %v to %s result: %v\n", f.Name(), targetUrl, uploadResult.Error) + return false } - if err = filer_operation.RegisterFile(host, path, results[0].Fid, parts[0].FileSize, + if err = filer_operation.RegisterFile(filerUrl, filepath.Join(urlPath, f.Name()), assignResult.Fid, fi.Size(), os.Getuid(), os.Getgid(), copy.secret); err != nil { - fmt.Printf("Failed to register file %s on %s: %v\n", fileOrDir, host, err) + fmt.Printf("Failed to register file %s on %s: %v\n", f.Name(), filerUrl, err) return false } - fmt.Printf("Copy %s => http://%s%s\n", fileOrDir, host, path) - + fmt.Printf("Copied %s => http://%s%s\n", f.Name(), filerUrl, urlPath) return true } + +func uploadFileInChunks(filerUrl string, path string, assignResult *operation.AssignResult, f *os.File, chunkCount int) bool { + return false +}