package command

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
)

var (
	copy      CopyOptions
	waitGroup sync.WaitGroup
)

type CopyOptions struct {
	include           *string
	replication       *string
	collection        *string
	ttl               *string
	maxMB             *int
	masterClient      *wdclient.MasterClient
	concurrenctFiles  *int
	concurrenctChunks *int
	compressionLevel  *int
	grpcDialOption    grpc.DialOption
	masters           []string
	cipher            bool
}
func init() {
	cmdCopy.Run = runCopy // break init cycle
	cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
	copy.include = cmdCopy.Flag.String("include", "", "patterns of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
	copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
	copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
	copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
	copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
	copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
	copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
	copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
var cmdCopy = &Command{
	UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
	Short:     "copy one or a list of files to a filer folder",
	Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder

  It can copy one or a list of files or folders.

  If copying a whole folder recursively:
  All files under the folder and subfolders will be copied.
  Optional parameter "-include" allows you to specify the file name patterns.

  If "maxMB" is set to a positive number, files larger than it would be split into chunks.

  `,
}
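// Example invocation, as a sketch only: the filer address and destination
// folder follow the UsageLine above, while the local source directory "./docs"
// is a made-up path used purely for illustration:
//
//	weed filer.copy -include="*.pdf" -maxMB=32 ./docs http://localhost:8888/path/to/a/folder/
//
// With the defaults, files larger than 32 MB are split into chunks that are
// uploaded by the goroutines configured with -c and -concurrentChunks.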
func runCopy(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)

	if len(args) <= 1 {
		return false
	}
	filerDestination := args[len(args)-1]
	fileOrDirs := args[0 : len(args)-1]

	filerUrl, err := url.Parse(filerDestination)
	if err != nil {
		fmt.Printf("The last argument should be a URL on filer: %v\n", err)
		return false
	}
	urlPath := filerUrl.Path
	if !strings.HasSuffix(urlPath, "/") {
		fmt.Printf("The last argument should be a folder and end with \"/\"\n")
		return false
	}

	if filerUrl.Port() == "" {
		fmt.Printf("The filer port should be specified.\n")
		return false
	}

	filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
	if parseErr != nil {
		fmt.Printf("The filer port parse error: %v\n", parseErr)
		return false
	}

	// by convention, the filer gRPC port is the HTTP port + 10000
	filerGrpcPort := filerPort + 10000
	filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
	copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")

	masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
	if err != nil {
		fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
		return false
	}
	if *copy.collection == "" {
		*copy.collection = collection
	}
	if *copy.replication == "" {
		*copy.replication = replication
	}
	if *copy.maxMB == 0 {
		*copy.maxMB = int(maxMB)
	}
	copy.masters = masters
	copy.cipher = cipher

	if *cmdCopy.IsDebug {
		util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
	}

	fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)

	go func() {
		defer close(fileCopyTaskChan)
		for _, fileOrDir := range fileOrDirs {
			if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
				fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
				break
			}
		}
	}()
	for i := 0; i < *copy.concurrenctFiles; i++ {
		waitGroup.Add(1)
		go func() {
			defer waitGroup.Done()
			worker := FileCopyWorker{
				options:          &copy,
				filerHost:        filerUrl.Host,
				filerGrpcAddress: filerGrpcAddress,
			}
			if err := worker.copyFiles(fileCopyTaskChan); err != nil {
				fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
				return
			}
		}()
	}
	waitGroup.Wait()

	return true
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
	err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
		}
		masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
		cipher = resp.Cipher
		return nil
	})
	return
}
func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {

	fi, err := os.Stat(fileOrDir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
		return nil
	}

	mode := fi.Mode()
	if mode.IsDir() {
		files, _ := ioutil.ReadDir(fileOrDir)
		for _, subFileOrDir := range files {
			if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
				return err
			}
		}
		return nil
	}

	uid, gid := util.GetFileUidGid(fi)

	fileCopyTaskChan <- FileCopyTask{
		sourceLocation:     fileOrDir,
		destinationUrlPath: destPath,
		fileSize:           fi.Size(),
		fileMode:           fi.Mode(),
		uid:                uid,
		gid:                gid,
	}

	return nil
}
type FileCopyWorker struct {
	options          *CopyOptions
	filerHost        string
	filerGrpcAddress string
}

func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
	for task := range fileCopyTaskChan {
		if err := worker.doEachCopy(task); err != nil {
			return err
		}
	}
	return nil
}

type FileCopyTask struct {
	sourceLocation     string
	destinationUrlPath string
	fileSize           int64
	fileMode           os.FileMode
	uid                uint32
	gid                uint32
}
func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {

	f, err := os.Open(task.sourceLocation)
	if err != nil {
		fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
		if _, ok := err.(*os.PathError); ok {
			fmt.Printf("skipping %s\n", task.sourceLocation)
			return nil
		}
		return err
	}
	defer f.Close()

	// this is a regular file
	if *worker.options.include != "" {
		if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
			return nil
		}
	}

	// find the chunk count, e.g. a 100 MB file with the default 32 MB limit yields 4 chunks
	chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
	chunkCount := 1
	if chunkSize > 0 && task.fileSize > chunkSize {
		chunkCount = int(task.fileSize/chunkSize) + 1
	}

	if chunkCount == 1 {
		return worker.uploadFileAsOne(task, f)
	}

	return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {

	// upload the file content
	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	var chunks []*filer_pb.FileChunk
	var assignResult *filer_pb.AssignVolumeResponse
	var assignError error

	if task.fileSize > 0 {

		// assign a volume
		err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

			request := &filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: *worker.options.replication,
				Collection:  *worker.options.collection,
				TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
				ParentPath:  task.destinationUrlPath,
			}

			assignResult, assignError = client.AssignVolume(context.Background(), request)
			if assignError != nil {
				return fmt.Errorf("assign volume failure %v: %v", request, assignError)
			}
			if assignResult.Error != "" {
				return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
			}
			return nil
		})
		if err != nil {
			// stop here: assignResult is not usable when the assignment failed
			fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
			return err
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId

		uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, worker.options.cipher, f, false, mimeType, nil, security.EncodedJwt(assignResult.Auth), *worker.options.compressionLevel)
		if err != nil {
			return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
		}
		fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)

		chunks = append(chunks, &filer_pb.FileChunk{
			FileId:    assignResult.FileId,
			Offset:    0,
			Size:      uint64(uploadResult.Size),
			Mtime:     time.Now().UnixNano(),
			ETag:      uploadResult.ETag,
			CipherKey: uploadResult.CipherKey,
		})

		fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
	}

	if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: task.destinationUrlPath,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      time.Now().Unix(),
					Mtime:       time.Now().Unix(),
					Gid:         task.gid,
					Uid:         task.uid,
					FileSize:    uint64(task.fileSize),
					FileMode:    uint32(task.fileMode),
					Mime:        mimeType,
					Replication: *worker.options.replication,
					Collection:  *worker.options.collection,
					TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
				},
				Chunks: chunks,
			},
		}

		if err := filer_pb.CreateEntry(client, request); err != nil {
			return fmt.Errorf("update fh: %v", err)
		}
		return nil
	}); err != nil {
		return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
	}

	return nil
}
func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {

	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	chunksChan := make(chan *filer_pb.FileChunk, chunkCount)

	concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
	var wg sync.WaitGroup
	var uploadError error
	var collection, replication string

	fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
	for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
		wg.Add(1)
		concurrentChunks <- struct{}{}
		go func(i int64) {
			defer func() {
				wg.Done()
				<-concurrentChunks
			}()
			// assign a volume
			var assignResult *filer_pb.AssignVolumeResponse
			var assignError error
			err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
				request := &filer_pb.AssignVolumeRequest{
					Count:       1,
					Replication: *worker.options.replication,
					Collection:  *worker.options.collection,
					TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
					ParentPath:  task.destinationUrlPath,
				}

				assignResult, assignError = client.AssignVolume(context.Background(), request)
				if assignError != nil {
					return fmt.Errorf("assign volume failure %v: %v", request, assignError)
				}
				if assignResult.Error != "" {
					return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
				}
				return nil
			})
			if err != nil {
				// skip this chunk: assignResult is not usable when the assignment failed
				fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
				uploadError = err
				return
			}

			targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
			if collection == "" {
				collection = assignResult.Collection
			}
			if replication == "" {
				replication = assignResult.Replication
			}

			uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), false, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
			if err != nil {
				uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
				return
			}
			if uploadResult.Error != "" {
				uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
				return
			}
			chunksChan <- &filer_pb.FileChunk{
				FileId:    assignResult.FileId,
				Offset:    i * chunkSize,
				Size:      uint64(uploadResult.Size),
				Mtime:     time.Now().UnixNano(),
				ETag:      uploadResult.ETag,
				CipherKey: uploadResult.CipherKey,
			}
			fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
		}(i)
	}
	wg.Wait()
	close(chunksChan)

	var chunks []*filer_pb.FileChunk
	for chunk := range chunksChan {
		chunks = append(chunks, chunk)
	}

	if uploadError != nil {
		var fileIds []string
		for _, chunk := range chunks {
			fileIds = append(fileIds, chunk.FileId)
		}
		operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
		return uploadError
	}

	if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: task.destinationUrlPath,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      time.Now().Unix(),
					Mtime:       time.Now().Unix(),
					Gid:         task.gid,
					Uid:         task.uid,
					FileSize:    uint64(task.fileSize),
					FileMode:    uint32(task.fileMode),
					Mime:        mimeType,
					Replication: replication,
					Collection:  collection,
					TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
				},
				Chunks: chunks,
			},
		}

		if err := filer_pb.CreateEntry(client, request); err != nil {
			return fmt.Errorf("update fh: %v", err)
		}
		return nil
	}); err != nil {
		return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
	}

	fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)

	return nil
}
func detectMimeType(f *os.File) string {
	head := make([]byte, 512)
	f.Seek(0, io.SeekStart)
	n, err := f.Read(head)
	if err == io.EOF {
		return ""
	}
	if err != nil {
		fmt.Printf("read head of %v: %v\n", f.Name(), err)
		return "application/octet-stream"
	}
	f.Seek(0, io.SeekStart)
	mimeType := http.DetectContentType(head[:n])
	return mimeType
}