You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

342 lines
9.6 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/url"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "context"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "io"
  15. "net/http"
  16. "strconv"
  17. "time"
  18. )
  19. var (
  20. copy CopyOptions
  21. )
  22. type CopyOptions struct {
  23. filerGrpcPort *int
  24. master *string
  25. include *string
  26. replication *string
  27. collection *string
  28. ttl *string
  29. maxMB *int
  30. secretKey *string
  31. secret security.Secret
  32. }
  33. func init() {
  34. cmdCopy.Run = runCopy // break init cycle
  35. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  36. copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  37. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  38. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  39. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  40. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  41. copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
  42. copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
  43. copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
  44. }
  45. var cmdCopy = &Command{
  46. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  47. Short: "copy one or a list of files to a filer folder",
  48. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  49. It can copy one or a list of files or folders.
  50. If copying a whole folder recursively:
  51. All files under the folder and subfolders will be copyed.
  52. Optional parameter "-include" allows you to specify the file name patterns.
  53. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  54. `,
  55. }
  56. func runCopy(cmd *Command, args []string) bool {
  57. copy.secret = security.Secret(*copy.secretKey)
  58. if len(args) <= 1 {
  59. return false
  60. }
  61. filerDestination := args[len(args)-1]
  62. fileOrDirs := args[0 : len(args)-1]
  63. filerUrl, err := url.Parse(filerDestination)
  64. if err != nil {
  65. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  66. return false
  67. }
  68. urlPath := filerUrl.Path
  69. if !strings.HasSuffix(urlPath, "/") {
  70. urlPath = urlPath + "/"
  71. }
  72. if filerUrl.Port() == "" {
  73. fmt.Printf("The filer port should be specified.\n")
  74. return false
  75. }
  76. filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
  77. if parseErr != nil {
  78. fmt.Printf("The filer port parse error: %v\n", parseErr)
  79. return false
  80. }
  81. filerGrpcPort := filerPort + 10000
  82. if *copy.filerGrpcPort != 0 {
  83. filerGrpcPort = uint64(*copy.filerGrpcPort)
  84. }
  85. filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
  86. for _, fileOrDir := range fileOrDirs {
  87. if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
  88. return false
  89. }
  90. }
  91. return true
  92. }
  93. func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
  94. f, err := os.Open(fileOrDir)
  95. if err != nil {
  96. fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
  97. return false
  98. }
  99. defer f.Close()
  100. fi, err := f.Stat()
  101. if err != nil {
  102. fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
  103. return false
  104. }
  105. mode := fi.Mode()
  106. if mode.IsDir() {
  107. files, _ := ioutil.ReadDir(fileOrDir)
  108. for _, subFileOrDir := range files {
  109. if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
  110. return false
  111. }
  112. }
  113. return true
  114. }
  115. // this is a regular file
  116. if *copy.include != "" {
  117. if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
  118. return true
  119. }
  120. }
  121. // find the chunk count
  122. chunkSize := int64(*copy.maxMB * 1024 * 1024)
  123. chunkCount := 1
  124. if chunkSize > 0 && fi.Size() > chunkSize {
  125. chunkCount = int(fi.Size()/chunkSize) + 1
  126. }
  127. if chunkCount == 1 {
  128. return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
  129. }
  130. return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
  131. }
  132. func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
  133. // upload the file content
  134. fileName := filepath.Base(f.Name())
  135. mimeType := detectMimeType(f)
  136. var chunks []*filer_pb.FileChunk
  137. if fi.Size() > 0 {
  138. // assign a volume
  139. assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
  140. Count: 1,
  141. Replication: *copy.replication,
  142. Collection: *copy.collection,
  143. Ttl: *copy.ttl,
  144. })
  145. if err != nil {
  146. fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
  147. }
  148. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  149. uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, "")
  150. if err != nil {
  151. fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  152. return false
  153. }
  154. if uploadResult.Error != "" {
  155. fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  156. return false
  157. }
  158. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  159. chunks = append(chunks, &filer_pb.FileChunk{
  160. FileId: assignResult.Fid,
  161. Offset: 0,
  162. Size: uint64(uploadResult.Size),
  163. Mtime: time.Now().UnixNano(),
  164. ETag: uploadResult.ETag,
  165. })
  166. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
  167. }
  168. if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
  169. request := &filer_pb.CreateEntryRequest{
  170. Directory: urlFolder,
  171. Entry: &filer_pb.Entry{
  172. Name: fileName,
  173. Attributes: &filer_pb.FuseAttributes{
  174. Crtime: time.Now().Unix(),
  175. Mtime: time.Now().Unix(),
  176. Gid: uint32(os.Getgid()),
  177. Uid: uint32(os.Getuid()),
  178. FileSize: uint64(fi.Size()),
  179. FileMode: uint32(fi.Mode()),
  180. Mime: mimeType,
  181. Replication: *copy.replication,
  182. Collection: *copy.collection,
  183. TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
  184. },
  185. Chunks: chunks,
  186. },
  187. }
  188. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  189. return fmt.Errorf("update fh: %v", err)
  190. }
  191. return nil
  192. }); err != nil {
  193. fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
  194. return false
  195. }
  196. return true
  197. }
  198. func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
  199. fileName := filepath.Base(f.Name())
  200. mimeType := detectMimeType(f)
  201. var chunks []*filer_pb.FileChunk
  202. for i := int64(0); i < int64(chunkCount); i++ {
  203. // assign a volume
  204. assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
  205. Count: 1,
  206. Replication: *copy.replication,
  207. Collection: *copy.collection,
  208. Ttl: *copy.ttl,
  209. })
  210. if err != nil {
  211. fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
  212. }
  213. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  214. uploadResult, err := operation.Upload(targetUrl,
  215. fileName+"-"+strconv.FormatInt(i+1, 10),
  216. io.LimitReader(f, chunkSize),
  217. false, "application/octet-stream", nil, "")
  218. if err != nil {
  219. fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  220. return false
  221. }
  222. if uploadResult.Error != "" {
  223. fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  224. return false
  225. }
  226. chunks = append(chunks, &filer_pb.FileChunk{
  227. FileId: assignResult.Fid,
  228. Offset: i * chunkSize,
  229. Size: uint64(uploadResult.Size),
  230. Mtime: time.Now().UnixNano(),
  231. ETag: uploadResult.ETag,
  232. })
  233. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  234. }
  235. if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
  236. request := &filer_pb.CreateEntryRequest{
  237. Directory: urlFolder,
  238. Entry: &filer_pb.Entry{
  239. Name: fileName,
  240. Attributes: &filer_pb.FuseAttributes{
  241. Crtime: time.Now().Unix(),
  242. Mtime: time.Now().Unix(),
  243. Gid: uint32(os.Getgid()),
  244. Uid: uint32(os.Getuid()),
  245. FileSize: uint64(fi.Size()),
  246. FileMode: uint32(fi.Mode()),
  247. Mime: mimeType,
  248. Replication: *copy.replication,
  249. Collection: *copy.collection,
  250. TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
  251. },
  252. Chunks: chunks,
  253. },
  254. }
  255. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  256. return fmt.Errorf("update fh: %v", err)
  257. }
  258. return nil
  259. }); err != nil {
  260. fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
  261. return false
  262. }
  263. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
  264. return true
  265. }
  266. func detectMimeType(f *os.File) string {
  267. head := make([]byte, 512)
  268. f.Seek(0, 0)
  269. n, err := f.Read(head)
  270. if err == io.EOF {
  271. return ""
  272. }
  273. if err != nil {
  274. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  275. return "application/octet-stream"
  276. }
  277. f.Seek(0, 0)
  278. mimeType := http.DetectContentType(head[:n])
  279. return mimeType
  280. }
  281. func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
  282. grpcConnection, err := util.GrpcDial(filerAddress)
  283. if err != nil {
  284. return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
  285. }
  286. defer grpcConnection.Close()
  287. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  288. return fn(client)
  289. }