You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

421 lines
12 KiB

7 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/chrislusf/seaweedfs/weed/operation"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/security"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/wdclient"
  20. "github.com/spf13/viper"
  21. "google.golang.org/grpc"
  22. )
  23. var (
  24. copy CopyOptions
  25. waitGroup sync.WaitGroup
  26. )
// CopyOptions bundles the flag values and shared clients used by the
// filer.copy command and its workers.
type CopyOptions struct {
	// filerGrpcPort is the "-filer.port.grpc" flag; when 0, runCopy uses the
	// filer HTTP port + 10000.
	filerGrpcPort *int
	// master is the "-master" flag; runCopy splits it on "," into a list.
	master *string
	// include is an optional pattern matched with filepath.Match against the
	// base name of each source file.
	include *string
	// replication, collection and ttl are forwarded to volume assignment
	// requests and recorded in the created filer entry attributes.
	replication *string
	collection  *string
	ttl         *string
	// maxMB is the size limit in megabytes above which a file is split into
	// chunks; a non-positive value disables splitting.
	maxMB *int
	// grpcDialOption is loaded by runCopy from the "grpc" security config.
	grpcDialOption grpc.DialOption
	// masterClient is created and connected by runCopy.
	masterClient *wdclient.MasterClient
	// concurrency is the "-c" flag: the number of worker goroutines and the
	// capacity of the task channel.
	concurrency *int
	// compressionLevel is passed to the single-request upload path.
	compressionLevel *int
}
  40. func init() {
  41. cmdCopy.Run = runCopy // break init cycle
  42. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  43. copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  44. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  45. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  46. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  47. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  48. copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
  49. copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
  50. copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
  51. copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
  52. }
  53. var cmdCopy = &Command{
  54. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  55. Short: "copy one or a list of files to a filer folder",
  56. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  57. It can copy one or a list of files or folders.
  58. If copying a whole folder recursively:
  59. All files under the folder and subfolders will be copyed.
  60. Optional parameter "-include" allows you to specify the file name patterns.
  61. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  62. `,
  63. }
  64. func runCopy(cmd *Command, args []string) bool {
  65. util.LoadConfiguration("security", false)
  66. if len(args) <= 1 {
  67. return false
  68. }
  69. filerDestination := args[len(args)-1]
  70. fileOrDirs := args[0 : len(args)-1]
  71. filerUrl, err := url.Parse(filerDestination)
  72. if err != nil {
  73. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  74. return false
  75. }
  76. urlPath := filerUrl.Path
  77. if !strings.HasSuffix(urlPath, "/") {
  78. fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
  79. return false
  80. }
  81. if filerUrl.Port() == "" {
  82. fmt.Printf("The filer port should be specified.\n")
  83. return false
  84. }
  85. filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
  86. if parseErr != nil {
  87. fmt.Printf("The filer port parse error: %v\n", parseErr)
  88. return false
  89. }
  90. filerGrpcPort := filerPort + 10000
  91. if *copy.filerGrpcPort != 0 {
  92. filerGrpcPort = uint64(*copy.filerGrpcPort)
  93. }
  94. filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
  95. copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
  96. copy.masterClient = wdclient.NewMasterClient(context.Background(), copy.grpcDialOption, "client", strings.Split(*copy.master, ","))
  97. go copy.masterClient.KeepConnectedToMaster()
  98. copy.masterClient.WaitUntilConnected()
  99. if *cmdCopy.IsDebug {
  100. util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
  101. }
  102. fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
  103. ctx := context.Background()
  104. go func() {
  105. defer close(fileCopyTaskChan)
  106. for _, fileOrDir := range fileOrDirs {
  107. if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
  108. fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
  109. break
  110. }
  111. }
  112. }()
  113. for i := 0; i < *copy.concurrency; i++ {
  114. waitGroup.Add(1)
  115. go func() {
  116. defer waitGroup.Done()
  117. worker := FileCopyWorker{
  118. options: &copy,
  119. filerHost: filerUrl.Host,
  120. filerGrpcAddress: filerGrpcAddress,
  121. }
  122. if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
  123. fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
  124. return
  125. }
  126. }()
  127. }
  128. waitGroup.Wait()
  129. return true
  130. }
  131. func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
  132. fi, err := os.Stat(fileOrDir)
  133. if err != nil {
  134. fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
  135. return nil
  136. }
  137. mode := fi.Mode()
  138. if mode.IsDir() {
  139. files, _ := ioutil.ReadDir(fileOrDir)
  140. for _, subFileOrDir := range files {
  141. if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
  142. return err
  143. }
  144. }
  145. return nil
  146. }
  147. uid, gid := util.GetFileUidGid(fi)
  148. fileCopyTaskChan <- FileCopyTask{
  149. sourceLocation: fileOrDir,
  150. destinationUrlPath: destPath,
  151. fileSize: fi.Size(),
  152. fileMode: fi.Mode(),
  153. uid: uid,
  154. gid: gid,
  155. }
  156. return nil
  157. }
// FileCopyWorker uploads the files described by FileCopyTask values to one
// filer. runCopy creates one worker per goroutine.
type FileCopyWorker struct {
	// options points at the shared command options.
	options *CopyOptions
	// filerHost is the host part of the destination URL; it is only used
	// to build the URLs shown in progress and error messages.
	filerHost string
	// filerGrpcAddress is the "host:port" address used for filer gRPC calls.
	filerGrpcAddress string
}
  163. func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
  164. for task := range fileCopyTaskChan {
  165. if err := worker.doEachCopy(ctx, task); err != nil {
  166. return err
  167. }
  168. }
  169. return nil
  170. }
// FileCopyTask describes one local file to copy, as produced by
// genFileCopyTask from the file's os.Stat result.
type FileCopyTask struct {
	// sourceLocation is the local path of the file.
	sourceLocation string
	// destinationUrlPath is the destination directory on the filer.
	destinationUrlPath string
	// fileSize is the size in bytes reported when the task was generated.
	fileSize int64
	// fileMode is the file's mode bits as reported by os.Stat.
	fileMode os.FileMode
	// uid and gid are the owner ids returned by util.GetFileUidGid.
	uid uint32
	gid uint32
}
  179. func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
  180. f, err := os.Open(task.sourceLocation)
  181. if err != nil {
  182. fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
  183. if _, ok := err.(*os.PathError); ok {
  184. fmt.Printf("skipping %s\n", task.sourceLocation)
  185. return nil
  186. }
  187. return err
  188. }
  189. defer f.Close()
  190. // this is a regular file
  191. if *worker.options.include != "" {
  192. if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
  193. return nil
  194. }
  195. }
  196. // find the chunk count
  197. chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
  198. chunkCount := 1
  199. if chunkSize > 0 && task.fileSize > chunkSize {
  200. chunkCount = int(task.fileSize/chunkSize) + 1
  201. }
  202. if chunkCount == 1 {
  203. return worker.uploadFileAsOne(ctx, task, f)
  204. }
  205. return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
  206. }
  207. func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
  208. // upload the file content
  209. fileName := filepath.Base(f.Name())
  210. mimeType := detectMimeType(f)
  211. var chunks []*filer_pb.FileChunk
  212. if task.fileSize > 0 {
  213. // assign a volume
  214. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  215. Count: 1,
  216. Replication: *worker.options.replication,
  217. Collection: *worker.options.collection,
  218. Ttl: *worker.options.ttl,
  219. })
  220. if err != nil {
  221. fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err)
  222. }
  223. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  224. uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
  225. if err != nil {
  226. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  227. }
  228. if uploadResult.Error != "" {
  229. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  230. }
  231. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  232. chunks = append(chunks, &filer_pb.FileChunk{
  233. FileId: assignResult.Fid,
  234. Offset: 0,
  235. Size: uint64(uploadResult.Size),
  236. Mtime: time.Now().UnixNano(),
  237. ETag: uploadResult.ETag,
  238. })
  239. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  240. }
  241. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  242. request := &filer_pb.CreateEntryRequest{
  243. Directory: task.destinationUrlPath,
  244. Entry: &filer_pb.Entry{
  245. Name: fileName,
  246. Attributes: &filer_pb.FuseAttributes{
  247. Crtime: time.Now().Unix(),
  248. Mtime: time.Now().Unix(),
  249. Gid: task.gid,
  250. Uid: task.uid,
  251. FileSize: uint64(task.fileSize),
  252. FileMode: uint32(task.fileMode),
  253. Mime: mimeType,
  254. Replication: *worker.options.replication,
  255. Collection: *worker.options.collection,
  256. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  257. },
  258. Chunks: chunks,
  259. },
  260. }
  261. if _, err := client.CreateEntry(ctx, request); err != nil {
  262. return fmt.Errorf("update fh: %v", err)
  263. }
  264. return nil
  265. }); err != nil {
  266. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  267. }
  268. return nil
  269. }
  270. func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
  271. fileName := filepath.Base(f.Name())
  272. mimeType := detectMimeType(f)
  273. var chunks []*filer_pb.FileChunk
  274. for i := int64(0); i < int64(chunkCount); i++ {
  275. // assign a volume
  276. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  277. Count: 1,
  278. Replication: *worker.options.replication,
  279. Collection: *worker.options.collection,
  280. Ttl: *worker.options.ttl,
  281. })
  282. if err != nil {
  283. fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err)
  284. }
  285. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  286. uploadResult, err := operation.Upload(targetUrl,
  287. fileName+"-"+strconv.FormatInt(i+1, 10),
  288. io.LimitReader(f, chunkSize),
  289. false, "application/octet-stream", nil, assignResult.Auth)
  290. if err != nil {
  291. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  292. }
  293. if uploadResult.Error != "" {
  294. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  295. }
  296. chunks = append(chunks, &filer_pb.FileChunk{
  297. FileId: assignResult.Fid,
  298. Offset: i * chunkSize,
  299. Size: uint64(uploadResult.Size),
  300. Mtime: time.Now().UnixNano(),
  301. ETag: uploadResult.ETag,
  302. })
  303. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  304. }
  305. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  306. request := &filer_pb.CreateEntryRequest{
  307. Directory: task.destinationUrlPath,
  308. Entry: &filer_pb.Entry{
  309. Name: fileName,
  310. Attributes: &filer_pb.FuseAttributes{
  311. Crtime: time.Now().Unix(),
  312. Mtime: time.Now().Unix(),
  313. Gid: task.gid,
  314. Uid: task.uid,
  315. FileSize: uint64(task.fileSize),
  316. FileMode: uint32(task.fileMode),
  317. Mime: mimeType,
  318. Replication: *worker.options.replication,
  319. Collection: *worker.options.collection,
  320. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  321. },
  322. Chunks: chunks,
  323. },
  324. }
  325. if _, err := client.CreateEntry(ctx, request); err != nil {
  326. return fmt.Errorf("update fh: %v", err)
  327. }
  328. return nil
  329. }); err != nil {
  330. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  331. }
  332. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  333. return nil
  334. }
  335. func detectMimeType(f *os.File) string {
  336. head := make([]byte, 512)
  337. f.Seek(0, io.SeekStart)
  338. n, err := f.Read(head)
  339. if err == io.EOF {
  340. return ""
  341. }
  342. if err != nil {
  343. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  344. return "application/octet-stream"
  345. }
  346. f.Seek(0, io.SeekStart)
  347. mimeType := http.DetectContentType(head[:n])
  348. return mimeType
  349. }
  350. func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
  351. return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
  352. client := filer_pb.NewSeaweedFilerClient(clientConn)
  353. return fn(client)
  354. }, filerAddress, grpcDialOption)
  355. }