You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

154 lines
4.3 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "io"
  6. "mime"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. )
  12. type FilePart struct {
  13. Reader io.Reader
  14. FileName string
  15. FileSize int64
  16. IsGzipped bool
  17. MimeType string
  18. ModTime int64 //in seconds
  19. Replication string
  20. Collection string
  21. Ttl string
  22. Server string //this comes from assign result
  23. Fid string //this comes from assign result, but customizable
  24. }
  25. type SubmitResult struct {
  26. FileName string `json:"fileName,omitempty"`
  27. FileUrl string `json:"fileUrl,omitempty"`
  28. Fid string `json:"fid,omitempty"`
  29. Size uint32 `json:"size,omitempty"`
  30. Error string `json:"error,omitempty"`
  31. }
  32. func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
  33. results := make([]SubmitResult, len(files))
  34. for index, file := range files {
  35. results[index].FileName = file.FileName
  36. }
  37. ret, err := Assign(master, len(files), replication, collection, ttl)
  38. if err != nil {
  39. for index, _ := range files {
  40. results[index].Error = err.Error()
  41. }
  42. return results, err
  43. }
  44. for index, file := range files {
  45. file.Fid = ret.Fid
  46. if index > 0 {
  47. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  48. }
  49. file.Server = ret.PublicUrl
  50. file.Replication = replication
  51. file.Collection = collection
  52. results[index].Size, err = file.Upload(maxMB, master)
  53. if err != nil {
  54. results[index].Error = err.Error()
  55. }
  56. results[index].Fid = file.Fid
  57. results[index].FileUrl = file.Server + "/" + file.Fid
  58. }
  59. return results, nil
  60. }
  61. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  62. ret = make([]FilePart, len(fullPathFilenames))
  63. for index, file := range fullPathFilenames {
  64. if ret[index], err = newFilePart(file); err != nil {
  65. return
  66. }
  67. }
  68. return
  69. }
  70. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  71. fh, openErr := os.Open(fullPathFilename)
  72. if openErr != nil {
  73. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  74. return ret, openErr
  75. }
  76. ret.Reader = fh
  77. if fi, fiErr := fh.Stat(); fiErr != nil {
  78. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  79. return ret, fiErr
  80. } else {
  81. ret.ModTime = fi.ModTime().UTC().Unix()
  82. ret.FileSize = fi.Size()
  83. }
  84. ext := strings.ToLower(path.Ext(fullPathFilename))
  85. ret.IsGzipped = ext == ".gz"
  86. if ret.IsGzipped {
  87. ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
  88. }
  89. ret.FileName = fullPathFilename
  90. if ext != "" {
  91. ret.MimeType = mime.TypeByExtension(ext)
  92. }
  93. return ret, nil
  94. }
  95. func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) {
  96. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  97. if fi.ModTime != 0 {
  98. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  99. }
  100. if closer, ok := fi.Reader.(io.Closer); ok {
  101. defer closer.Close()
  102. }
  103. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  104. chunkSize := int64(maxMB * 1024 * 1024)
  105. chunks := fi.FileSize/chunkSize + 1
  106. fids := make([]string, 0)
  107. for i := int64(0); i < chunks; i++ {
  108. id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
  109. if e != nil {
  110. return 0, e
  111. }
  112. fids = append(fids, id)
  113. retSize += count
  114. }
  115. err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids)
  116. } else {
  117. ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
  118. if e != nil {
  119. return 0, e
  120. }
  121. return ret.Size, e
  122. }
  123. return
  124. }
  125. func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
  126. ret, err := Assign(master, 1, replication, collection, ttl)
  127. if err != nil {
  128. return "", 0, err
  129. }
  130. fileUrl, fid := "http://"+ret.PublicUrl+"/"+ret.Fid, ret.Fid
  131. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  132. uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
  133. if uploadError != nil {
  134. return fid, 0, uploadError
  135. }
  136. return fid, uploadResult.Size, nil
  137. }
  138. func upload_file_id_list(fileUrl, filename string, fids []string) error {
  139. var buf bytes.Buffer
  140. buf.WriteString(strings.Join(fids, "\n"))
  141. glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
  142. _, e := Upload(fileUrl, filename, &buf, false, "text/plain")
  143. return e
  144. }