package command

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/server"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
)
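
// copy holds the parsed command-line flag values shared by every copy worker,
// and waitGroup waits for all concurrent copy goroutines to finish.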
var (
	copy      CopyOptions
	waitGroup sync.WaitGroup
)
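
// CopyOptions collects the flags that control how files are copied to the filer.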
type CopyOptions struct {
	filerGrpcPort    *int
	master           *string
	include          *string
	replication      *string
	collection       *string
	ttl              *string
	maxMB            *int
	grpcDialOption   grpc.DialOption
	masterClient     *wdclient.MasterClient
	concurrency      *int
	compressionLevel *int
}
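
// init wires up the filer.copy command and registers its command-line flags.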
func init() {
	cmdCopy.Run = runCopy // break init cycle
	cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
	copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
	copy.include = cmdCopy.Flag.String("include", "", "patterns of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
	copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
	copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
	copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
	copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
	copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
	copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
	copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
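
// cmdCopy describes the filer.copy command; a typical invocation looks like
// "weed filer.copy -include=*.pdf ./docs http://localhost:8888/pdfs/".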
var cmdCopy = &Command{
	UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
	Short:     "copy one or a list of files to a filer folder",
	Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder

	It can copy one or a list of files or folders.

	If copying a whole folder recursively:
	All files under the folder and subfolders will be copied.
	Optional parameter "-include" allows you to specify the file name patterns.

	If "maxMB" is set to a positive number, files larger than it would be split into chunks.
	`,
}
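
// runCopy validates the destination filer URL, connects to the master, and
// fans file copy tasks out to a pool of concurrent workers.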
func runCopy(cmd *Command, args []string) bool {
	weed_server.LoadConfiguration("security", false)

	if len(args) <= 1 {
		return false
	}
	filerDestination := args[len(args)-1]
	fileOrDirs := args[0 : len(args)-1]

	filerUrl, err := url.Parse(filerDestination)
	if err != nil {
		fmt.Printf("The last argument should be a URL on filer: %v\n", err)
		return false
	}
	urlPath := filerUrl.Path
	if !strings.HasSuffix(urlPath, "/") {
		fmt.Printf("The last argument should be a folder and end with \"/\": %s\n", filerDestination)
		return false
	}

	if filerUrl.Port() == "" {
		fmt.Printf("The filer port should be specified.\n")
		return false
	}

	filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
	if parseErr != nil {
		fmt.Printf("The filer port parse error: %v\n", parseErr)
		return false
	}

	filerGrpcPort := filerPort + 10000
	if *copy.filerGrpcPort != 0 {
		filerGrpcPort = uint64(*copy.filerGrpcPort)
	}

	filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
	copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")

	copy.masterClient = wdclient.NewMasterClient(context.Background(), copy.grpcDialOption, "client", strings.Split(*copy.master, ","))
	go copy.masterClient.KeepConnectedToMaster()
	copy.masterClient.WaitUntilConnected()

	if *cmdCopy.IsDebug {
		util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
	}

	fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)

	ctx := context.Background()

	// producer: walk the sources and close the channel when the file list is done
	go func() {
		defer close(fileCopyTaskChan)
		for _, fileOrDir := range fileOrDirs {
			if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
				fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
				break
			}
		}
	}()

	// consumers: each worker drains the task channel and uploads the files
	for i := 0; i < *copy.concurrency; i++ {
		waitGroup.Add(1)
		go func() {
			defer waitGroup.Done()
			worker := FileCopyWorker{
				options:          &copy,
				filerHost:        filerUrl.Host,
				filerGrpcAddress: filerGrpcAddress,
			}
			if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
				fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
				return
			}
		}()
	}
	waitGroup.Wait()

	return true
}
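
// genFileCopyTask stats one source path and, for a directory, recurses into
// its children; regular files are queued onto fileCopyTaskChan for the workers.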
func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {

	fi, err := os.Stat(fileOrDir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
		return nil
	}

	mode := fi.Mode()
	if mode.IsDir() {
		files, _ := ioutil.ReadDir(fileOrDir)
		for _, subFileOrDir := range files {
			if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
				return err
			}
		}
		return nil
	}

	uid, gid := util.GetFileUidGid(fi)

	fileCopyTaskChan <- FileCopyTask{
		sourceLocation:     fileOrDir,
		destinationUrlPath: destPath,
		fileSize:           fi.Size(),
		fileMode:           fi.Mode(),
		uid:                uid,
		gid:                gid,
	}

	return nil
}
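
// FileCopyWorker uploads queued tasks to the volume servers and registers the
// resulting entries with the filer at filerGrpcAddress.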
type FileCopyWorker struct {
	options          *CopyOptions
	filerHost        string
	filerGrpcAddress string
}
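
// copyFiles processes tasks until fileCopyTaskChan is closed, stopping on the
// first error.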
func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
	for task := range fileCopyTaskChan {
		if err := worker.doEachCopy(ctx, task); err != nil {
			return err
		}
	}
	return nil
}
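
// FileCopyTask describes one local file to copy to a destination folder on the
// filer, along with the file metadata to preserve.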
type FileCopyTask struct {
	sourceLocation     string
	destinationUrlPath string
	fileSize           int64
	fileMode           os.FileMode
	uid                uint32
	gid                uint32
}
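
// doEachCopy opens the source file, applies the -include filter, and decides
// whether to upload it as a single blob or split it into maxMB-sized chunks.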
func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
	f, err := os.Open(task.sourceLocation)
	if err != nil {
		fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
		if _, ok := err.(*os.PathError); ok {
			fmt.Printf("skipping %s\n", task.sourceLocation)
			return nil
		}
		return err
	}
	defer f.Close()

	// this is a regular file
	if *worker.options.include != "" {
		if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
			return nil
		}
	}

	// find the chunk count, rounding up so the last partial chunk is counted once
	chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
	chunkCount := 1
	if chunkSize > 0 && task.fileSize > chunkSize {
		chunkCount = int((task.fileSize + chunkSize - 1) / chunkSize)
	}

	if chunkCount == 1 {
		return worker.uploadFileAsOne(ctx, task, f)
	}

	return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
}
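
// uploadFileAsOne assigns a single volume file id, uploads the whole file with
// the configured local compression level, and then creates the filer entry.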
func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {

	// upload the file content
	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	var chunks []*filer_pb.FileChunk

	if task.fileSize > 0 {

		// assign a volume
		assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
			Count:       1,
			Replication: *worker.options.replication,
			Collection:  *worker.options.collection,
			Ttl:         *worker.options.ttl,
		})
		if err != nil {
			return fmt.Errorf("failed to assign from %s: %v", *worker.options.master, err)
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid

		uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
		if err != nil {
			return fmt.Errorf("upload data %v to %s: %v", fileName, targetUrl, err)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload %v to %s result: %v", fileName, targetUrl, uploadResult.Error)
		}
		fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)

		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.Fid,
			Offset: 0,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
			ETag:   uploadResult.ETag,
		})

		fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
	}

	if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: task.destinationUrlPath,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      time.Now().Unix(),
					Mtime:       time.Now().Unix(),
					Gid:         task.gid,
					Uid:         task.uid,
					FileSize:    uint64(task.fileSize),
					FileMode:    uint32(task.fileMode),
					Mime:        mimeType,
					Replication: *worker.options.replication,
					Collection:  *worker.options.collection,
					TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
				},
				Chunks: chunks,
			},
		}

		if _, err := client.CreateEntry(ctx, request); err != nil {
			return fmt.Errorf("create entry %s%s: %v", task.destinationUrlPath, fileName, err)
		}

		return nil
	}); err != nil {
		return fmt.Errorf("upload data %v to http://%s%s%s: %v", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
	}

	return nil
}
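
// uploadFileInChunks splits the file into chunkSize pieces, uploads each piece
// to its own assigned volume file id, and creates one filer entry listing all
// of the chunks.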
func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {

	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	var chunks []*filer_pb.FileChunk

	for i := int64(0); i < int64(chunkCount); i++ {

		// assign a volume
		assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
			Count:       1,
			Replication: *worker.options.replication,
			Collection:  *worker.options.collection,
			Ttl:         *worker.options.ttl,
		})
		if err != nil {
			return fmt.Errorf("failed to assign from %s: %v", *worker.options.master, err)
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid

		uploadResult, err := operation.Upload(targetUrl,
			fileName+"-"+strconv.FormatInt(i+1, 10),
			io.LimitReader(f, chunkSize),
			false, "application/octet-stream", nil, assignResult.Auth)
		if err != nil {
			return fmt.Errorf("upload data %v to %s: %v", fileName, targetUrl, err)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload %v to %s result: %v", fileName, targetUrl, uploadResult.Error)
		}
		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.Fid,
			Offset: i * chunkSize,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
			ETag:   uploadResult.ETag,
		})
		fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
	}

	if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: task.destinationUrlPath,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      time.Now().Unix(),
					Mtime:       time.Now().Unix(),
					Gid:         task.gid,
					Uid:         task.uid,
					FileSize:    uint64(task.fileSize),
					FileMode:    uint32(task.fileMode),
					Mime:        mimeType,
					Replication: *worker.options.replication,
					Collection:  *worker.options.collection,
					TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
				},
				Chunks: chunks,
			},
		}

		if _, err := client.CreateEntry(ctx, request); err != nil {
			return fmt.Errorf("create entry %s%s: %v", task.destinationUrlPath, fileName, err)
		}

		return nil
	}); err != nil {
		return fmt.Errorf("upload data %v to http://%s%s%s: %v", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
	}

	fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)

	return nil
}
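
// detectMimeType sniffs the content type from the first 512 bytes of the file
// and rewinds the reader before returning.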
func detectMimeType(f *os.File) string {
	head := make([]byte, 512)
	f.Seek(0, io.SeekStart)
	n, err := f.Read(head)
	if err == io.EOF {
		return ""
	}
	if err != nil {
		fmt.Printf("read head of %v: %v\n", f.Name(), err)
		return "application/octet-stream"
	}
	f.Seek(0, io.SeekStart)
	mimeType := http.DetectContentType(head[:n])
	return mimeType
}
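
// withFilerClient runs fn against a cached gRPC connection to the filer.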
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
	return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
		client := filer_pb.NewSeaweedFilerClient(clientConn)
		return fn(client)
	}, filerAddress, grpcDialOption)
}