You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

337 lines
9.5 KiB

7 years ago
7 years ago
7 years ago
  1. package command
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/url"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "context"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "io"
  14. "net/http"
  15. "strconv"
  16. "time"
  17. )
  18. var (
  19. copy CopyOptions
  20. )
  21. type CopyOptions struct {
  22. filerGrpcPort *int
  23. master *string
  24. include *string
  25. replication *string
  26. collection *string
  27. ttl *string
  28. maxMB *int
  29. }
  30. func init() {
  31. cmdCopy.Run = runCopy // break init cycle
  32. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  33. copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  34. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  35. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  36. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  37. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  38. copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
  39. copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
  40. }
  41. var cmdCopy = &Command{
  42. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  43. Short: "copy one or a list of files to a filer folder",
  44. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  45. It can copy one or a list of files or folders.
  46. If copying a whole folder recursively:
  47. All files under the folder and subfolders will be copyed.
  48. Optional parameter "-include" allows you to specify the file name patterns.
  49. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  50. `,
  51. }
  52. func runCopy(cmd *Command, args []string) bool {
  53. if len(args) <= 1 {
  54. return false
  55. }
  56. filerDestination := args[len(args)-1]
  57. fileOrDirs := args[0 : len(args)-1]
  58. filerUrl, err := url.Parse(filerDestination)
  59. if err != nil {
  60. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  61. return false
  62. }
  63. urlPath := filerUrl.Path
  64. if !strings.HasSuffix(urlPath, "/") {
  65. fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
  66. return false
  67. }
  68. if filerUrl.Port() == "" {
  69. fmt.Printf("The filer port should be specified.\n")
  70. return false
  71. }
  72. filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
  73. if parseErr != nil {
  74. fmt.Printf("The filer port parse error: %v\n", parseErr)
  75. return false
  76. }
  77. filerGrpcPort := filerPort + 10000
  78. if *copy.filerGrpcPort != 0 {
  79. filerGrpcPort = uint64(*copy.filerGrpcPort)
  80. }
  81. filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
  82. for _, fileOrDir := range fileOrDirs {
  83. if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
  84. return false
  85. }
  86. }
  87. return true
  88. }
  89. func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
  90. f, err := os.Open(fileOrDir)
  91. if err != nil {
  92. fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
  93. return false
  94. }
  95. defer f.Close()
  96. fi, err := f.Stat()
  97. if err != nil {
  98. fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
  99. return false
  100. }
  101. mode := fi.Mode()
  102. if mode.IsDir() {
  103. files, _ := ioutil.ReadDir(fileOrDir)
  104. for _, subFileOrDir := range files {
  105. if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
  106. return false
  107. }
  108. }
  109. return true
  110. }
  111. // this is a regular file
  112. if *copy.include != "" {
  113. if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
  114. return true
  115. }
  116. }
  117. // find the chunk count
  118. chunkSize := int64(*copy.maxMB * 1024 * 1024)
  119. chunkCount := 1
  120. if chunkSize > 0 && fi.Size() > chunkSize {
  121. chunkCount = int(fi.Size()/chunkSize) + 1
  122. }
  123. if chunkCount == 1 {
  124. return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
  125. }
  126. return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
  127. }
  128. func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
  129. // upload the file content
  130. fileName := filepath.Base(f.Name())
  131. mimeType := detectMimeType(f)
  132. var chunks []*filer_pb.FileChunk
  133. if fi.Size() > 0 {
  134. // assign a volume
  135. assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
  136. Count: 1,
  137. Replication: *copy.replication,
  138. Collection: *copy.collection,
  139. Ttl: *copy.ttl,
  140. })
  141. if err != nil {
  142. fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
  143. }
  144. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  145. uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth)
  146. if err != nil {
  147. fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  148. return false
  149. }
  150. if uploadResult.Error != "" {
  151. fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  152. return false
  153. }
  154. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  155. chunks = append(chunks, &filer_pb.FileChunk{
  156. FileId: assignResult.Fid,
  157. Offset: 0,
  158. Size: uint64(uploadResult.Size),
  159. Mtime: time.Now().UnixNano(),
  160. ETag: uploadResult.ETag,
  161. })
  162. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
  163. }
  164. if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
  165. request := &filer_pb.CreateEntryRequest{
  166. Directory: urlFolder,
  167. Entry: &filer_pb.Entry{
  168. Name: fileName,
  169. Attributes: &filer_pb.FuseAttributes{
  170. Crtime: time.Now().Unix(),
  171. Mtime: time.Now().Unix(),
  172. Gid: uint32(os.Getgid()),
  173. Uid: uint32(os.Getuid()),
  174. FileSize: uint64(fi.Size()),
  175. FileMode: uint32(fi.Mode()),
  176. Mime: mimeType,
  177. Replication: *copy.replication,
  178. Collection: *copy.collection,
  179. TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
  180. },
  181. Chunks: chunks,
  182. },
  183. }
  184. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  185. return fmt.Errorf("update fh: %v", err)
  186. }
  187. return nil
  188. }); err != nil {
  189. fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
  190. return false
  191. }
  192. return true
  193. }
  194. func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
  195. fileName := filepath.Base(f.Name())
  196. mimeType := detectMimeType(f)
  197. var chunks []*filer_pb.FileChunk
  198. for i := int64(0); i < int64(chunkCount); i++ {
  199. // assign a volume
  200. assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
  201. Count: 1,
  202. Replication: *copy.replication,
  203. Collection: *copy.collection,
  204. Ttl: *copy.ttl,
  205. })
  206. if err != nil {
  207. fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
  208. }
  209. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  210. uploadResult, err := operation.Upload(targetUrl,
  211. fileName+"-"+strconv.FormatInt(i+1, 10),
  212. io.LimitReader(f, chunkSize),
  213. false, "application/octet-stream", nil, assignResult.Auth)
  214. if err != nil {
  215. fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  216. return false
  217. }
  218. if uploadResult.Error != "" {
  219. fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  220. return false
  221. }
  222. chunks = append(chunks, &filer_pb.FileChunk{
  223. FileId: assignResult.Fid,
  224. Offset: i * chunkSize,
  225. Size: uint64(uploadResult.Size),
  226. Mtime: time.Now().UnixNano(),
  227. ETag: uploadResult.ETag,
  228. })
  229. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  230. }
  231. if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
  232. request := &filer_pb.CreateEntryRequest{
  233. Directory: urlFolder,
  234. Entry: &filer_pb.Entry{
  235. Name: fileName,
  236. Attributes: &filer_pb.FuseAttributes{
  237. Crtime: time.Now().Unix(),
  238. Mtime: time.Now().Unix(),
  239. Gid: uint32(os.Getgid()),
  240. Uid: uint32(os.Getuid()),
  241. FileSize: uint64(fi.Size()),
  242. FileMode: uint32(fi.Mode()),
  243. Mime: mimeType,
  244. Replication: *copy.replication,
  245. Collection: *copy.collection,
  246. TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
  247. },
  248. Chunks: chunks,
  249. },
  250. }
  251. if _, err := client.CreateEntry(context.Background(), request); err != nil {
  252. return fmt.Errorf("update fh: %v", err)
  253. }
  254. return nil
  255. }); err != nil {
  256. fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
  257. return false
  258. }
  259. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
  260. return true
  261. }
  262. func detectMimeType(f *os.File) string {
  263. head := make([]byte, 512)
  264. f.Seek(0, io.SeekStart)
  265. n, err := f.Read(head)
  266. if err == io.EOF {
  267. return ""
  268. }
  269. if err != nil {
  270. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  271. return "application/octet-stream"
  272. }
  273. f.Seek(0, io.SeekStart)
  274. mimeType := http.DetectContentType(head[:n])
  275. return mimeType
  276. }
  277. func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
  278. grpcConnection, err := util.GrpcDial(filerAddress)
  279. if err != nil {
  280. return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
  281. }
  282. defer grpcConnection.Close()
  283. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  284. return fn(client)
  285. }