You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
5.2 KiB

9 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "io"
  5. "mime"
  6. "net/url"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. )
  14. type FilePart struct {
  15. Reader io.Reader
  16. FileName string
  17. FileSize int64
  18. IsGzipped bool
  19. MimeType string
  20. ModTime int64 //in seconds
  21. Replication string
  22. Collection string
  23. Ttl string
  24. Server string //this comes from assign result
  25. Fid string //this comes from assign result, but customizable
  26. }
  27. type SubmitResult struct {
  28. FileName string `json:"fileName,omitempty"`
  29. FileUrl string `json:"fileUrl,omitempty"`
  30. Fid string `json:"fid,omitempty"`
  31. Size uint32 `json:"size,omitempty"`
  32. Error string `json:"error,omitempty"`
  33. }
  34. func SubmitFiles(master string, files []FilePart,
  35. replication string, collection string, ttl string, maxMB int,
  36. secret security.Secret,
  37. ) ([]SubmitResult, error) {
  38. results := make([]SubmitResult, len(files))
  39. for index, file := range files {
  40. results[index].FileName = file.FileName
  41. }
  42. ar := &VolumeAssignRequest{
  43. Count: uint64(len(files)),
  44. Replication: replication,
  45. Collection: collection,
  46. Ttl: ttl,
  47. }
  48. ret, err := Assign(master, ar)
  49. if err != nil {
  50. for index, _ := range files {
  51. results[index].Error = err.Error()
  52. }
  53. return results, err
  54. }
  55. for index, file := range files {
  56. file.Fid = ret.Fid
  57. if index > 0 {
  58. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  59. }
  60. file.Server = ret.Url
  61. file.Replication = replication
  62. file.Collection = collection
  63. results[index].Size, err = file.Upload(maxMB, master, secret)
  64. if err != nil {
  65. results[index].Error = err.Error()
  66. }
  67. results[index].Fid = file.Fid
  68. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  69. }
  70. return results, nil
  71. }
  72. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  73. ret = make([]FilePart, len(fullPathFilenames))
  74. for index, file := range fullPathFilenames {
  75. if ret[index], err = newFilePart(file); err != nil {
  76. return
  77. }
  78. }
  79. return
  80. }
  81. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  82. fh, openErr := os.Open(fullPathFilename)
  83. if openErr != nil {
  84. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  85. return ret, openErr
  86. }
  87. ret.Reader = fh
  88. fi, fiErr := fh.Stat()
  89. if fiErr != nil {
  90. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  91. return ret, fiErr
  92. }
  93. ret.ModTime = fi.ModTime().UTC().Unix()
  94. ret.FileSize = fi.Size()
  95. ext := strings.ToLower(path.Ext(fullPathFilename))
  96. ret.IsGzipped = ext == ".gz"
  97. ret.FileName = fullPathFilename
  98. if ext != "" {
  99. ret.MimeType = mime.TypeByExtension(ext)
  100. }
  101. return ret, nil
  102. }
  103. func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
  104. jwt := security.GenJwt(secret, fi.Fid)
  105. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  106. if fi.ModTime != 0 {
  107. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  108. }
  109. if closer, ok := fi.Reader.(io.Closer); ok {
  110. defer closer.Close()
  111. }
  112. baseName := path.Base(fi.FileName)
  113. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  114. chunkSize := int64(maxMB * 1024 * 1024)
  115. chunks := fi.FileSize/chunkSize + 1
  116. cm := ChunkManifest{
  117. Name: baseName,
  118. Size: fi.FileSize,
  119. Mime: fi.MimeType,
  120. Chunks: make([]*ChunkInfo, 0, chunks),
  121. }
  122. for i := int64(0); i < chunks; i++ {
  123. id, count, e := upload_one_chunk(
  124. baseName+"-"+strconv.FormatInt(i+1, 10),
  125. io.LimitReader(fi.Reader, chunkSize),
  126. master, fi.Replication, fi.Collection, fi.Ttl,
  127. jwt)
  128. if e != nil {
  129. // delete all uploaded chunks
  130. cm.DeleteChunks(master)
  131. return 0, e
  132. }
  133. cm.Chunks = append(cm.Chunks,
  134. &ChunkInfo{
  135. Offset: i * chunkSize,
  136. Size: int64(count),
  137. Fid: id,
  138. },
  139. )
  140. retSize += count
  141. }
  142. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  143. if err != nil {
  144. // delete all uploaded chunks
  145. cm.DeleteChunks(master)
  146. }
  147. } else {
  148. ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
  149. if e != nil {
  150. return 0, e
  151. }
  152. return ret.Size, e
  153. }
  154. return
  155. }
  156. func upload_one_chunk(filename string, reader io.Reader, master,
  157. replication string, collection string, ttl string, jwt security.EncodedJwt,
  158. ) (fid string, size uint32, e error) {
  159. ar := &VolumeAssignRequest{
  160. Count: 1,
  161. Replication: replication,
  162. Collection: collection,
  163. Ttl: ttl,
  164. }
  165. ret, err := Assign(master, ar)
  166. if err != nil {
  167. return "", 0, err
  168. }
  169. fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
  170. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  171. uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
  172. "application/octet-stream", nil, jwt)
  173. if uploadError != nil {
  174. return fid, 0, uploadError
  175. }
  176. return fid, uploadResult.Size, nil
  177. }
  178. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  179. buf, e := manifest.Marshal()
  180. if e != nil {
  181. return e
  182. }
  183. bufReader := bytes.NewReader(buf)
  184. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  185. u, _ := url.Parse(fileUrl)
  186. q := u.Query()
  187. q.Set("cm", "true")
  188. u.RawQuery = q.Encode()
  189. _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
  190. return e
  191. }