288 lines
7.2 KiB

9 years ago
4 years ago
5 years ago
4 years ago
11 years ago
11 years ago
11 years ago
11 years ago
4 years ago
4 years ago
10 years ago
3 years ago
6 years ago
3 years ago
9 years ago
3 years ago
  1. package operation
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "io"
  6. "mime"
  7. "net/url"
  8. "os"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/security"
  15. )
  16. type FilePart struct {
  17. Reader io.Reader
  18. FileName string
  19. FileSize int64
  20. MimeType string
  21. ModTime int64 //in seconds
  22. Replication string
  23. Collection string
  24. DataCenter string
  25. Ttl string
  26. DiskType string
  27. Server string //this comes from assign result
  28. Fid string //this comes from assign result, but customizable
  29. Fsync bool
  30. }
  31. type SubmitResult struct {
  32. FileName string `json:"fileName,omitempty"`
  33. FileUrl string `json:"url,omitempty"`
  34. Fid string `json:"fid,omitempty"`
  35. Size uint32 `json:"size,omitempty"`
  36. Error string `json:"error,omitempty"`
  37. }
  38. type GetMasterFn func(ctx context.Context) pb.ServerAddress
  39. func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
  40. results := make([]SubmitResult, len(files))
  41. for index, file := range files {
  42. results[index].FileName = file.FileName
  43. }
  44. ar := &VolumeAssignRequest{
  45. Count: uint64(len(files)),
  46. Replication: replication,
  47. Collection: collection,
  48. DataCenter: dataCenter,
  49. Ttl: ttl,
  50. DiskType: diskType,
  51. }
  52. ret, err := Assign(masterFn, grpcDialOption, ar)
  53. if err != nil {
  54. for index := range files {
  55. results[index].Error = err.Error()
  56. }
  57. return results, err
  58. }
  59. for index, file := range files {
  60. file.Fid = ret.Fid
  61. if index > 0 {
  62. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  63. }
  64. file.Server = ret.Url
  65. if usePublicUrl {
  66. file.Server = ret.PublicUrl
  67. }
  68. file.Replication = replication
  69. file.Collection = collection
  70. file.DataCenter = dataCenter
  71. file.Ttl = ttl
  72. file.DiskType = diskType
  73. results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
  74. if err != nil {
  75. results[index].Error = err.Error()
  76. }
  77. results[index].Fid = file.Fid
  78. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  79. }
  80. return results, nil
  81. }
  82. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  83. ret = make([]FilePart, len(fullPathFilenames))
  84. for index, file := range fullPathFilenames {
  85. if ret[index], err = newFilePart(file); err != nil {
  86. return
  87. }
  88. }
  89. return
  90. }
  91. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  92. fh, openErr := os.Open(fullPathFilename)
  93. if openErr != nil {
  94. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  95. return ret, openErr
  96. }
  97. ret.Reader = fh
  98. fi, fiErr := fh.Stat()
  99. if fiErr != nil {
  100. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  101. return ret, fiErr
  102. }
  103. ret.ModTime = fi.ModTime().UTC().Unix()
  104. ret.FileSize = fi.Size()
  105. ext := strings.ToLower(path.Ext(fullPathFilename))
  106. ret.FileName = fi.Name()
  107. if ext != "" {
  108. ret.MimeType = mime.TypeByExtension(ext)
  109. }
  110. return ret, nil
  111. }
  112. func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
  113. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  114. if fi.ModTime != 0 {
  115. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  116. }
  117. if fi.Fsync {
  118. fileUrl += "?fsync=true"
  119. }
  120. if closer, ok := fi.Reader.(io.Closer); ok {
  121. defer closer.Close()
  122. }
  123. baseName := path.Base(fi.FileName)
  124. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  125. chunkSize := int64(maxMB * 1024 * 1024)
  126. chunks := fi.FileSize/chunkSize + 1
  127. cm := ChunkManifest{
  128. Name: baseName,
  129. Size: fi.FileSize,
  130. Mime: fi.MimeType,
  131. Chunks: make([]*ChunkInfo, 0, chunks),
  132. }
  133. var ret *AssignResult
  134. var id string
  135. if fi.DataCenter != "" {
  136. ar := &VolumeAssignRequest{
  137. Count: uint64(chunks),
  138. Replication: fi.Replication,
  139. Collection: fi.Collection,
  140. Ttl: fi.Ttl,
  141. DiskType: fi.DiskType,
  142. }
  143. ret, err = Assign(masterFn, grpcDialOption, ar)
  144. if err != nil {
  145. return
  146. }
  147. }
  148. for i := int64(0); i < chunks; i++ {
  149. if fi.DataCenter == "" {
  150. ar := &VolumeAssignRequest{
  151. Count: 1,
  152. Replication: fi.Replication,
  153. Collection: fi.Collection,
  154. Ttl: fi.Ttl,
  155. DiskType: fi.DiskType,
  156. }
  157. ret, err = Assign(masterFn, grpcDialOption, ar)
  158. if err != nil {
  159. // delete all uploaded chunks
  160. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  161. return
  162. }
  163. id = ret.Fid
  164. } else {
  165. id = ret.Fid
  166. if i > 0 {
  167. id += "_" + strconv.FormatInt(i, 10)
  168. }
  169. }
  170. fileUrl := "http://" + ret.Url + "/" + id
  171. if usePublicUrl {
  172. fileUrl = "http://" + ret.PublicUrl + "/" + id
  173. }
  174. count, e := upload_one_chunk(
  175. baseName+"-"+strconv.FormatInt(i+1, 10),
  176. io.LimitReader(fi.Reader, chunkSize),
  177. masterFn, fileUrl,
  178. ret.Auth)
  179. if e != nil {
  180. // delete all uploaded chunks
  181. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  182. return 0, e
  183. }
  184. cm.Chunks = append(cm.Chunks,
  185. &ChunkInfo{
  186. Offset: i * chunkSize,
  187. Size: int64(count),
  188. Fid: id,
  189. },
  190. )
  191. retSize += count
  192. }
  193. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  194. if err != nil {
  195. // delete all uploaded chunks
  196. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  197. }
  198. } else {
  199. uploadOption := &UploadOption{
  200. UploadUrl: fileUrl,
  201. Filename: baseName,
  202. Cipher: false,
  203. IsInputCompressed: false,
  204. MimeType: fi.MimeType,
  205. PairMap: nil,
  206. Jwt: jwt,
  207. }
  208. uploader, e := NewUploader()
  209. if e != nil {
  210. return 0, e
  211. }
  212. ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
  213. if e != nil {
  214. return 0, e
  215. }
  216. return ret.Size, e
  217. }
  218. return
  219. }
  220. func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
  221. fileUrl string, jwt security.EncodedJwt,
  222. ) (size uint32, e error) {
  223. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  224. uploadOption := &UploadOption{
  225. UploadUrl: fileUrl,
  226. Filename: filename,
  227. Cipher: false,
  228. IsInputCompressed: false,
  229. MimeType: "",
  230. PairMap: nil,
  231. Jwt: jwt,
  232. }
  233. uploader, uploaderError := NewUploader()
  234. if uploaderError != nil {
  235. return 0, uploaderError
  236. }
  237. uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
  238. if uploadError != nil {
  239. return 0, uploadError
  240. }
  241. return uploadResult.Size, nil
  242. }
  243. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  244. buf, e := manifest.Marshal()
  245. if e != nil {
  246. return e
  247. }
  248. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  249. u, _ := url.Parse(fileUrl)
  250. q := u.Query()
  251. q.Set("cm", "true")
  252. u.RawQuery = q.Encode()
  253. uploadOption := &UploadOption{
  254. UploadUrl: u.String(),
  255. Filename: manifest.Name,
  256. Cipher: false,
  257. IsInputCompressed: false,
  258. MimeType: "application/json",
  259. PairMap: nil,
  260. Jwt: jwt,
  261. }
  262. uploader, e := NewUploader()
  263. if e != nil {
  264. return e
  265. }
  266. _, e = uploader.UploadData(buf, uploadOption)
  267. return e
  268. }