You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

351 lines
10 KiB

7 years ago
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/security"
  8. "github.com/chrislusf/seaweedfs/weed/server"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/wdclient"
  11. "github.com/spf13/viper"
  12. "google.golang.org/grpc"
  13. "io"
  14. "io/ioutil"
  15. "net/http"
  16. "net/url"
  17. "os"
  18. "path/filepath"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. var (
  24. copy CopyOptions
  25. )
  26. type CopyOptions struct {
  27. filerGrpcPort *int
  28. master *string
  29. include *string
  30. replication *string
  31. collection *string
  32. ttl *string
  33. maxMB *int
  34. grpcDialOption grpc.DialOption
  35. masterClient *wdclient.MasterClient
  36. }
  37. func init() {
  38. cmdCopy.Run = runCopy // break init cycle
  39. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  40. copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  41. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  42. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  43. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  44. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  45. copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
  46. copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
  47. }
  48. var cmdCopy = &Command{
  49. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  50. Short: "copy one or a list of files to a filer folder",
  51. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  52. It can copy one or a list of files or folders.
  53. If copying a whole folder recursively:
  54. All files under the folder and subfolders will be copyed.
  55. Optional parameter "-include" allows you to specify the file name patterns.
  56. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  57. `,
  58. }
  // runCopy implements "filer.copy".
//
// The last argument must be a filer folder URL with an explicit port and a
// trailing "/". Every preceding argument is a local file or folder to upload
// into that folder. It connects to the master(s) given by -master, then copies
// each argument in order. It returns false when the arguments are invalid or
// any copy aborts.
func runCopy(cmd *Command, args []string) bool {
	weed_server.LoadConfiguration("security", false)

	// Need at least one source plus the destination.
	if len(args) <= 1 {
		return false
	}

	filerDestination := args[len(args)-1]
	fileOrDirs := args[0 : len(args)-1]

	filerUrl, err := url.Parse(filerDestination)
	if err != nil {
		fmt.Printf("The last argument should be a URL on filer: %v\n", err)
		return false
	}

	urlPath := filerUrl.Path
	if !strings.HasSuffix(urlPath, "/") {
		// err is necessarily nil here; report the offending path instead.
		fmt.Printf("The last argument should be a folder and end with \"/\": %q\n", urlPath)
		return false
	}

	if filerUrl.Port() == "" {
		fmt.Printf("The filer port should be specified.\n")
		return false
	}

	filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
	if parseErr != nil {
		fmt.Printf("The filer port parse error: %v\n", parseErr)
		return false
	}

	// The gRPC port defaults to the HTTP port + 10000 unless -filer.port.grpc is set.
	filerGrpcPort := filerPort + 10000
	if *copy.filerGrpcPort != 0 {
		filerGrpcPort = uint64(*copy.filerGrpcPort)
	}
	filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)

	copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")

	ctx := context.Background()
	copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", strings.Split(*copy.master, ","))
	go copy.masterClient.KeepConnectedToMaster()
	copy.masterClient.WaitUntilConnected()

	for _, fileOrDir := range fileOrDirs {
		if !doEachCopy(ctx, fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) {
			return false
		}
	}
	return true
}
  // doEachCopy copies the local path fileOrDir into the filer folder path.
//
// A directory is walked recursively and mirrored as path/<dir name>/.... Any
// other path is uploaded as a file, unless -include is set and its base name
// does not match. Files larger than -maxMB are split into chunks. Paths that
// cannot be opened (os.PathError) or listed are reported and skipped. It
// returns false only to abort the whole copy.
func doEachCopy(ctx context.Context, fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool {
	f, err := os.Open(fileOrDir)
	if err != nil {
		fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
		if _, ok := err.(*os.PathError); ok {
			fmt.Printf("skipping %s\n", fileOrDir)
			return true
		}
		return false
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
		return false
	}

	if fi.Mode().IsDir() {
		files, readErr := ioutil.ReadDir(fileOrDir)
		if readErr != nil {
			// Previously ignored silently; still skip, but say why.
			fmt.Printf("Failed to read directory %s: %v\n", fileOrDir, readErr)
			fmt.Printf("skipping %s\n", fileOrDir)
			return true
		}
		subFolder := path + fi.Name() + "/"
		for _, subFileOrDir := range files {
			if !doEachCopy(ctx, filepath.Join(fileOrDir, subFileOrDir.Name()), filerAddress, filerGrpcAddress, grpcDialOption, subFolder) {
				return false
			}
		}
		return true
	}

	// Not a directory: treat it as a file to upload.
	if *copy.include != "" {
		if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
			return true
		}
	}

	// Widen before multiplying so a large -maxMB cannot overflow int.
	chunkSize := int64(*copy.maxMB) * 1024 * 1024
	chunkCount := 1
	if chunkSize > 0 && fi.Size() > chunkSize {
		// Ceiling division: a size that is an exact multiple of chunkSize must
		// not produce an extra, empty trailing chunk.
		chunkCount = int((fi.Size() + chunkSize - 1) / chunkSize)
	}

	if chunkCount == 1 {
		return uploadFileAsOne(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi)
	}
	return uploadFileInChunks(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize)
}
  // uploadFileAsOne uploads the content of f as a single chunk and then creates
// the filer entry urlFolder+<base name of f> through the filer gRPC API.
//
// Empty files get an entry with no chunks, and nothing is sent to a volume
// server. It returns false, after printing the reason, if volume assignment,
// the upload, or the entry creation fails.
func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool {
	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	var chunks []*filer_pb.FileChunk

	if fi.Size() > 0 {
		assignResult, err := operation.Assign(copy.masterClient.GetMaster(), grpcDialOption, &operation.VolumeAssignRequest{
			Count:       1,
			Replication: *copy.replication,
			Collection:  *copy.collection,
			Ttl:         *copy.ttl,
		})
		if err != nil {
			// assignResult is unusable on error; stop instead of dereferencing it.
			fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
			return false
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
		uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth)
		if err != nil {
			fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
			return false
		}
		if uploadResult.Error != "" {
			fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
			return false
		}
		fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)

		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.Fid,
			Offset: 0,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
			ETag:   uploadResult.ETag,
		})
	}

	// One timestamp so Crtime and Mtime cannot straddle a second boundary.
	now := time.Now()
	if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: urlFolder,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      now.Unix(),
					Mtime:       now.Unix(),
					Gid:         uint32(os.Getgid()),
					Uid:         uint32(os.Getuid()),
					FileSize:    uint64(fi.Size()),
					FileMode:    uint32(fi.Mode()),
					Mime:        mimeType,
					Replication: *copy.replication,
					Collection:  *copy.collection,
					TtlSec:      int32(util.ParseInt(*copy.ttl, 0)),
				},
				Chunks: chunks,
			},
		}
		if _, err := client.CreateEntry(ctx, request); err != nil {
			return fmt.Errorf("update fh: %v", err)
		}
		return nil
	}); err != nil {
		fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
		return false
	}

	// Report success only once the filer entry exists (this now also covers empty files).
	fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
	return true
}
  // uploadFileInChunks uploads f as chunkCount sequential pieces of at most
// chunkSize bytes, each under its own assigned file id.
//
// It then creates the filer entry urlFolder+<base name of f> listing those
// chunks at offsets i*chunkSize. It returns false, after printing the reason,
// if any assignment, upload, or the entry creation fails.
func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	var chunks []*filer_pb.FileChunk
	for i := int64(0); i < int64(chunkCount); i++ {
		assignResult, err := operation.Assign(copy.masterClient.GetMaster(), grpcDialOption, &operation.VolumeAssignRequest{
			Count:       1,
			Replication: *copy.replication,
			Collection:  *copy.collection,
			Ttl:         *copy.ttl,
		})
		if err != nil {
			// assignResult is unusable on error; stop instead of dereferencing it.
			fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
			return false
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
		// Reads continue sequentially from f's current offset (expected to be 0
		// after detectMimeType), so chunk i holds bytes starting at i*chunkSize.
		uploadResult, err := operation.Upload(targetUrl,
			fileName+"-"+strconv.FormatInt(i+1, 10),
			io.LimitReader(f, chunkSize),
			false, "application/octet-stream", nil, assignResult.Auth)
		if err != nil {
			fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
			return false
		}
		if uploadResult.Error != "" {
			fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
			return false
		}

		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.Fid,
			Offset: i * chunkSize,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
			ETag:   uploadResult.ETag,
		})
		fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
	}

	// One timestamp so Crtime and Mtime cannot straddle a second boundary.
	now := time.Now()
	if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: urlFolder,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      now.Unix(),
					Mtime:       now.Unix(),
					Gid:         uint32(os.Getgid()),
					Uid:         uint32(os.Getuid()),
					FileSize:    uint64(fi.Size()),
					FileMode:    uint32(fi.Mode()),
					Mime:        mimeType,
					Replication: *copy.replication,
					Collection:  *copy.collection,
					TtlSec:      int32(util.ParseInt(*copy.ttl, 0)),
				},
				Chunks: chunks,
			},
		}
		if _, err := client.CreateEntry(ctx, request); err != nil {
			return fmt.Errorf("update fh: %v", err)
		}
		return nil
	}); err != nil {
		fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
		return false
	}

	fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
	return true
}
  278. func detectMimeType(f *os.File) string {
  279. head := make([]byte, 512)
  280. f.Seek(0, io.SeekStart)
  281. n, err := f.Read(head)
  282. if err == io.EOF {
  283. return ""
  284. }
  285. if err != nil {
  286. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  287. return "application/octet-stream"
  288. }
  289. f.Seek(0, io.SeekStart)
  290. mimeType := http.DetectContentType(head[:n])
  291. return mimeType
  292. }
  // withFilerClient runs fn with a SeaweedFilerClient built on the connection that
// util.WithCachedGrpcClient provides for filerAddress, and returns fn's error
// (or the one reported by WithCachedGrpcClient).
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
	useConn := func(conn *grpc.ClientConn) error {
		return fn(filer_pb.NewSeaweedFilerClient(conn))
	}
	return util.WithCachedGrpcClient(ctx, useConn, filerAddress, grpcDialOption)
}