You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

409 lines
12 KiB

7 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/security"
  8. "github.com/chrislusf/seaweedfs/weed/server"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/wdclient"
  11. "github.com/spf13/viper"
  12. "google.golang.org/grpc"
  13. "io"
  14. "io/ioutil"
  15. "net/http"
  16. "net/url"
  17. "os"
  18. "path/filepath"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
  24. var (
  25. copy CopyOptions
  26. waitGroup sync.WaitGroup
  27. )
  28. type CopyOptions struct {
  29. filerGrpcPort *int
  30. master *string
  31. include *string
  32. replication *string
  33. collection *string
  34. ttl *string
  35. maxMB *int
  36. grpcDialOption grpc.DialOption
  37. masterClient *wdclient.MasterClient
  38. concurrency *int
  39. }
  40. func init() {
  41. cmdCopy.Run = runCopy // break init cycle
  42. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  43. copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  44. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  45. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  46. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  47. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  48. copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
  49. copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
  50. copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
  51. }
  52. var cmdCopy = &Command{
  53. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  54. Short: "copy one or a list of files to a filer folder",
  55. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  56. It can copy one or a list of files or folders.
  57. If copying a whole folder recursively:
  58. All files under the folder and subfolders will be copyed.
  59. Optional parameter "-include" allows you to specify the file name patterns.
  60. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  61. `,
  62. }
  63. func runCopy(cmd *Command, args []string) bool {
  64. weed_server.LoadConfiguration("security", false)
  65. if len(args) <= 1 {
  66. return false
  67. }
  68. filerDestination := args[len(args)-1]
  69. fileOrDirs := args[0 : len(args)-1]
  70. filerUrl, err := url.Parse(filerDestination)
  71. if err != nil {
  72. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  73. return false
  74. }
  75. urlPath := filerUrl.Path
  76. if !strings.HasSuffix(urlPath, "/") {
  77. fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
  78. return false
  79. }
  80. if filerUrl.Port() == "" {
  81. fmt.Printf("The filer port should be specified.\n")
  82. return false
  83. }
  84. filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
  85. if parseErr != nil {
  86. fmt.Printf("The filer port parse error: %v\n", parseErr)
  87. return false
  88. }
  89. filerGrpcPort := filerPort + 10000
  90. if *copy.filerGrpcPort != 0 {
  91. filerGrpcPort = uint64(*copy.filerGrpcPort)
  92. }
  93. filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
  94. copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
  95. copy.masterClient = wdclient.NewMasterClient(context.Background(), copy.grpcDialOption, "client", strings.Split(*copy.master, ","))
  96. go copy.masterClient.KeepConnectedToMaster()
  97. copy.masterClient.WaitUntilConnected()
  98. fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
  99. ctx := context.Background()
  100. go func() {
  101. defer close(fileCopyTaskChan)
  102. for _, fileOrDir := range fileOrDirs {
  103. if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
  104. fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
  105. break
  106. }
  107. }
  108. }()
  109. for i := 0; i < *copy.concurrency; i++ {
  110. waitGroup.Add(1)
  111. go func() {
  112. defer waitGroup.Done()
  113. worker := FileCopyWorker{
  114. options: &copy,
  115. filerHost: filerUrl.Host,
  116. filerGrpcAddress: filerGrpcAddress,
  117. }
  118. if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
  119. fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
  120. return
  121. }
  122. }()
  123. }
  124. waitGroup.Wait()
  125. return true
  126. }
  127. func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
  128. fi, err := os.Stat(fileOrDir)
  129. if err != nil {
  130. fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
  131. return nil
  132. }
  133. mode := fi.Mode()
  134. if mode.IsDir() {
  135. files, _ := ioutil.ReadDir(fileOrDir)
  136. for _, subFileOrDir := range files {
  137. if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
  138. return err
  139. }
  140. }
  141. return nil
  142. }
  143. fileCopyTaskChan <- FileCopyTask{
  144. sourceLocation: fileOrDir,
  145. destinationUrlPath: destPath,
  146. fileSize: fi.Size(),
  147. fileMode: fi.Mode(),
  148. }
  149. return nil
  150. }
  151. type FileCopyWorker struct {
  152. options *CopyOptions
  153. filerHost string
  154. filerGrpcAddress string
  155. }
  156. func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
  157. for task := range fileCopyTaskChan {
  158. if err := worker.doEachCopy(ctx, task); err != nil {
  159. return err
  160. }
  161. }
  162. return nil
  163. }
  164. type FileCopyTask struct {
  165. sourceLocation string
  166. destinationUrlPath string
  167. fileSize int64
  168. fileMode os.FileMode
  169. }
  170. func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
  171. f, err := os.Open(task.sourceLocation)
  172. if err != nil {
  173. fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
  174. if _, ok := err.(*os.PathError); ok {
  175. fmt.Printf("skipping %s\n", task.sourceLocation)
  176. return nil
  177. }
  178. return err
  179. }
  180. defer f.Close()
  181. // this is a regular file
  182. if *worker.options.include != "" {
  183. if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
  184. return nil
  185. }
  186. }
  187. // find the chunk count
  188. chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
  189. chunkCount := 1
  190. if chunkSize > 0 && task.fileSize > chunkSize {
  191. chunkCount = int(task.fileSize/chunkSize) + 1
  192. }
  193. if chunkCount == 1 {
  194. return worker.uploadFileAsOne(ctx, task, f)
  195. }
  196. return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
  197. }
  198. func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
  199. // upload the file content
  200. fileName := filepath.Base(f.Name())
  201. mimeType := detectMimeType(f)
  202. var chunks []*filer_pb.FileChunk
  203. if task.fileSize > 0 {
  204. // assign a volume
  205. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  206. Count: 1,
  207. Replication: *worker.options.replication,
  208. Collection: *worker.options.collection,
  209. Ttl: *worker.options.ttl,
  210. })
  211. if err != nil {
  212. fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err)
  213. }
  214. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  215. uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth)
  216. if err != nil {
  217. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  218. }
  219. if uploadResult.Error != "" {
  220. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  221. }
  222. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  223. chunks = append(chunks, &filer_pb.FileChunk{
  224. FileId: assignResult.Fid,
  225. Offset: 0,
  226. Size: uint64(uploadResult.Size),
  227. Mtime: time.Now().UnixNano(),
  228. ETag: uploadResult.ETag,
  229. })
  230. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  231. }
  232. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  233. request := &filer_pb.CreateEntryRequest{
  234. Directory: task.destinationUrlPath,
  235. Entry: &filer_pb.Entry{
  236. Name: fileName,
  237. Attributes: &filer_pb.FuseAttributes{
  238. Crtime: time.Now().Unix(),
  239. Mtime: time.Now().Unix(),
  240. Gid: uint32(os.Getgid()),
  241. Uid: uint32(os.Getuid()),
  242. FileSize: uint64(task.fileSize),
  243. FileMode: uint32(task.fileMode),
  244. Mime: mimeType,
  245. Replication: *worker.options.replication,
  246. Collection: *worker.options.collection,
  247. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  248. },
  249. Chunks: chunks,
  250. },
  251. }
  252. if _, err := client.CreateEntry(ctx, request); err != nil {
  253. return fmt.Errorf("update fh: %v", err)
  254. }
  255. return nil
  256. }); err != nil {
  257. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  258. }
  259. return nil
  260. }
  261. func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
  262. fileName := filepath.Base(f.Name())
  263. mimeType := detectMimeType(f)
  264. var chunks []*filer_pb.FileChunk
  265. for i := int64(0); i < int64(chunkCount); i++ {
  266. // assign a volume
  267. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  268. Count: 1,
  269. Replication: *worker.options.replication,
  270. Collection: *worker.options.collection,
  271. Ttl: *worker.options.ttl,
  272. })
  273. if err != nil {
  274. fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err)
  275. }
  276. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  277. uploadResult, err := operation.Upload(targetUrl,
  278. fileName+"-"+strconv.FormatInt(i+1, 10),
  279. io.LimitReader(f, chunkSize),
  280. false, "application/octet-stream", nil, assignResult.Auth)
  281. if err != nil {
  282. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  283. }
  284. if uploadResult.Error != "" {
  285. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  286. }
  287. chunks = append(chunks, &filer_pb.FileChunk{
  288. FileId: assignResult.Fid,
  289. Offset: i * chunkSize,
  290. Size: uint64(uploadResult.Size),
  291. Mtime: time.Now().UnixNano(),
  292. ETag: uploadResult.ETag,
  293. })
  294. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  295. }
  296. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  297. request := &filer_pb.CreateEntryRequest{
  298. Directory: task.destinationUrlPath,
  299. Entry: &filer_pb.Entry{
  300. Name: fileName,
  301. Attributes: &filer_pb.FuseAttributes{
  302. Crtime: time.Now().Unix(),
  303. Mtime: time.Now().Unix(),
  304. Gid: uint32(os.Getgid()),
  305. Uid: uint32(os.Getuid()),
  306. FileSize: uint64(task.fileSize),
  307. FileMode: uint32(task.fileMode),
  308. Mime: mimeType,
  309. Replication: *worker.options.replication,
  310. Collection: *worker.options.collection,
  311. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  312. },
  313. Chunks: chunks,
  314. },
  315. }
  316. if _, err := client.CreateEntry(ctx, request); err != nil {
  317. return fmt.Errorf("update fh: %v", err)
  318. }
  319. return nil
  320. }); err != nil {
  321. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  322. }
  323. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  324. return nil
  325. }
  326. func detectMimeType(f *os.File) string {
  327. head := make([]byte, 512)
  328. f.Seek(0, io.SeekStart)
  329. n, err := f.Read(head)
  330. if err == io.EOF {
  331. return ""
  332. }
  333. if err != nil {
  334. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  335. return "application/octet-stream"
  336. }
  337. f.Seek(0, io.SeekStart)
  338. mimeType := http.DetectContentType(head[:n])
  339. return mimeType
  340. }
  341. func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
  342. return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
  343. client := filer_pb.NewSeaweedFilerClient(clientConn)
  344. return fn(client)
  345. }, filerAddress, grpcDialOption)
  346. }