package command

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
)
var (
	copy CopyOptions
)

// CopyOptions holds the command-line flags for the filer.copy command.
type CopyOptions struct {
	filerGrpcPort *int
	master        *string
	include       *string
	replication   *string
	collection    *string
	ttl           *string
	maxMB         *int
	secretKey     *string
	secret        security.Secret
}
func init() {
	cmdCopy.Run = runCopy // break init cycle
	cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
	copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
	copy.include = cmdCopy.Flag.String("include", "", "patterns of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
	copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
	copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
	copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
	copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
	copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
	copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt JSON Web Token (JWT)")
}
var cmdCopy = &Command{
	UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
	Short:     "copy one or a list of files to a filer folder",
	Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
It can copy one or a list of files or folders.
If copying a whole folder recursively:
All files under the folder and its subfolders will be copied.
The optional parameter "-include" lets you specify file name patterns.
If a file has a ".gz" extension, its content is considered already gzipped and will be stored as is.
This saves the volume server the gzip processing and allows a customizable gzip compression level.
The ".gz" extension is stripped from the stored file name, so "jquery.js.gz" will be stored as "jquery.js".
If "maxMB" is set to a positive number, files larger than that limit will be split into chunks.
`,
}
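// Example invocations (illustrative only; the local paths are placeholders and
// the flag names are the ones registered in init above):
//
//	weed filer.copy file1.txt http://localhost:8888/path/to/a/folder/
//	weed filer.copy -include=*.pdf -maxMB=64 /some/local/dir http://localhost:8888/path/to/a/folder/

// runCopy parses the destination filer URL, derives the filer gRPC address
// (filer HTTP port + 10000 unless -filer.port.grpc is set), and copies each
// file or directory argument in turn.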
func runCopy(cmd *Command, args []string) bool {
	copy.secret = security.Secret(*copy.secretKey)
	if len(args) <= 1 {
		return false
	}

	filerDestination := args[len(args)-1]
	fileOrDirs := args[0 : len(args)-1]

	filerUrl, err := url.Parse(filerDestination)
	if err != nil {
		fmt.Printf("The last argument should be a URL on filer: %v\n", err)
		return false
	}
	urlPath := filerUrl.Path
	if !strings.HasSuffix(urlPath, "/") {
		urlPath = urlPath + "/"
	}

	if filerUrl.Port() == "" {
		fmt.Printf("The filer port should be specified.\n")
		return false
	}

	filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
	if parseErr != nil {
		fmt.Printf("The filer port parse error: %v\n", parseErr)
		return false
	}

	filerGrpcPort := filerPort + 10000
	if *copy.filerGrpcPort != 0 {
		filerGrpcPort = uint64(*copy.filerGrpcPort)
	}
	filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)

	for _, fileOrDir := range fileOrDirs {
		if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
			return false
		}
	}
	return true
}
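// doEachCopy copies a single file, or recurses into a directory, uploading file
// content to volume servers and registering each entry with the filer.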
func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
	f, err := os.Open(fileOrDir)
	if err != nil {
		fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
		return false
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
		return false
	}

	mode := fi.Mode()
	if mode.IsDir() {
		files, _ := ioutil.ReadDir(fileOrDir)
		for _, subFileOrDir := range files {
			if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
				return false
			}
		}
		return true
	}

	// this is a regular file
	if *copy.include != "" {
		if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
			return true
		}
	}

	// find the chunk count, rounding up so the last, possibly smaller, chunk is counted
	chunkSize := int64(*copy.maxMB * 1024 * 1024)
	chunkCount := 1
	if chunkSize > 0 && fi.Size() > chunkSize {
		chunkCount = int((fi.Size() + chunkSize - 1) / chunkSize)
	}

	if chunkCount == 1 {
		return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
	}

	return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
}
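// uploadFileAsOne uploads the whole file content as a single blob to an
// assigned volume server, then creates the matching entry on the filer via gRPC.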
func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {

	// upload the file content
	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)
	isGzipped := isGzipped(fileName)

	var chunks []*filer_pb.FileChunk

	if fi.Size() > 0 {

		// assign a volume
		assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
			Count:       1,
			Replication: *copy.replication,
			Collection:  *copy.collection,
			Ttl:         *copy.ttl,
		})
		if err != nil {
			fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
			return false
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid

		uploadResult, err := operation.Upload(targetUrl, fileName, f, isGzipped, mimeType, nil, "")
		if err != nil {
			fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
			return false
		}
		if uploadResult.Error != "" {
			fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
			return false
		}
		fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)

		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.Fid,
			Offset: 0,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
			ETag:   uploadResult.ETag,
		})

		fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
	}

	// create the file entry on the filer, referencing the uploaded chunk (if any)
	if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: urlFolder,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      time.Now().Unix(),
					Mtime:       time.Now().Unix(),
					Gid:         uint32(os.Getgid()),
					Uid:         uint32(os.Getuid()),
					FileSize:    uint64(fi.Size()),
					FileMode:    uint32(fi.Mode()),
					Mime:        mimeType,
					Replication: *copy.replication,
					Collection:  *copy.collection,
					TtlSec:      int32(util.ParseInt(*copy.ttl, 0)),
				},
				Chunks: chunks,
			},
		}

		if _, err := client.CreateEntry(context.Background(), request); err != nil {
			return fmt.Errorf("create entry %s/%s: %v", urlFolder, fileName, err)
		}
		return nil
	}); err != nil {
		fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
		return false
	}

	return true
}
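// uploadFileInChunks splits the file into chunkSize pieces, uploads each piece
// under its own file id, and then creates one filer entry that references all
// chunks. For example, with -maxMB=64 a 200 MB file is uploaded as three 64 MB
// chunks plus a final 8 MB chunk.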
func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {

	fileName := filepath.Base(f.Name())
	mimeType := detectMimeType(f)

	var chunks []*filer_pb.FileChunk

	for i := int64(0); i < int64(chunkCount); i++ {

		// assign a volume for this chunk
		assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
			Count:       1,
			Replication: *copy.replication,
			Collection:  *copy.collection,
			Ttl:         *copy.ttl,
		})
		if err != nil {
			fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
			return false
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid

		uploadResult, err := operation.Upload(targetUrl,
			fileName+"-"+strconv.FormatInt(i+1, 10),
			io.LimitReader(f, chunkSize),
			false, "application/octet-stream", nil, "")
		if err != nil {
			fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
			return false
		}
		if uploadResult.Error != "" {
			fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
			return false
		}

		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.Fid,
			Offset: i * chunkSize,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
			ETag:   uploadResult.ETag,
		})

		fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
	}

	// create the file entry on the filer, referencing all uploaded chunks
	if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.CreateEntryRequest{
			Directory: urlFolder,
			Entry: &filer_pb.Entry{
				Name: fileName,
				Attributes: &filer_pb.FuseAttributes{
					Crtime:      time.Now().Unix(),
					Mtime:       time.Now().Unix(),
					Gid:         uint32(os.Getgid()),
					Uid:         uint32(os.Getuid()),
					FileSize:    uint64(fi.Size()),
					FileMode:    uint32(fi.Mode()),
					Mime:        mimeType,
					Replication: *copy.replication,
					Collection:  *copy.collection,
					TtlSec:      int32(util.ParseInt(*copy.ttl, 0)),
				},
				Chunks: chunks,
			},
		}

		if _, err := client.CreateEntry(context.Background(), request); err != nil {
			return fmt.Errorf("create entry %s/%s: %v", urlFolder, fileName, err)
		}
		return nil
	}); err != nil {
		fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
		return false
	}

	fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)

	return true
}
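// isGzipped reports whether the file name ends with a ".gz" extension.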
func isGzipped(filename string) bool {
	return strings.ToLower(path.Ext(filename)) == ".gz"
}
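// detectMimeType sniffs up to the first 512 bytes of the file with
// http.DetectContentType and rewinds the file before returning.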
func detectMimeType(f *os.File) string {
	head := make([]byte, 512)
	f.Seek(0, 0)
	n, err := f.Read(head)
	if err == io.EOF {
		return ""
	}
	if err != nil {
		fmt.Printf("read head of %v: %v\n", f.Name(), err)
		return "application/octet-stream"
	}
	f.Seek(0, 0)
	mimeType := http.DetectContentType(head[:n])
	return mimeType
}
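// withFilerClient dials the filer's gRPC address, runs fn with a connected
// SeaweedFilerClient, and closes the connection when fn returns.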
func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
	grpcConnection, err := util.GrpcDial(filerAddress)
	if err != nil {
		return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
	}
	defer grpcConnection.Close()

	client := filer_pb.NewSeaweedFilerClient(grpcConnection)

	return fn(client)
}