You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

226 lines
5.8 KiB

9 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
6 years ago
9 years ago
  1. package operation
  2. import (
  3. "io"
  4. "mime"
  5. "net/url"
  6. "os"
  7. "path"
  8. "strconv"
  9. "strings"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. )
  14. type FilePart struct {
  15. Reader io.Reader
  16. FileName string
  17. FileSize int64
  18. MimeType string
  19. ModTime int64 //in seconds
  20. Replication string
  21. Collection string
  22. DataCenter string
  23. Ttl string
  24. Server string //this comes from assign result
  25. Fid string //this comes from assign result, but customizable
  26. }
  27. type SubmitResult struct {
  28. FileName string `json:"fileName,omitempty"`
  29. FileUrl string `json:"fileUrl,omitempty"`
  30. Fid string `json:"fid,omitempty"`
  31. Size uint32 `json:"size,omitempty"`
  32. Error string `json:"error,omitempty"`
  33. }
  34. func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
  35. results := make([]SubmitResult, len(files))
  36. for index, file := range files {
  37. results[index].FileName = file.FileName
  38. }
  39. ar := &VolumeAssignRequest{
  40. Count: uint64(len(files)),
  41. Replication: replication,
  42. Collection: collection,
  43. DataCenter: dataCenter,
  44. Ttl: ttl,
  45. }
  46. ret, err := Assign(master, grpcDialOption, ar)
  47. if err != nil {
  48. for index := range files {
  49. results[index].Error = err.Error()
  50. }
  51. return results, err
  52. }
  53. for index, file := range files {
  54. file.Fid = ret.Fid
  55. if index > 0 {
  56. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  57. }
  58. file.Server = ret.Url
  59. if usePublicUrl {
  60. file.Server = ret.PublicUrl
  61. }
  62. file.Replication = replication
  63. file.Collection = collection
  64. file.DataCenter = dataCenter
  65. results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
  66. if err != nil {
  67. results[index].Error = err.Error()
  68. }
  69. results[index].Fid = file.Fid
  70. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  71. }
  72. return results, nil
  73. }
  74. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  75. ret = make([]FilePart, len(fullPathFilenames))
  76. for index, file := range fullPathFilenames {
  77. if ret[index], err = newFilePart(file); err != nil {
  78. return
  79. }
  80. }
  81. return
  82. }
  83. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  84. fh, openErr := os.Open(fullPathFilename)
  85. if openErr != nil {
  86. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  87. return ret, openErr
  88. }
  89. ret.Reader = fh
  90. fi, fiErr := fh.Stat()
  91. if fiErr != nil {
  92. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  93. return ret, fiErr
  94. }
  95. ret.ModTime = fi.ModTime().UTC().Unix()
  96. ret.FileSize = fi.Size()
  97. ext := strings.ToLower(path.Ext(fullPathFilename))
  98. ret.FileName = fi.Name()
  99. if ext != "" {
  100. ret.MimeType = mime.TypeByExtension(ext)
  101. }
  102. return ret, nil
  103. }
  104. func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
  105. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  106. if fi.ModTime != 0 {
  107. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  108. }
  109. if closer, ok := fi.Reader.(io.Closer); ok {
  110. defer closer.Close()
  111. }
  112. baseName := path.Base(fi.FileName)
  113. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  114. chunkSize := int64(maxMB * 1024 * 1024)
  115. chunks := fi.FileSize/chunkSize + 1
  116. cm := ChunkManifest{
  117. Name: baseName,
  118. Size: fi.FileSize,
  119. Mime: fi.MimeType,
  120. Chunks: make([]*ChunkInfo, 0, chunks),
  121. }
  122. var ret *AssignResult
  123. var id string
  124. if fi.DataCenter != "" {
  125. ar := &VolumeAssignRequest{
  126. Count: uint64(chunks),
  127. Replication: fi.Replication,
  128. Collection: fi.Collection,
  129. Ttl: fi.Ttl,
  130. }
  131. ret, err = Assign(master, grpcDialOption, ar)
  132. if err != nil {
  133. return
  134. }
  135. }
  136. for i := int64(0); i < chunks; i++ {
  137. if fi.DataCenter == "" {
  138. ar := &VolumeAssignRequest{
  139. Count: 1,
  140. Replication: fi.Replication,
  141. Collection: fi.Collection,
  142. Ttl: fi.Ttl,
  143. }
  144. ret, err = Assign(master, grpcDialOption, ar)
  145. if err != nil {
  146. // delete all uploaded chunks
  147. cm.DeleteChunks(master, grpcDialOption)
  148. return
  149. }
  150. id = ret.Fid
  151. } else {
  152. id = ret.Fid
  153. if i > 0 {
  154. id += "_" + strconv.FormatInt(i, 10)
  155. }
  156. }
  157. fileUrl := "http://" + ret.Url + "/" + id
  158. count, e := upload_one_chunk(
  159. baseName+"-"+strconv.FormatInt(i+1, 10),
  160. io.LimitReader(fi.Reader, chunkSize),
  161. master, fileUrl,
  162. ret.Auth)
  163. if e != nil {
  164. // delete all uploaded chunks
  165. cm.DeleteChunks(master, grpcDialOption)
  166. return 0, e
  167. }
  168. cm.Chunks = append(cm.Chunks,
  169. &ChunkInfo{
  170. Offset: i * chunkSize,
  171. Size: int64(count),
  172. Fid: id,
  173. },
  174. )
  175. retSize += count
  176. }
  177. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  178. if err != nil {
  179. // delete all uploaded chunks
  180. cm.DeleteChunks(master, grpcDialOption)
  181. }
  182. } else {
  183. ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
  184. if e != nil {
  185. return 0, e
  186. }
  187. return ret.Size, e
  188. }
  189. return
  190. }
  191. func upload_one_chunk(filename string, reader io.Reader, master,
  192. fileUrl string, jwt security.EncodedJwt,
  193. ) (size uint32, e error) {
  194. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  195. uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
  196. if uploadError != nil {
  197. return 0, uploadError
  198. }
  199. return uploadResult.Size, nil
  200. }
  201. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  202. buf, e := manifest.Marshal()
  203. if e != nil {
  204. return e
  205. }
  206. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  207. u, _ := url.Parse(fileUrl)
  208. q := u.Query()
  209. q.Set("cm", "true")
  210. u.RawQuery = q.Encode()
  211. _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
  212. return e
  213. }