You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

325 lines
9.1 KiB

  1. package command
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/url"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "path"
  12. "net/http"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "strconv"
  15. "io"
  16. "time"
  17. "google.golang.org/grpc"
  18. "context"
  19. )
// copy holds the flag-backed options shared by the filer.copy command.
// NOTE(review): this package-level name shadows the builtin copy() inside
// this package; renaming would touch every reference below, so it is kept.
var (
	copy CopyOptions
)
// CopyOptions is the configuration for the filer.copy command.
// All pointer fields are bound to command-line flags in init().
type CopyOptions struct {
	master      *string // SeaweedFS master location, e.g. "localhost:9333"
	include     *string // file-name glob restricting which files are copied
	replication *string // replication type passed to volume assignment
	collection  *string // optional collection name for assigned volumes
	ttl         *string // time to live, e.g. "1m", "1h", "1d"
	maxMB       *int    // files larger than this many MB are split into chunks
	secretKey   *string // secret to encrypt Json Web Token (JWT)
	secret      security.Secret // derived from *secretKey in runCopy
}
  33. func init() {
  34. cmdCopy.Run = runCopy // break init cycle
  35. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  36. copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  37. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  38. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  39. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  40. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  41. copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
  42. copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
  43. }
  44. var cmdCopy = &Command{
  45. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  46. Short: "copy one or a list of files to a filer folder",
  47. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  48. It can copy one or a list of files or folders.
  49. If copying a whole folder recursively:
  50. All files under the folder and subfolders will be copyed.
  51. Optional parameter "-include" allows you to specify the file name patterns.
  52. If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
  53. This can save volume server's gzipped processing and allow customizable gzip compression level.
  54. The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
  55. If "maxMB" is set to a positive number, files larger than it would be split into chunks and copyed separatedly.
  56. The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
  57. `,
  58. }
  59. func runCopy(cmd *Command, args []string) bool {
  60. copy.secret = security.Secret(*copy.secretKey)
  61. if len(args) <= 1 {
  62. return false
  63. }
  64. filerDestination := args[len(args)-1]
  65. fileOrDirs := args[0: len(args)-1]
  66. filerUrl, err := url.Parse(filerDestination)
  67. if err != nil {
  68. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  69. return false
  70. }
  71. urlPath := filerUrl.Path
  72. if !strings.HasSuffix(urlPath, "/") {
  73. urlPath = urlPath + "/"
  74. }
  75. for _, fileOrDir := range fileOrDirs {
  76. if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) {
  77. return false
  78. }
  79. }
  80. return true
  81. }
  82. func doEachCopy(fileOrDir string, host string, path string) bool {
  83. f, err := os.Open(fileOrDir)
  84. if err != nil {
  85. fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
  86. return false
  87. }
  88. defer f.Close()
  89. fi, err := f.Stat()
  90. if err != nil {
  91. fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
  92. return false
  93. }
  94. mode := fi.Mode()
  95. if mode.IsDir() {
  96. files, _ := ioutil.ReadDir(fileOrDir)
  97. for _, subFileOrDir := range files {
  98. if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") {
  99. return false
  100. }
  101. }
  102. return true
  103. }
  104. // this is a regular file
  105. if *copy.include != "" {
  106. if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
  107. return true
  108. }
  109. }
  110. // find the chunk count
  111. chunkSize := int64(*copy.maxMB * 1024 * 1024)
  112. chunkCount := 1
  113. if chunkSize > 0 && fi.Size() > chunkSize {
  114. chunkCount = int(fi.Size()/chunkSize) + 1
  115. }
  116. if chunkCount == 1 {
  117. return uploadFileAsOne(host, path, f, fi)
  118. }
  119. return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize)
  120. }
  121. func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool {
  122. // upload the file content
  123. fileName := filepath.Base(f.Name())
  124. mimeType := detectMimeType(f)
  125. isGzipped := isGzipped(fileName)
  126. var chunks []*filer_pb.FileChunk
  127. if fi.Size() > 0 {
  128. // assign a volume
  129. assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
  130. Count: 1,
  131. Replication: *copy.replication,
  132. Collection: *copy.collection,
  133. Ttl: *copy.ttl,
  134. })
  135. if err != nil {
  136. fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
  137. }
  138. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  139. uploadResult, err := operation.Upload(targetUrl, fileName, f, isGzipped, mimeType, nil, "")
  140. if err != nil {
  141. fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  142. return false
  143. }
  144. if uploadResult.Error != "" {
  145. fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  146. return false
  147. }
  148. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  149. chunks = append(chunks, &filer_pb.FileChunk{
  150. FileId: assignResult.Fid,
  151. Offset: 0,
  152. Size: uint64(uploadResult.Size),
  153. Mtime: time.Now().UnixNano(),
  154. })
  155. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
  156. }
  157. if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
  158. request := &filer_pb.CreateEntryRequest{
  159. Directory: urlFolder,
  160. Entry: &filer_pb.Entry{
  161. Name: fileName,
  162. Attributes: &filer_pb.FuseAttributes{
  163. Crtime: time.Now().Unix(),
  164. Mtime: time.Now().Unix(),
  165. Gid: uint32(os.Getgid()),
  166. Uid: uint32(os.Getuid()),
  167. FileSize: uint64(fi.Size()),
  168. FileMode: uint32(fi.Mode()),
  169. Mime: mimeType,
  170. },
  171. Chunks: chunks,
  172. },
  173. }
  174. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  175. return fmt.Errorf("update fh: %v", err)
  176. }
  177. return nil
  178. }); err != nil {
  179. fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
  180. return false
  181. }
  182. return true
  183. }
  184. func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
  185. fileName := filepath.Base(f.Name())
  186. mimeType := detectMimeType(f)
  187. var chunks []*filer_pb.FileChunk
  188. for i := int64(0); i < int64(chunkCount); i++ {
  189. // assign a volume
  190. assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
  191. Count: 1,
  192. Replication: *copy.replication,
  193. Collection: *copy.collection,
  194. Ttl: *copy.ttl,
  195. })
  196. if err != nil {
  197. fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
  198. }
  199. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  200. uploadResult, err := operation.Upload(targetUrl,
  201. fileName+"-"+strconv.FormatInt(i+1, 10),
  202. io.LimitReader(f, chunkSize),
  203. false, "application/octet-stream", nil, "")
  204. if err != nil {
  205. fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  206. return false
  207. }
  208. if uploadResult.Error != "" {
  209. fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  210. return false
  211. }
  212. chunks = append(chunks, &filer_pb.FileChunk{
  213. FileId: assignResult.Fid,
  214. Offset: i * chunkSize,
  215. Size: uint64(uploadResult.Size),
  216. Mtime: time.Now().UnixNano(),
  217. })
  218. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  219. }
  220. if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
  221. request := &filer_pb.CreateEntryRequest{
  222. Directory: urlFolder,
  223. Entry: &filer_pb.Entry{
  224. Name: fileName,
  225. Attributes: &filer_pb.FuseAttributes{
  226. Crtime: time.Now().Unix(),
  227. Mtime: time.Now().Unix(),
  228. Gid: uint32(os.Getgid()),
  229. Uid: uint32(os.Getuid()),
  230. FileSize: uint64(fi.Size()),
  231. FileMode: uint32(fi.Mode()),
  232. Mime: mimeType,
  233. },
  234. Chunks: chunks,
  235. },
  236. }
  237. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  238. return fmt.Errorf("update fh: %v", err)
  239. }
  240. return nil
  241. }); err != nil {
  242. fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
  243. return false
  244. }
  245. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
  246. return true
  247. }
  248. func isGzipped(filename string) bool {
  249. return strings.ToLower(path.Ext(filename)) == ".gz"
  250. }
  251. func detectMimeType(f *os.File) string {
  252. head := make([]byte, 512)
  253. f.Seek(0, 0)
  254. n, err := f.Read(head)
  255. if err == io.EOF {
  256. return ""
  257. }
  258. if err != nil {
  259. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  260. return "application/octet-stream"
  261. }
  262. f.Seek(0, 0)
  263. mimeType := http.DetectContentType(head[:n])
  264. return mimeType
  265. }
  266. func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
  267. grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure())
  268. if err != nil {
  269. return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
  270. }
  271. defer grpcConnection.Close()
  272. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  273. return fn(client)
  274. }