You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

304 lines
7.5 KiB

9 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
3 years ago
6 years ago
3 years ago
9 years ago
3 years ago
  1. package operation
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "io"
  6. "math/rand/v2"
  7. "mime"
  8. "net/url"
  9. "os"
  10. "path"
  11. "strconv"
  12. "strings"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/security"
  16. )
  17. type FilePart struct {
  18. Reader io.Reader
  19. FileName string
  20. FileSize int64
  21. MimeType string
  22. ModTime int64 //in seconds
  23. Pref StoragePreference
  24. Server string //this comes from assign result
  25. Fid string //this comes from assign result, but customizable
  26. Fsync bool
  27. }
  28. type SubmitResult struct {
  29. FileName string `json:"fileName,omitempty"`
  30. FileUrl string `json:"url,omitempty"`
  31. Fid string `json:"fid,omitempty"`
  32. Size uint32 `json:"size,omitempty"`
  33. Error string `json:"error,omitempty"`
  34. }
  35. type StoragePreference struct {
  36. Replication string
  37. Collection string
  38. DataCenter string
  39. Ttl string
  40. DiskType string
  41. MaxMB int
  42. }
// GetMasterFn is a callback that yields a server address for the given
// context. It is handed through to Assign and the upload helpers.
// NOTE(review): by its name this presumably resolves the current master
// server — confirm against the callers.
type GetMasterFn func(ctx context.Context) pb.ServerAddress
  44. func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) {
  45. results := make([]SubmitResult, len(files))
  46. for index, file := range files {
  47. results[index].FileName = file.FileName
  48. }
  49. ar := &VolumeAssignRequest{
  50. Count: uint64(len(files)),
  51. Replication: pref.Replication,
  52. Collection: pref.Collection,
  53. DataCenter: pref.DataCenter,
  54. Ttl: pref.Ttl,
  55. DiskType: pref.DiskType,
  56. }
  57. ret, err := Assign(masterFn, grpcDialOption, ar)
  58. if err != nil {
  59. for index := range files {
  60. results[index].Error = err.Error()
  61. }
  62. return results, err
  63. }
  64. for index, file := range files {
  65. file.Fid = ret.Fid
  66. if index > 0 {
  67. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  68. }
  69. file.Server = ret.Url
  70. if usePublicUrl {
  71. file.Server = ret.PublicUrl
  72. }
  73. file.Pref = pref
  74. results[index].Size, err = file.Upload(pref.MaxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
  75. if err != nil {
  76. results[index].Error = err.Error()
  77. }
  78. results[index].Fid = file.Fid
  79. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  80. }
  81. return results, nil
  82. }
  83. func NewFileParts(fullPathFilenames []string) (ret []*FilePart, err error) {
  84. ret = make([]*FilePart, len(fullPathFilenames))
  85. for index, file := range fullPathFilenames {
  86. if ret[index], err = newFilePart(file); err != nil {
  87. return
  88. }
  89. }
  90. return
  91. }
  92. func newFilePart(fullPathFilename string) (ret *FilePart, err error) {
  93. ret = &FilePart{}
  94. fh, openErr := os.Open(fullPathFilename)
  95. if openErr != nil {
  96. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  97. return ret, openErr
  98. }
  99. ret.Reader = fh
  100. fi, fiErr := fh.Stat()
  101. if fiErr != nil {
  102. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  103. return ret, fiErr
  104. }
  105. ret.ModTime = fi.ModTime().UTC().Unix()
  106. ret.FileSize = fi.Size()
  107. ext := strings.ToLower(path.Ext(fullPathFilename))
  108. ret.FileName = fi.Name()
  109. if ext != "" {
  110. ret.MimeType = mime.TypeByExtension(ext)
  111. }
  112. return ret, nil
  113. }
  114. func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
  115. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  116. if fi.ModTime != 0 {
  117. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  118. }
  119. if fi.Fsync {
  120. fileUrl += "?fsync=true"
  121. }
  122. if closer, ok := fi.Reader.(io.Closer); ok {
  123. defer closer.Close()
  124. }
  125. baseName := path.Base(fi.FileName)
  126. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  127. chunkSize := int64(maxMB * 1024 * 1024)
  128. chunks := fi.FileSize/chunkSize + 1
  129. cm := ChunkManifest{
  130. Name: baseName,
  131. Size: fi.FileSize,
  132. Mime: fi.MimeType,
  133. Chunks: make([]*ChunkInfo, 0, chunks),
  134. }
  135. var ret *AssignResult
  136. var id string
  137. if fi.Pref.DataCenter != "" {
  138. ar := &VolumeAssignRequest{
  139. Count: uint64(chunks),
  140. Replication: fi.Pref.Replication,
  141. Collection: fi.Pref.Collection,
  142. Ttl: fi.Pref.Ttl,
  143. DiskType: fi.Pref.DiskType,
  144. }
  145. ret, err = Assign(masterFn, grpcDialOption, ar)
  146. if err != nil {
  147. return
  148. }
  149. }
  150. for i := int64(0); i < chunks; i++ {
  151. if fi.Pref.DataCenter == "" {
  152. ar := &VolumeAssignRequest{
  153. Count: 1,
  154. Replication: fi.Pref.Replication,
  155. Collection: fi.Pref.Collection,
  156. Ttl: fi.Pref.Ttl,
  157. DiskType: fi.Pref.DiskType,
  158. }
  159. ret, err = Assign(masterFn, grpcDialOption, ar)
  160. if err != nil {
  161. // delete all uploaded chunks
  162. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  163. return
  164. }
  165. id = ret.Fid
  166. } else {
  167. id = ret.Fid
  168. if i > 0 {
  169. id += "_" + strconv.FormatInt(i, 10)
  170. }
  171. }
  172. fileUrl := genFileUrl(ret, id, usePublicUrl)
  173. count, e := uploadOneChunk(
  174. baseName+"-"+strconv.FormatInt(i+1, 10),
  175. io.LimitReader(fi.Reader, chunkSize),
  176. masterFn, fileUrl,
  177. ret.Auth)
  178. if e != nil {
  179. // delete all uploaded chunks
  180. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  181. return 0, e
  182. }
  183. cm.Chunks = append(cm.Chunks,
  184. &ChunkInfo{
  185. Offset: i * chunkSize,
  186. Size: int64(count),
  187. Fid: id,
  188. },
  189. )
  190. retSize += count
  191. }
  192. err = uploadChunkedFileManifest(fileUrl, &cm, jwt)
  193. if err != nil {
  194. // delete all uploaded chunks
  195. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  196. }
  197. } else {
  198. uploadOption := &UploadOption{
  199. UploadUrl: fileUrl,
  200. Filename: baseName,
  201. Cipher: false,
  202. IsInputCompressed: false,
  203. MimeType: fi.MimeType,
  204. PairMap: nil,
  205. Jwt: jwt,
  206. }
  207. uploader, e := NewUploader()
  208. if e != nil {
  209. return 0, e
  210. }
  211. ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
  212. if e != nil {
  213. return 0, e
  214. }
  215. return ret.Size, e
  216. }
  217. return
  218. }
  219. func genFileUrl(ret *AssignResult, id string, usePublicUrl bool) string {
  220. fileUrl := "http://" + ret.Url + "/" + id
  221. if usePublicUrl {
  222. fileUrl = "http://" + ret.PublicUrl + "/" + id
  223. }
  224. for _, replica := range ret.Replicas {
  225. if rand.IntN(len(ret.Replicas)+1) == 0 {
  226. fileUrl = "http://" + replica.Url + "/" + id
  227. if usePublicUrl {
  228. fileUrl = "http://" + replica.PublicUrl + "/" + id
  229. }
  230. }
  231. }
  232. return fileUrl
  233. }
  234. func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
  235. fileUrl string, jwt security.EncodedJwt,
  236. ) (size uint32, e error) {
  237. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  238. uploadOption := &UploadOption{
  239. UploadUrl: fileUrl,
  240. Filename: filename,
  241. Cipher: false,
  242. IsInputCompressed: false,
  243. MimeType: "",
  244. PairMap: nil,
  245. Jwt: jwt,
  246. }
  247. uploader, uploaderError := NewUploader()
  248. if uploaderError != nil {
  249. return 0, uploaderError
  250. }
  251. uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
  252. if uploadError != nil {
  253. return 0, uploadError
  254. }
  255. return uploadResult.Size, nil
  256. }
  257. func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  258. buf, e := manifest.Marshal()
  259. if e != nil {
  260. return e
  261. }
  262. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  263. u, _ := url.Parse(fileUrl)
  264. q := u.Query()
  265. q.Set("cm", "true")
  266. u.RawQuery = q.Encode()
  267. uploadOption := &UploadOption{
  268. UploadUrl: u.String(),
  269. Filename: manifest.Name,
  270. Cipher: false,
  271. IsInputCompressed: false,
  272. MimeType: "application/json",
  273. PairMap: nil,
  274. Jwt: jwt,
  275. }
  276. uploader, e := NewUploader()
  277. if e != nil {
  278. return e
  279. }
  280. _, e = uploader.UploadData(buf, uploadOption)
  281. return e
  282. }