You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

442 lines
13 KiB

7 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/joeslay/seaweedfs/weed/operation"
  16. "github.com/joeslay/seaweedfs/weed/pb/filer_pb"
  17. "github.com/joeslay/seaweedfs/weed/security"
  18. "github.com/joeslay/seaweedfs/weed/util"
  19. "github.com/joeslay/seaweedfs/weed/wdclient"
  20. "github.com/spf13/viper"
  21. "google.golang.org/grpc"
  22. )
  23. var (
  24. copy CopyOptions
  25. waitGroup sync.WaitGroup
  26. )
  27. type CopyOptions struct {
  28. include *string
  29. replication *string
  30. collection *string
  31. ttl *string
  32. maxMB *int
  33. masterClient *wdclient.MasterClient
  34. concurrency *int
  35. compressionLevel *int
  36. grpcDialOption grpc.DialOption
  37. masters []string
  38. }
  39. func init() {
  40. cmdCopy.Run = runCopy // break init cycle
  41. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  42. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  43. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  44. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  45. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  46. copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
  47. copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
  48. copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
  49. }
  50. var cmdCopy = &Command{
  51. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  52. Short: "copy one or a list of files to a filer folder",
  53. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  54. It can copy one or a list of files or folders.
  55. If copying a whole folder recursively:
  56. All files under the folder and subfolders will be copyed.
  57. Optional parameter "-include" allows you to specify the file name patterns.
  58. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  59. `,
  60. }
  61. func runCopy(cmd *Command, args []string) bool {
  62. util.LoadConfiguration("security", false)
  63. if len(args) <= 1 {
  64. return false
  65. }
  66. filerDestination := args[len(args)-1]
  67. fileOrDirs := args[0 : len(args)-1]
  68. filerUrl, err := url.Parse(filerDestination)
  69. if err != nil {
  70. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  71. return false
  72. }
  73. urlPath := filerUrl.Path
  74. if !strings.HasSuffix(urlPath, "/") {
  75. fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
  76. return false
  77. }
  78. if filerUrl.Port() == "" {
  79. fmt.Printf("The filer port should be specified.\n")
  80. return false
  81. }
  82. filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
  83. if parseErr != nil {
  84. fmt.Printf("The filer port parse error: %v\n", parseErr)
  85. return false
  86. }
  87. filerGrpcPort := filerPort + 10000
  88. filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
  89. copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
  90. ctx := context.Background()
  91. masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress)
  92. if err != nil {
  93. fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
  94. return false
  95. }
  96. if *copy.collection == "" {
  97. *copy.collection = collection
  98. }
  99. if *copy.replication == "" {
  100. *copy.replication = replication
  101. }
  102. if *copy.maxMB == 0 {
  103. *copy.maxMB = int(maxMB)
  104. }
  105. copy.masters = masters
  106. copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
  107. go copy.masterClient.KeepConnectedToMaster()
  108. copy.masterClient.WaitUntilConnected()
  109. if *cmdCopy.IsDebug {
  110. util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
  111. }
  112. fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
  113. go func() {
  114. defer close(fileCopyTaskChan)
  115. for _, fileOrDir := range fileOrDirs {
  116. if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
  117. fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
  118. break
  119. }
  120. }
  121. }()
  122. for i := 0; i < *copy.concurrency; i++ {
  123. waitGroup.Add(1)
  124. go func() {
  125. defer waitGroup.Done()
  126. worker := FileCopyWorker{
  127. options: &copy,
  128. filerHost: filerUrl.Host,
  129. filerGrpcAddress: filerGrpcAddress,
  130. }
  131. if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
  132. fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
  133. return
  134. }
  135. }()
  136. }
  137. waitGroup.Wait()
  138. return true
  139. }
  140. func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
  141. err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  142. resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
  143. if err != nil {
  144. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  145. }
  146. masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
  147. return nil
  148. })
  149. return
  150. }
  151. func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
  152. fi, err := os.Stat(fileOrDir)
  153. if err != nil {
  154. fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
  155. return nil
  156. }
  157. mode := fi.Mode()
  158. if mode.IsDir() {
  159. files, _ := ioutil.ReadDir(fileOrDir)
  160. for _, subFileOrDir := range files {
  161. if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
  162. return err
  163. }
  164. }
  165. return nil
  166. }
  167. uid, gid := util.GetFileUidGid(fi)
  168. fileCopyTaskChan <- FileCopyTask{
  169. sourceLocation: fileOrDir,
  170. destinationUrlPath: destPath,
  171. fileSize: fi.Size(),
  172. fileMode: fi.Mode(),
  173. uid: uid,
  174. gid: gid,
  175. }
  176. return nil
  177. }
  178. type FileCopyWorker struct {
  179. options *CopyOptions
  180. filerHost string
  181. filerGrpcAddress string
  182. }
  183. func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
  184. for task := range fileCopyTaskChan {
  185. if err := worker.doEachCopy(ctx, task); err != nil {
  186. return err
  187. }
  188. }
  189. return nil
  190. }
  191. type FileCopyTask struct {
  192. sourceLocation string
  193. destinationUrlPath string
  194. fileSize int64
  195. fileMode os.FileMode
  196. uid uint32
  197. gid uint32
  198. }
  199. func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
  200. f, err := os.Open(task.sourceLocation)
  201. if err != nil {
  202. fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
  203. if _, ok := err.(*os.PathError); ok {
  204. fmt.Printf("skipping %s\n", task.sourceLocation)
  205. return nil
  206. }
  207. return err
  208. }
  209. defer f.Close()
  210. // this is a regular file
  211. if *worker.options.include != "" {
  212. if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
  213. return nil
  214. }
  215. }
  216. // find the chunk count
  217. chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
  218. chunkCount := 1
  219. if chunkSize > 0 && task.fileSize > chunkSize {
  220. chunkCount = int(task.fileSize/chunkSize) + 1
  221. }
  222. if chunkCount == 1 {
  223. return worker.uploadFileAsOne(ctx, task, f)
  224. }
  225. return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
  226. }
  227. func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
  228. // upload the file content
  229. fileName := filepath.Base(f.Name())
  230. mimeType := detectMimeType(f)
  231. var chunks []*filer_pb.FileChunk
  232. if task.fileSize > 0 {
  233. // assign a volume
  234. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  235. Count: 1,
  236. Replication: *worker.options.replication,
  237. Collection: *worker.options.collection,
  238. Ttl: *worker.options.ttl,
  239. })
  240. if err != nil {
  241. fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
  242. }
  243. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  244. uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
  245. if err != nil {
  246. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  247. }
  248. if uploadResult.Error != "" {
  249. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  250. }
  251. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  252. chunks = append(chunks, &filer_pb.FileChunk{
  253. FileId: assignResult.Fid,
  254. Offset: 0,
  255. Size: uint64(uploadResult.Size),
  256. Mtime: time.Now().UnixNano(),
  257. ETag: uploadResult.ETag,
  258. })
  259. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  260. }
  261. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  262. request := &filer_pb.CreateEntryRequest{
  263. Directory: task.destinationUrlPath,
  264. Entry: &filer_pb.Entry{
  265. Name: fileName,
  266. Attributes: &filer_pb.FuseAttributes{
  267. Crtime: time.Now().Unix(),
  268. Mtime: time.Now().Unix(),
  269. Gid: task.gid,
  270. Uid: task.uid,
  271. FileSize: uint64(task.fileSize),
  272. FileMode: uint32(task.fileMode),
  273. Mime: mimeType,
  274. Replication: *worker.options.replication,
  275. Collection: *worker.options.collection,
  276. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  277. },
  278. Chunks: chunks,
  279. },
  280. }
  281. if _, err := client.CreateEntry(ctx, request); err != nil {
  282. return fmt.Errorf("update fh: %v", err)
  283. }
  284. return nil
  285. }); err != nil {
  286. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  287. }
  288. return nil
  289. }
  290. func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
  291. fileName := filepath.Base(f.Name())
  292. mimeType := detectMimeType(f)
  293. var chunks []*filer_pb.FileChunk
  294. for i := int64(0); i < int64(chunkCount); i++ {
  295. // assign a volume
  296. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  297. Count: 1,
  298. Replication: *worker.options.replication,
  299. Collection: *worker.options.collection,
  300. Ttl: *worker.options.ttl,
  301. })
  302. if err != nil {
  303. fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
  304. }
  305. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  306. uploadResult, err := operation.Upload(targetUrl,
  307. fileName+"-"+strconv.FormatInt(i+1, 10),
  308. io.LimitReader(f, chunkSize),
  309. false, "application/octet-stream", nil, assignResult.Auth)
  310. if err != nil {
  311. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  312. }
  313. if uploadResult.Error != "" {
  314. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  315. }
  316. chunks = append(chunks, &filer_pb.FileChunk{
  317. FileId: assignResult.Fid,
  318. Offset: i * chunkSize,
  319. Size: uint64(uploadResult.Size),
  320. Mtime: time.Now().UnixNano(),
  321. ETag: uploadResult.ETag,
  322. })
  323. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  324. }
  325. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  326. request := &filer_pb.CreateEntryRequest{
  327. Directory: task.destinationUrlPath,
  328. Entry: &filer_pb.Entry{
  329. Name: fileName,
  330. Attributes: &filer_pb.FuseAttributes{
  331. Crtime: time.Now().Unix(),
  332. Mtime: time.Now().Unix(),
  333. Gid: task.gid,
  334. Uid: task.uid,
  335. FileSize: uint64(task.fileSize),
  336. FileMode: uint32(task.fileMode),
  337. Mime: mimeType,
  338. Replication: *worker.options.replication,
  339. Collection: *worker.options.collection,
  340. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  341. },
  342. Chunks: chunks,
  343. },
  344. }
  345. if _, err := client.CreateEntry(ctx, request); err != nil {
  346. return fmt.Errorf("update fh: %v", err)
  347. }
  348. return nil
  349. }); err != nil {
  350. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  351. }
  352. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  353. return nil
  354. }
  355. func detectMimeType(f *os.File) string {
  356. head := make([]byte, 512)
  357. f.Seek(0, io.SeekStart)
  358. n, err := f.Read(head)
  359. if err == io.EOF {
  360. return ""
  361. }
  362. if err != nil {
  363. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  364. return "application/octet-stream"
  365. }
  366. f.Seek(0, io.SeekStart)
  367. mimeType := http.DetectContentType(head[:n])
  368. return mimeType
  369. }
  370. func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
  371. return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
  372. client := filer_pb.NewSeaweedFilerClient(clientConn)
  373. return fn(client)
  374. }, filerAddress, grpcDialOption)
  375. }