You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

149 lines
4.0 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "io"
  6. "mime"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. )
  12. type FilePart struct {
  13. Reader io.Reader
  14. FileName string
  15. FileSize int64
  16. IsGzipped bool
  17. MimeType string
  18. ModTime int64 //in seconds
  19. Server string //this comes from assign result
  20. Fid string //this comes from assign result, but customizable
  21. }
  22. type SubmitResult struct {
  23. FileName string `json:"fileName"`
  24. FileUrl string `json:"fileUrl"`
  25. Fid string `json:"fid"`
  26. Size int `json:"size"`
  27. Error string `json:"error"`
  28. }
  29. func SubmitFiles(master string, files []FilePart, replication string, maxMB int) ([]SubmitResult, error) {
  30. results := make([]SubmitResult, len(files))
  31. for index, file := range files {
  32. results[index].FileName = file.FileName
  33. }
  34. ret, err := Assign(master, len(files), replication)
  35. if err != nil {
  36. for index, _ := range files {
  37. results[index].Error = err.Error()
  38. }
  39. return results, err
  40. }
  41. for index, file := range files {
  42. file.Fid = ret.Fid
  43. if index > 0 {
  44. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  45. }
  46. file.Server = ret.PublicUrl
  47. results[index].Size, err = file.Upload(maxMB, master, replication)
  48. if err != nil {
  49. results[index].Error = err.Error()
  50. }
  51. results[index].Fid = file.Fid
  52. results[index].FileUrl = file.Server + "/" + file.Fid
  53. }
  54. return results, nil
  55. }
  56. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  57. ret = make([]FilePart, len(fullPathFilenames))
  58. for index, file := range fullPathFilenames {
  59. if ret[index], err = newFilePart(file); err != nil {
  60. return
  61. }
  62. }
  63. return
  64. }
  65. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  66. fh, openErr := os.Open(fullPathFilename)
  67. if openErr != nil {
  68. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  69. return ret, openErr
  70. }
  71. ret.Reader = fh
  72. if fi, fiErr := fh.Stat(); fiErr != nil {
  73. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  74. return ret, fiErr
  75. } else {
  76. ret.ModTime = fi.ModTime().UTC().Unix()
  77. ret.FileSize = fi.Size()
  78. }
  79. ext := strings.ToLower(path.Ext(fullPathFilename))
  80. ret.IsGzipped = ext == ".gz"
  81. if ret.IsGzipped {
  82. ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
  83. }
  84. ret.FileName = fullPathFilename
  85. if ext != "" {
  86. ret.MimeType = mime.TypeByExtension(ext)
  87. }
  88. return ret, nil
  89. }
  90. func (fi FilePart) Upload(maxMB int, master, replication string) (retSize int, err error) {
  91. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  92. if fi.ModTime != 0 {
  93. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  94. }
  95. if closer, ok := fi.Reader.(io.Closer); ok {
  96. defer closer.Close()
  97. }
  98. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  99. chunkSize := int64(maxMB * 1024 * 1024)
  100. chunks := fi.FileSize/chunkSize + 1
  101. fids := make([]string, 0)
  102. for i := int64(0); i < chunks; i++ {
  103. id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, replication)
  104. if e != nil {
  105. return 0, e
  106. }
  107. fids = append(fids, id)
  108. retSize += count
  109. }
  110. err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids)
  111. } else {
  112. ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
  113. if e != nil {
  114. return 0, e
  115. }
  116. return ret.Size, e
  117. }
  118. return
  119. }
  120. func upload_one_chunk(filename string, reader io.Reader, master, replication string) (fid string, size int, e error) {
  121. ret, err := Assign(master, 1, replication)
  122. if err != nil {
  123. return "", 0, err
  124. }
  125. fileUrl, fid := "http://"+ret.PublicUrl+"/"+ret.Fid, ret.Fid
  126. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  127. uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
  128. if uploadError != nil {
  129. return fid, 0, uploadError
  130. }
  131. return fid, uploadResult.Size, nil
  132. }
  133. func upload_file_id_list(fileUrl, filename string, fids []string) error {
  134. var buf bytes.Buffer
  135. buf.WriteString(strings.Join(fids, "\n"))
  136. glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
  137. _, e := Upload(fileUrl, filename, &buf, false, "text/plain")
  138. return e
  139. }