You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
5.1 KiB

9 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "io"
  5. "mime"
  6. "net/url"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. )
  14. type FilePart struct {
  15. Reader io.Reader
  16. FileName string
  17. FileSize int64
  18. IsGzipped bool
  19. MimeType string
  20. ModTime int64 //in seconds
  21. Replication string
  22. Collection string
  23. Ttl string
  24. Server string //this comes from assign result
  25. Fid string //this comes from assign result, but customizable
  26. }
  27. type SubmitResult struct {
  28. FileName string `json:"fileName,omitempty"`
  29. FileUrl string `json:"fileUrl,omitempty"`
  30. Fid string `json:"fid,omitempty"`
  31. Size uint32 `json:"size,omitempty"`
  32. Error string `json:"error,omitempty"`
  33. }
  34. func SubmitFiles(master string, files []FilePart,
  35. replication string, collection string, ttl string, maxMB int,
  36. secret security.Secret,
  37. ) ([]SubmitResult, error) {
  38. results := make([]SubmitResult, len(files))
  39. for index, file := range files {
  40. results[index].FileName = file.FileName
  41. }
  42. ret, err := Assign(master, uint64(len(files)), replication, collection, ttl)
  43. if err != nil {
  44. for index, _ := range files {
  45. results[index].Error = err.Error()
  46. }
  47. return results, err
  48. }
  49. for index, file := range files {
  50. file.Fid = ret.Fid
  51. if index > 0 {
  52. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  53. }
  54. file.Server = ret.Url
  55. file.Replication = replication
  56. file.Collection = collection
  57. results[index].Size, err = file.Upload(maxMB, master, secret)
  58. if err != nil {
  59. results[index].Error = err.Error()
  60. }
  61. results[index].Fid = file.Fid
  62. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  63. }
  64. return results, nil
  65. }
  66. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  67. ret = make([]FilePart, len(fullPathFilenames))
  68. for index, file := range fullPathFilenames {
  69. if ret[index], err = newFilePart(file); err != nil {
  70. return
  71. }
  72. }
  73. return
  74. }
  75. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  76. fh, openErr := os.Open(fullPathFilename)
  77. if openErr != nil {
  78. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  79. return ret, openErr
  80. }
  81. ret.Reader = fh
  82. if fi, fiErr := fh.Stat(); fiErr != nil {
  83. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  84. return ret, fiErr
  85. } else {
  86. ret.ModTime = fi.ModTime().UTC().Unix()
  87. ret.FileSize = fi.Size()
  88. }
  89. ext := strings.ToLower(path.Ext(fullPathFilename))
  90. ret.IsGzipped = ext == ".gz"
  91. if ret.IsGzipped {
  92. ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
  93. }
  94. ret.FileName = fullPathFilename
  95. if ext != "" {
  96. ret.MimeType = mime.TypeByExtension(ext)
  97. }
  98. return ret, nil
  99. }
  100. func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
  101. jwt := security.GenJwt(secret, fi.Fid)
  102. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  103. if fi.ModTime != 0 {
  104. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  105. }
  106. if closer, ok := fi.Reader.(io.Closer); ok {
  107. defer closer.Close()
  108. }
  109. baseName := path.Base(fi.FileName)
  110. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  111. chunkSize := int64(maxMB * 1024 * 1024)
  112. chunks := fi.FileSize/chunkSize + 1
  113. cm := ChunkManifest{
  114. Name: baseName,
  115. Size: fi.FileSize,
  116. Mime: fi.MimeType,
  117. Chunks: make([]*ChunkInfo, 0, chunks),
  118. }
  119. for i := int64(0); i < chunks; i++ {
  120. id, count, e := upload_one_chunk(
  121. baseName+"-"+strconv.FormatInt(i+1, 10),
  122. io.LimitReader(fi.Reader, chunkSize),
  123. master, fi.Replication, fi.Collection, fi.Ttl,
  124. jwt)
  125. if e != nil {
  126. // delete all uploaded chunks
  127. cm.DeleteChunks(master)
  128. return 0, e
  129. }
  130. cm.Chunks = append(cm.Chunks,
  131. &ChunkInfo{
  132. Offset: i * chunkSize,
  133. Size: int64(count),
  134. Fid: id,
  135. },
  136. )
  137. retSize += count
  138. }
  139. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  140. if err != nil {
  141. // delete all uploaded chunks
  142. cm.DeleteChunks(master)
  143. }
  144. } else {
  145. ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
  146. if e != nil {
  147. return 0, e
  148. }
  149. return ret.Size, e
  150. }
  151. return
  152. }
  153. func upload_one_chunk(filename string, reader io.Reader, master,
  154. replication string, collection string, ttl string, jwt security.EncodedJwt,
  155. ) (fid string, size uint32, e error) {
  156. ret, err := Assign(master, 1, replication, collection, ttl)
  157. if err != nil {
  158. return "", 0, err
  159. }
  160. fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
  161. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  162. uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
  163. "application/octet-stream", jwt)
  164. if uploadError != nil {
  165. return fid, 0, uploadError
  166. }
  167. return fid, uploadResult.Size, nil
  168. }
  169. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  170. buf, e := manifest.Marshal()
  171. if e != nil {
  172. return e
  173. }
  174. bufReader := bytes.NewReader(buf)
  175. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  176. u, _ := url.Parse(fileUrl)
  177. q := u.Query()
  178. q.Set("cm", "true")
  179. u.RawQuery = q.Encode()
  180. _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
  181. return e
  182. }