You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 lines
5.7 KiB

9 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "io"
  5. "mime"
  6. "net/url"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. )
  // FilePart describes one file to be uploaded: where its bytes come from,
  // the metadata sent along with it, and the storage placement to use.
  type FilePart struct {
  	// Reader supplies the file content. Upload closes it when it also
  	// implements io.Closer.
  	Reader io.Reader
  	// FileName is the name reported for the file; Upload sends only its base name.
  	FileName string
  	// FileSize is the content length in bytes; Upload compares it with the
  	// chunk size limit to decide whether to split the file.
  	FileSize int64
  	// IsGzipped tells the uploader that the content is already gzip-compressed.
  	IsGzipped bool
  	// MimeType is the content type sent with the upload; may be empty.
  	MimeType string
  	// ModTime is the modification time in seconds; zero means "not set".
  	ModTime int64

  	// Placement options forwarded to volume assignment requests.
  	Replication, Collection, DataCenter, Ttl string

  	// Server is the volume server address; this comes from the assign result.
  	Server string
  	// Fid is the file id; this comes from the assign result, but is customizable.
  	Fid string
  }
  // SubmitResult reports the outcome of submitting a single file.
  // Every field is omitted from the JSON encoding when it holds its zero value.
  type SubmitResult struct {
  	// FileName is the name of the submitted file.
  	FileName string `json:"fileName,omitempty"`
  	// FileUrl is the public URL under which the file can be reached.
  	FileUrl string `json:"fileUrl,omitempty"`
  	// Fid is the file id the content was stored under.
  	Fid string `json:"fid,omitempty"`
  	// Size is the size reported by the upload.
  	Size uint32 `json:"size,omitempty"`
  	// Error holds the upload or assignment error message, if any.
  	Error string `json:"error,omitempty"`
  }
  35. func SubmitFiles(master string, files []FilePart,
  36. replication string, collection string, dataCenter string, ttl string, maxMB int,
  37. secret security.Secret,
  38. ) ([]SubmitResult, error) {
  39. results := make([]SubmitResult, len(files))
  40. for index, file := range files {
  41. results[index].FileName = file.FileName
  42. }
  43. ar := &VolumeAssignRequest{
  44. Count: uint64(len(files)),
  45. Replication: replication,
  46. Collection: collection,
  47. DataCenter: dataCenter,
  48. Ttl: ttl,
  49. }
  50. ret, err := Assign(master, ar)
  51. if err != nil {
  52. for index, _ := range files {
  53. results[index].Error = err.Error()
  54. }
  55. return results, err
  56. }
  57. for index, file := range files {
  58. file.Fid = ret.Fid
  59. if index > 0 {
  60. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  61. }
  62. file.Server = ret.Url
  63. file.Replication = replication
  64. file.Collection = collection
  65. file.DataCenter = dataCenter
  66. results[index].Size, err = file.Upload(maxMB, master, secret)
  67. if err != nil {
  68. results[index].Error = err.Error()
  69. }
  70. results[index].Fid = file.Fid
  71. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  72. }
  73. return results, nil
  74. }
  75. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  76. ret = make([]FilePart, len(fullPathFilenames))
  77. for index, file := range fullPathFilenames {
  78. if ret[index], err = newFilePart(file); err != nil {
  79. return
  80. }
  81. }
  82. return
  83. }
  84. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  85. fh, openErr := os.Open(fullPathFilename)
  86. if openErr != nil {
  87. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  88. return ret, openErr
  89. }
  90. ret.Reader = fh
  91. fi, fiErr := fh.Stat()
  92. if fiErr != nil {
  93. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  94. return ret, fiErr
  95. }
  96. ret.ModTime = fi.ModTime().UTC().Unix()
  97. ret.FileSize = fi.Size()
  98. ext := strings.ToLower(path.Ext(fullPathFilename))
  99. ret.IsGzipped = ext == ".gz"
  100. ret.FileName = fi.Name()
  101. if ext != "" {
  102. ret.MimeType = mime.TypeByExtension(ext)
  103. }
  104. return ret, nil
  105. }
  106. func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
  107. jwt := security.GenJwt(secret, fi.Fid)
  108. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  109. if fi.ModTime != 0 {
  110. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  111. }
  112. if closer, ok := fi.Reader.(io.Closer); ok {
  113. defer closer.Close()
  114. }
  115. baseName := path.Base(fi.FileName)
  116. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  117. chunkSize := int64(maxMB * 1024 * 1024)
  118. chunks := fi.FileSize/chunkSize + 1
  119. cm := ChunkManifest{
  120. Name: baseName,
  121. Size: fi.FileSize,
  122. Mime: fi.MimeType,
  123. Chunks: make([]*ChunkInfo, 0, chunks),
  124. }
  125. var ret *AssignResult
  126. var id string
  127. if fi.DataCenter != "" {
  128. ar := &VolumeAssignRequest{
  129. Count: uint64(chunks),
  130. Replication: fi.Replication,
  131. Collection: fi.Collection,
  132. Ttl: fi.Ttl,
  133. }
  134. ret, err = Assign(master, ar)
  135. if err != nil {
  136. return
  137. }
  138. }
  139. for i := int64(0); i < chunks; i++ {
  140. if fi.DataCenter == "" {
  141. ar := &VolumeAssignRequest{
  142. Count: 1,
  143. Replication: fi.Replication,
  144. Collection: fi.Collection,
  145. Ttl: fi.Ttl,
  146. }
  147. ret, err = Assign(master, ar)
  148. if err != nil {
  149. // delete all uploaded chunks
  150. cm.DeleteChunks(master)
  151. return
  152. }
  153. id = ret.Fid
  154. } else {
  155. id = ret.Fid
  156. if i > 0 {
  157. id += "_" + strconv.FormatInt(i, 10)
  158. }
  159. }
  160. fileUrl := "http://" + ret.Url + "/" + id
  161. count, e := upload_one_chunk(
  162. baseName+"-"+strconv.FormatInt(i+1, 10),
  163. io.LimitReader(fi.Reader, chunkSize),
  164. master, fileUrl,
  165. jwt)
  166. if e != nil {
  167. // delete all uploaded chunks
  168. cm.DeleteChunks(master)
  169. return 0, e
  170. }
  171. cm.Chunks = append(cm.Chunks,
  172. &ChunkInfo{
  173. Offset: i * chunkSize,
  174. Size: int64(count),
  175. Fid: id,
  176. },
  177. )
  178. retSize += count
  179. }
  180. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  181. if err != nil {
  182. // delete all uploaded chunks
  183. cm.DeleteChunks(master)
  184. }
  185. } else {
  186. ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
  187. if e != nil {
  188. return 0, e
  189. }
  190. return ret.Size, e
  191. }
  192. return
  193. }
  194. func upload_one_chunk(filename string, reader io.Reader, master,
  195. fileUrl string, jwt security.EncodedJwt,
  196. ) (size uint32, e error) {
  197. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  198. uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
  199. "application/octet-stream", nil, jwt)
  200. if uploadError != nil {
  201. return 0, uploadError
  202. }
  203. return uploadResult.Size, nil
  204. }
  205. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  206. buf, e := manifest.Marshal()
  207. if e != nil {
  208. return e
  209. }
  210. bufReader := bytes.NewReader(buf)
  211. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  212. u, _ := url.Parse(fileUrl)
  213. q := u.Query()
  214. q.Set("cm", "true")
  215. u.RawQuery = q.Encode()
  216. _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
  217. return e
  218. }