You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
4.2 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "io"
  6. "mime"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. )
  12. type FilePart struct {
  13. Reader io.Reader
  14. FileName string
  15. FileSize int64
  16. IsGzipped bool
  17. MimeType string
  18. ModTime int64 //in seconds
  19. Replication string
  20. Collection string
  21. Server string //this comes from assign result
  22. Fid string //this comes from assign result, but customizable
  23. }
  24. type SubmitResult struct {
  25. FileName string `json:"fileName,omitempty"`
  26. FileUrl string `json:"fileUrl,omitempty"`
  27. Fid string `json:"fid,omitempty"`
  28. Size uint32 `json:"size,omitempty"`
  29. Error string `json:"error,omitempty"`
  30. }
  31. func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) {
  32. results := make([]SubmitResult, len(files))
  33. for index, file := range files {
  34. results[index].FileName = file.FileName
  35. }
  36. ret, err := Assign(master, len(files), replication, collection)
  37. if err != nil {
  38. for index, _ := range files {
  39. results[index].Error = err.Error()
  40. }
  41. return results, err
  42. }
  43. for index, file := range files {
  44. file.Fid = ret.Fid
  45. if index > 0 {
  46. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  47. }
  48. file.Server = ret.PublicUrl
  49. file.Replication = replication
  50. file.Collection = collection
  51. results[index].Size, err = file.Upload(maxMB, master)
  52. if err != nil {
  53. results[index].Error = err.Error()
  54. }
  55. results[index].Fid = file.Fid
  56. results[index].FileUrl = file.Server + "/" + file.Fid
  57. }
  58. return results, nil
  59. }
  60. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  61. ret = make([]FilePart, len(fullPathFilenames))
  62. for index, file := range fullPathFilenames {
  63. if ret[index], err = newFilePart(file); err != nil {
  64. return
  65. }
  66. }
  67. return
  68. }
  69. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  70. fh, openErr := os.Open(fullPathFilename)
  71. if openErr != nil {
  72. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  73. return ret, openErr
  74. }
  75. ret.Reader = fh
  76. if fi, fiErr := fh.Stat(); fiErr != nil {
  77. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  78. return ret, fiErr
  79. } else {
  80. ret.ModTime = fi.ModTime().UTC().Unix()
  81. ret.FileSize = fi.Size()
  82. }
  83. ext := strings.ToLower(path.Ext(fullPathFilename))
  84. ret.IsGzipped = ext == ".gz"
  85. if ret.IsGzipped {
  86. ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
  87. }
  88. ret.FileName = fullPathFilename
  89. if ext != "" {
  90. ret.MimeType = mime.TypeByExtension(ext)
  91. }
  92. return ret, nil
  93. }
  94. func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) {
  95. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  96. if fi.ModTime != 0 {
  97. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  98. }
  99. if closer, ok := fi.Reader.(io.Closer); ok {
  100. defer closer.Close()
  101. }
  102. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  103. chunkSize := int64(maxMB * 1024 * 1024)
  104. chunks := fi.FileSize/chunkSize + 1
  105. fids := make([]string, 0)
  106. for i := int64(0); i < chunks; i++ {
  107. id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection)
  108. if e != nil {
  109. return 0, e
  110. }
  111. fids = append(fids, id)
  112. retSize += count
  113. }
  114. err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids)
  115. } else {
  116. ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
  117. if e != nil {
  118. return 0, e
  119. }
  120. return ret.Size, e
  121. }
  122. return
  123. }
  124. func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size uint32, e error) {
  125. ret, err := Assign(master, 1, replication, collection)
  126. if err != nil {
  127. return "", 0, err
  128. }
  129. fileUrl, fid := "http://"+ret.PublicUrl+"/"+ret.Fid, ret.Fid
  130. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  131. uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
  132. if uploadError != nil {
  133. return fid, 0, uploadError
  134. }
  135. return fid, uploadResult.Size, nil
  136. }
  137. func upload_file_id_list(fileUrl, filename string, fids []string) error {
  138. var buf bytes.Buffer
  139. buf.WriteString(strings.Join(fids, "\n"))
  140. glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
  141. _, e := Upload(fileUrl, filename, &buf, false, "text/plain")
  142. return e
  143. }