You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

228 lines
5.6 KiB

9 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "io"
  5. "mime"
  6. "net/url"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. )
  14. type FilePart struct {
  15. Reader io.Reader
  16. FileName string
  17. FileSize int64
  18. MimeType string
  19. ModTime int64 //in seconds
  20. Replication string
  21. Collection string
  22. DataCenter string
  23. Ttl string
  24. Server string //this comes from assign result
  25. Fid string //this comes from assign result, but customizable
  26. }
  27. type SubmitResult struct {
  28. FileName string `json:"fileName,omitempty"`
  29. FileUrl string `json:"fileUrl,omitempty"`
  30. Fid string `json:"fid,omitempty"`
  31. Size uint32 `json:"size,omitempty"`
  32. Error string `json:"error,omitempty"`
  33. }
  34. func SubmitFiles(master string, files []FilePart,
  35. replication string, collection string, dataCenter string, ttl string, maxMB int,
  36. secret security.Secret,
  37. ) ([]SubmitResult, error) {
  38. results := make([]SubmitResult, len(files))
  39. for index, file := range files {
  40. results[index].FileName = file.FileName
  41. }
  42. ar := &VolumeAssignRequest{
  43. Count: uint64(len(files)),
  44. Replication: replication,
  45. Collection: collection,
  46. DataCenter: dataCenter,
  47. Ttl: ttl,
  48. }
  49. ret, err := Assign(master, ar)
  50. if err != nil {
  51. for index, _ := range files {
  52. results[index].Error = err.Error()
  53. }
  54. return results, err
  55. }
  56. for index, file := range files {
  57. file.Fid = ret.Fid
  58. if index > 0 {
  59. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  60. }
  61. file.Server = ret.Url
  62. file.Replication = replication
  63. file.Collection = collection
  64. file.DataCenter = dataCenter
  65. results[index].Size, err = file.Upload(maxMB, master, secret)
  66. if err != nil {
  67. results[index].Error = err.Error()
  68. }
  69. results[index].Fid = file.Fid
  70. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  71. }
  72. return results, nil
  73. }
  74. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  75. ret = make([]FilePart, len(fullPathFilenames))
  76. for index, file := range fullPathFilenames {
  77. if ret[index], err = newFilePart(file); err != nil {
  78. return
  79. }
  80. }
  81. return
  82. }
  83. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  84. fh, openErr := os.Open(fullPathFilename)
  85. if openErr != nil {
  86. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  87. return ret, openErr
  88. }
  89. ret.Reader = fh
  90. fi, fiErr := fh.Stat()
  91. if fiErr != nil {
  92. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  93. return ret, fiErr
  94. }
  95. ret.ModTime = fi.ModTime().UTC().Unix()
  96. ret.FileSize = fi.Size()
  97. ext := strings.ToLower(path.Ext(fullPathFilename))
  98. ret.FileName = fi.Name()
  99. if ext != "" {
  100. ret.MimeType = mime.TypeByExtension(ext)
  101. }
  102. return ret, nil
  103. }
  104. func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
  105. jwt := security.GenJwt(secret, fi.Fid)
  106. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  107. if fi.ModTime != 0 {
  108. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  109. }
  110. if closer, ok := fi.Reader.(io.Closer); ok {
  111. defer closer.Close()
  112. }
  113. baseName := path.Base(fi.FileName)
  114. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  115. chunkSize := int64(maxMB * 1024 * 1024)
  116. chunks := fi.FileSize/chunkSize + 1
  117. cm := ChunkManifest{
  118. Name: baseName,
  119. Size: fi.FileSize,
  120. Mime: fi.MimeType,
  121. Chunks: make([]*ChunkInfo, 0, chunks),
  122. }
  123. var ret *AssignResult
  124. var id string
  125. if fi.DataCenter != "" {
  126. ar := &VolumeAssignRequest{
  127. Count: uint64(chunks),
  128. Replication: fi.Replication,
  129. Collection: fi.Collection,
  130. Ttl: fi.Ttl,
  131. }
  132. ret, err = Assign(master, ar)
  133. if err != nil {
  134. return
  135. }
  136. }
  137. for i := int64(0); i < chunks; i++ {
  138. if fi.DataCenter == "" {
  139. ar := &VolumeAssignRequest{
  140. Count: 1,
  141. Replication: fi.Replication,
  142. Collection: fi.Collection,
  143. Ttl: fi.Ttl,
  144. }
  145. ret, err = Assign(master, ar)
  146. if err != nil {
  147. // delete all uploaded chunks
  148. cm.DeleteChunks(master)
  149. return
  150. }
  151. id = ret.Fid
  152. } else {
  153. id = ret.Fid
  154. if i > 0 {
  155. id += "_" + strconv.FormatInt(i, 10)
  156. }
  157. }
  158. fileUrl := "http://" + ret.Url + "/" + id
  159. count, e := upload_one_chunk(
  160. baseName+"-"+strconv.FormatInt(i+1, 10),
  161. io.LimitReader(fi.Reader, chunkSize),
  162. master, fileUrl,
  163. jwt)
  164. if e != nil {
  165. // delete all uploaded chunks
  166. cm.DeleteChunks(master)
  167. return 0, e
  168. }
  169. cm.Chunks = append(cm.Chunks,
  170. &ChunkInfo{
  171. Offset: i * chunkSize,
  172. Size: int64(count),
  173. Fid: id,
  174. },
  175. )
  176. retSize += count
  177. }
  178. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  179. if err != nil {
  180. // delete all uploaded chunks
  181. cm.DeleteChunks(master)
  182. }
  183. } else {
  184. ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
  185. if e != nil {
  186. return 0, e
  187. }
  188. return ret.Size, e
  189. }
  190. return
  191. }
  192. func upload_one_chunk(filename string, reader io.Reader, master,
  193. fileUrl string, jwt security.EncodedJwt,
  194. ) (size uint32, e error) {
  195. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  196. uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
  197. "application/octet-stream", nil, jwt)
  198. if uploadError != nil {
  199. return 0, uploadError
  200. }
  201. return uploadResult.Size, nil
  202. }
  203. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  204. buf, e := manifest.Marshal()
  205. if e != nil {
  206. return e
  207. }
  208. bufReader := bytes.NewReader(buf)
  209. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  210. u, _ := url.Parse(fileUrl)
  211. q := u.Query()
  212. q.Set("cm", "true")
  213. u.RawQuery = q.Encode()
  214. _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
  215. return e
  216. }