You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

167 lines
4.5 KiB

10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "io"
  5. "mime"
  6. "os"
  7. "path"
  8. "strconv"
  9. "strings"
  10. "github.com/chrislusf/weed-fs/go/glog"
  11. "github.com/chrislusf/weed-fs/go/security"
  12. )
  13. type FilePart struct {
  14. Reader io.Reader
  15. FileName string
  16. FileSize int64
  17. IsGzipped bool
  18. MimeType string
  19. ModTime int64 //in seconds
  20. Replication string
  21. Collection string
  22. Ttl string
  23. Server string //this comes from assign result
  24. Fid string //this comes from assign result, but customizable
  25. }
  26. type SubmitResult struct {
  27. FileName string `json:"fileName,omitempty"`
  28. FileUrl string `json:"fileUrl,omitempty"`
  29. Fid string `json:"fid,omitempty"`
  30. Size uint32 `json:"size,omitempty"`
  31. Error string `json:"error,omitempty"`
  32. }
  33. func SubmitFiles(master string, files []FilePart,
  34. replication string, collection string, ttl string, maxMB int,
  35. secret security.Secret,
  36. ) ([]SubmitResult, error) {
  37. results := make([]SubmitResult, len(files))
  38. for index, file := range files {
  39. results[index].FileName = file.FileName
  40. }
  41. ret, err := Assign(master, len(files), replication, collection, ttl)
  42. if err != nil {
  43. for index, _ := range files {
  44. results[index].Error = err.Error()
  45. }
  46. return results, err
  47. }
  48. for index, file := range files {
  49. file.Fid = ret.Fid
  50. if index > 0 {
  51. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  52. }
  53. file.Server = ret.PublicUrl
  54. file.Replication = replication
  55. file.Collection = collection
  56. results[index].Size, err = file.Upload(maxMB, master, secret)
  57. if err != nil {
  58. results[index].Error = err.Error()
  59. }
  60. results[index].Fid = file.Fid
  61. results[index].FileUrl = file.Server + "/" + file.Fid
  62. }
  63. return results, nil
  64. }
  65. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  66. ret = make([]FilePart, len(fullPathFilenames))
  67. for index, file := range fullPathFilenames {
  68. if ret[index], err = newFilePart(file); err != nil {
  69. return
  70. }
  71. }
  72. return
  73. }
  74. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  75. fh, openErr := os.Open(fullPathFilename)
  76. if openErr != nil {
  77. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  78. return ret, openErr
  79. }
  80. ret.Reader = fh
  81. if fi, fiErr := fh.Stat(); fiErr != nil {
  82. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  83. return ret, fiErr
  84. } else {
  85. ret.ModTime = fi.ModTime().UTC().Unix()
  86. ret.FileSize = fi.Size()
  87. }
  88. ext := strings.ToLower(path.Ext(fullPathFilename))
  89. ret.IsGzipped = ext == ".gz"
  90. if ret.IsGzipped {
  91. ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
  92. }
  93. ret.FileName = fullPathFilename
  94. if ext != "" {
  95. ret.MimeType = mime.TypeByExtension(ext)
  96. }
  97. return ret, nil
  98. }
  99. func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
  100. jwt := security.GenJwt(secret, fi.Fid)
  101. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  102. if fi.ModTime != 0 {
  103. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  104. }
  105. if closer, ok := fi.Reader.(io.Closer); ok {
  106. defer closer.Close()
  107. }
  108. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  109. chunkSize := int64(maxMB * 1024 * 1024)
  110. chunks := fi.FileSize/chunkSize + 1
  111. fids := make([]string, 0)
  112. for i := int64(0); i < chunks; i++ {
  113. id, count, e := upload_one_chunk(
  114. fi.FileName+"-"+strconv.FormatInt(i+1, 10),
  115. io.LimitReader(fi.Reader, chunkSize),
  116. master, fi.Replication, fi.Collection, fi.Ttl,
  117. jwt)
  118. if e != nil {
  119. return 0, e
  120. }
  121. fids = append(fids, id)
  122. retSize += count
  123. }
  124. err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids, jwt)
  125. } else {
  126. ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
  127. if e != nil {
  128. return 0, e
  129. }
  130. return ret.Size, e
  131. }
  132. return
  133. }
  134. func upload_one_chunk(filename string, reader io.Reader, master,
  135. replication string, collection string, ttl string, jwt security.EncodedJwt,
  136. ) (fid string, size uint32, e error) {
  137. ret, err := Assign(master, 1, replication, collection, ttl)
  138. if err != nil {
  139. return "", 0, err
  140. }
  141. fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
  142. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  143. uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
  144. "application/octet-stream", jwt)
  145. if uploadError != nil {
  146. return fid, 0, uploadError
  147. }
  148. return fid, uploadResult.Size, nil
  149. }
  150. func upload_file_id_list(fileUrl, filename string, fids []string, jwt security.EncodedJwt) error {
  151. var buf bytes.Buffer
  152. buf.WriteString(strings.Join(fids, "\n"))
  153. glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
  154. _, e := Upload(fileUrl, filename, &buf, false, "text/plain", jwt)
  155. return e
  156. }