You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
7.5 KiB

9 years ago
4 years ago
5 years ago
4 years ago
11 years ago
11 years ago
11 years ago
11 years ago
4 years ago
4 years ago
10 years ago
3 years ago
6 years ago
3 years ago
9 years ago
3 years ago
  1. package operation
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "io"
  6. "math/rand/v2"
  7. "mime"
  8. "net/url"
  9. "os"
  10. "path"
  11. "strconv"
  12. "strings"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/security"
  16. )
  17. type FilePart struct {
  18. Reader io.Reader
  19. FileName string
  20. FileSize int64
  21. MimeType string
  22. ModTime int64 //in seconds
  23. Replication string
  24. Collection string
  25. DataCenter string
  26. Ttl string
  27. DiskType string
  28. Server string //this comes from assign result
  29. Fid string //this comes from assign result, but customizable
  30. Fsync bool
  31. }
  32. type SubmitResult struct {
  33. FileName string `json:"fileName,omitempty"`
  34. FileUrl string `json:"url,omitempty"`
  35. Fid string `json:"fid,omitempty"`
  36. Size uint32 `json:"size,omitempty"`
  37. Error string `json:"error,omitempty"`
  38. }
  39. type GetMasterFn func(ctx context.Context) pb.ServerAddress
  40. func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
  41. results := make([]SubmitResult, len(files))
  42. for index, file := range files {
  43. results[index].FileName = file.FileName
  44. }
  45. ar := &VolumeAssignRequest{
  46. Count: uint64(len(files)),
  47. Replication: replication,
  48. Collection: collection,
  49. DataCenter: dataCenter,
  50. Ttl: ttl,
  51. DiskType: diskType,
  52. }
  53. ret, err := Assign(masterFn, grpcDialOption, ar)
  54. if err != nil {
  55. for index := range files {
  56. results[index].Error = err.Error()
  57. }
  58. return results, err
  59. }
  60. for index, file := range files {
  61. file.Fid = ret.Fid
  62. if index > 0 {
  63. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  64. }
  65. file.Server = ret.Url
  66. if usePublicUrl {
  67. file.Server = ret.PublicUrl
  68. }
  69. file.Replication = replication
  70. file.Collection = collection
  71. file.DataCenter = dataCenter
  72. file.Ttl = ttl
  73. file.DiskType = diskType
  74. results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
  75. if err != nil {
  76. results[index].Error = err.Error()
  77. }
  78. results[index].Fid = file.Fid
  79. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  80. }
  81. return results, nil
  82. }
  83. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  84. ret = make([]FilePart, len(fullPathFilenames))
  85. for index, file := range fullPathFilenames {
  86. if ret[index], err = newFilePart(file); err != nil {
  87. return
  88. }
  89. }
  90. return
  91. }
  92. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  93. fh, openErr := os.Open(fullPathFilename)
  94. if openErr != nil {
  95. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  96. return ret, openErr
  97. }
  98. ret.Reader = fh
  99. fi, fiErr := fh.Stat()
  100. if fiErr != nil {
  101. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  102. return ret, fiErr
  103. }
  104. ret.ModTime = fi.ModTime().UTC().Unix()
  105. ret.FileSize = fi.Size()
  106. ext := strings.ToLower(path.Ext(fullPathFilename))
  107. ret.FileName = fi.Name()
  108. if ext != "" {
  109. ret.MimeType = mime.TypeByExtension(ext)
  110. }
  111. return ret, nil
  112. }
  113. func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
  114. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  115. if fi.ModTime != 0 {
  116. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  117. }
  118. if fi.Fsync {
  119. fileUrl += "?fsync=true"
  120. }
  121. if closer, ok := fi.Reader.(io.Closer); ok {
  122. defer closer.Close()
  123. }
  124. baseName := path.Base(fi.FileName)
  125. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  126. chunkSize := int64(maxMB * 1024 * 1024)
  127. chunks := fi.FileSize/chunkSize + 1
  128. cm := ChunkManifest{
  129. Name: baseName,
  130. Size: fi.FileSize,
  131. Mime: fi.MimeType,
  132. Chunks: make([]*ChunkInfo, 0, chunks),
  133. }
  134. var ret *AssignResult
  135. var id string
  136. if fi.DataCenter != "" {
  137. ar := &VolumeAssignRequest{
  138. Count: uint64(chunks),
  139. Replication: fi.Replication,
  140. Collection: fi.Collection,
  141. Ttl: fi.Ttl,
  142. DiskType: fi.DiskType,
  143. }
  144. ret, err = Assign(masterFn, grpcDialOption, ar)
  145. if err != nil {
  146. return
  147. }
  148. }
  149. for i := int64(0); i < chunks; i++ {
  150. if fi.DataCenter == "" {
  151. ar := &VolumeAssignRequest{
  152. Count: 1,
  153. Replication: fi.Replication,
  154. Collection: fi.Collection,
  155. Ttl: fi.Ttl,
  156. DiskType: fi.DiskType,
  157. }
  158. ret, err = Assign(masterFn, grpcDialOption, ar)
  159. if err != nil {
  160. // delete all uploaded chunks
  161. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  162. return
  163. }
  164. id = ret.Fid
  165. } else {
  166. id = ret.Fid
  167. if i > 0 {
  168. id += "_" + strconv.FormatInt(i, 10)
  169. }
  170. }
  171. fileUrl := genFileUrl(ret, id, usePublicUrl)
  172. count, e := uploadOneChunk(
  173. baseName+"-"+strconv.FormatInt(i+1, 10),
  174. io.LimitReader(fi.Reader, chunkSize),
  175. masterFn, fileUrl,
  176. ret.Auth)
  177. if e != nil {
  178. // delete all uploaded chunks
  179. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  180. return 0, e
  181. }
  182. cm.Chunks = append(cm.Chunks,
  183. &ChunkInfo{
  184. Offset: i * chunkSize,
  185. Size: int64(count),
  186. Fid: id,
  187. },
  188. )
  189. retSize += count
  190. }
  191. err = uploadChunkedFileManifest(fileUrl, &cm, jwt)
  192. if err != nil {
  193. // delete all uploaded chunks
  194. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  195. }
  196. } else {
  197. uploadOption := &UploadOption{
  198. UploadUrl: fileUrl,
  199. Filename: baseName,
  200. Cipher: false,
  201. IsInputCompressed: false,
  202. MimeType: fi.MimeType,
  203. PairMap: nil,
  204. Jwt: jwt,
  205. }
  206. uploader, e := NewUploader()
  207. if e != nil {
  208. return 0, e
  209. }
  210. ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
  211. if e != nil {
  212. return 0, e
  213. }
  214. return ret.Size, e
  215. }
  216. return
  217. }
  218. func genFileUrl(ret *AssignResult, id string, usePublicUrl bool) string {
  219. fileUrl := "http://" + ret.Url + "/" + id
  220. if usePublicUrl {
  221. fileUrl = "http://" + ret.PublicUrl + "/" + id
  222. }
  223. for _, replica := range ret.Replicas {
  224. if rand.IntN(len(ret.Replicas)+1) == 0 {
  225. fileUrl = "http://" + replica.Url + "/" + id
  226. if usePublicUrl {
  227. fileUrl = "http://" + replica.PublicUrl + "/" + id
  228. }
  229. }
  230. }
  231. return fileUrl
  232. }
  233. func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
  234. fileUrl string, jwt security.EncodedJwt,
  235. ) (size uint32, e error) {
  236. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  237. uploadOption := &UploadOption{
  238. UploadUrl: fileUrl,
  239. Filename: filename,
  240. Cipher: false,
  241. IsInputCompressed: false,
  242. MimeType: "",
  243. PairMap: nil,
  244. Jwt: jwt,
  245. }
  246. uploader, uploaderError := NewUploader()
  247. if uploaderError != nil {
  248. return 0, uploaderError
  249. }
  250. uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
  251. if uploadError != nil {
  252. return 0, uploadError
  253. }
  254. return uploadResult.Size, nil
  255. }
  256. func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  257. buf, e := manifest.Marshal()
  258. if e != nil {
  259. return e
  260. }
  261. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  262. u, _ := url.Parse(fileUrl)
  263. q := u.Query()
  264. q.Set("cm", "true")
  265. u.RawQuery = q.Encode()
  266. uploadOption := &UploadOption{
  267. UploadUrl: u.String(),
  268. Filename: manifest.Name,
  269. Cipher: false,
  270. IsInputCompressed: false,
  271. MimeType: "application/json",
  272. PairMap: nil,
  273. Jwt: jwt,
  274. }
  275. uploader, e := NewUploader()
  276. if e != nil {
  277. return e
  278. }
  279. _, e = uploader.UploadData(buf, uploadOption)
  280. return e
  281. }