|
|
@ -106,14 +106,14 @@ func runCopy(cmd *Command, args []string) bool { |
|
|
|
copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") |
|
|
|
|
|
|
|
for _, fileOrDir := range fileOrDirs { |
|
|
|
if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) { |
|
|
|
if !doEachCopy(context.Background(), fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) { |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool { |
|
|
|
func doEachCopy(ctx context.Context, fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool { |
|
|
|
f, err := os.Open(fileOrDir) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) |
|
|
@ -131,7 +131,7 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDia |
|
|
|
if mode.IsDir() { |
|
|
|
files, _ := ioutil.ReadDir(fileOrDir) |
|
|
|
for _, subFileOrDir := range files { |
|
|
|
if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") { |
|
|
|
if !doEachCopy(ctx, fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") { |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
@ -153,13 +153,13 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDia |
|
|
|
} |
|
|
|
|
|
|
|
if chunkCount == 1 { |
|
|
|
return uploadFileAsOne(filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi) |
|
|
|
return uploadFileAsOne(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi) |
|
|
|
} |
|
|
|
|
|
|
|
return uploadFileInChunks(filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize) |
|
|
|
return uploadFileInChunks(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize) |
|
|
|
} |
|
|
|
|
|
|
|
func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool { |
|
|
|
func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool { |
|
|
|
|
|
|
|
// upload the file content
|
|
|
|
fileName := filepath.Base(f.Name()) |
|
|
@ -204,7 +204,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc. |
|
|
|
fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) |
|
|
|
} |
|
|
|
|
|
|
|
if err := withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
request := &filer_pb.CreateEntryRequest{ |
|
|
|
Directory: urlFolder, |
|
|
|
Entry: &filer_pb.Entry{ |
|
|
@ -225,7 +225,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc. |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
if _, err := client.CreateEntry(context.Background(), request); err != nil { |
|
|
|
if _, err := client.CreateEntry(ctx, request); err != nil { |
|
|
|
return fmt.Errorf("update fh: %v", err) |
|
|
|
} |
|
|
|
return nil |
|
|
@ -237,7 +237,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc. |
|
|
|
return true |
|
|
|
} |
|
|
|
|
|
|
|
func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { |
|
|
|
func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { |
|
|
|
|
|
|
|
fileName := filepath.Base(f.Name()) |
|
|
|
mimeType := detectMimeType(f) |
|
|
@ -281,7 +281,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption gr |
|
|
|
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) |
|
|
|
} |
|
|
|
|
|
|
|
if err := withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
request := &filer_pb.CreateEntryRequest{ |
|
|
|
Directory: urlFolder, |
|
|
|
Entry: &filer_pb.Entry{ |
|
|
@ -302,7 +302,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption gr |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
if _, err := client.CreateEntry(context.Background(), request); err != nil { |
|
|
|
if _, err := client.CreateEntry(ctx, request); err != nil { |
|
|
|
return fmt.Errorf("update fh: %v", err) |
|
|
|
} |
|
|
|
return nil |
|
|
@ -332,9 +332,9 @@ func detectMimeType(f *os.File) string { |
|
|
|
return mimeType |
|
|
|
} |
|
|
|
|
|
|
|
func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { |
|
|
|
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { |
|
|
|
|
|
|
|
grpcConnection, err := util.GrpcDial(filerAddress, grpcDialOption) |
|
|
|
grpcConnection, err := util.GrpcDial(ctx, filerAddress, grpcDialOption) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("fail to dial %s: %v", filerAddress, err) |
|
|
|
} |
|
|
|