You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

260 lines
8.1 KiB

6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
12 years ago
  1. package operation
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "mime"
  9. "mime/multipart"
  10. "net/http"
  11. "net/textproto"
  12. "path/filepath"
  13. "strings"
  14. "time"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/security"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. type UploadResult struct {
  21. Name string `json:"name,omitempty"`
  22. Size uint32 `json:"size,omitempty"`
  23. Error string `json:"error,omitempty"`
  24. ETag string `json:"eTag,omitempty"`
  25. CipherKey []byte `json:"cipherKey,omitempty"`
  26. Mime string `json:"mime,omitempty"`
  27. Gzip uint32 `json:"gzip,omitempty"`
  28. ContentMd5 string `json:"contentMd5,omitempty"`
  29. }
  30. func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
  31. fid, _ := filer_pb.ToFileIdObject(fileId)
  32. return &filer_pb.FileChunk{
  33. FileId: fileId,
  34. Offset: offset,
  35. Size: uint64(uploadResult.Size),
  36. Mtime: time.Now().UnixNano(),
  37. ETag: uploadResult.ETag,
  38. CipherKey: uploadResult.CipherKey,
  39. IsCompressed: uploadResult.Gzip > 0,
  40. Fid: fid,
  41. }
  42. }
  43. // HTTPClient interface for testing
  44. type HTTPClient interface {
  45. Do(req *http.Request) (*http.Response, error)
  46. }
  47. var (
  48. HttpClient HTTPClient
  49. )
  50. func init() {
  51. HttpClient = &http.Client{Transport: &http.Transport{
  52. MaxIdleConnsPerHost: 1024,
  53. }}
  54. }
  55. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  56. // Upload sends a POST request to a volume server to upload the content with adjustable compression level
  57. func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
  58. uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
  59. return
  60. }
  61. // Upload sends a POST request to a volume server to upload the content with fast compression
  62. func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
  63. uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
  64. return
  65. }
  66. func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
  67. data, err = ioutil.ReadAll(reader)
  68. if err != nil {
  69. err = fmt.Errorf("read input: %v", err)
  70. return
  71. }
  72. uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
  73. return uploadResult, uploadErr, data
  74. }
  75. func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
  76. for i := 0; i < 3; i++ {
  77. uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
  78. if err == nil {
  79. return
  80. } else {
  81. glog.Warningf("uploading to %s: %v", uploadUrl, err)
  82. }
  83. }
  84. return
  85. }
  86. func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
  87. contentIsGzipped := isInputCompressed
  88. shouldGzipNow := false
  89. if !isInputCompressed {
  90. if mtype == "" {
  91. mtype = http.DetectContentType(data)
  92. // println("detect1 mimetype to", mtype)
  93. if mtype == "application/octet-stream" {
  94. mtype = ""
  95. }
  96. }
  97. if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed {
  98. shouldGzipNow = true
  99. } else if !iAmSure && mtype == "" && len(data) > 128 {
  100. var compressed []byte
  101. compressed, err = util.GzipData(data[0:128])
  102. shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
  103. }
  104. }
  105. var clearDataLen int
  106. // gzip if possible
  107. // this could be double copying
  108. clearDataLen = len(data)
  109. if shouldGzipNow {
  110. compressed, compressErr := util.GzipData(data)
  111. // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
  112. if compressErr == nil {
  113. data = compressed
  114. contentIsGzipped = true
  115. }
  116. } else if isInputCompressed {
  117. // just to get the clear data length
  118. clearData, err := util.DecompressData(data)
  119. if err == nil {
  120. clearDataLen = len(clearData)
  121. }
  122. }
  123. if cipher {
  124. // encrypt(gzip(data))
  125. // encrypt
  126. cipherKey := util.GenCipherKey()
  127. encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
  128. if encryptionErr != nil {
  129. err = fmt.Errorf("encrypt input: %v", encryptionErr)
  130. return
  131. }
  132. // upload data
  133. uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
  134. _, err = w.Write(encryptedData)
  135. return
  136. }, "", false, len(encryptedData), "", nil, jwt)
  137. if uploadResult != nil {
  138. uploadResult.Name = filename
  139. uploadResult.Mime = mtype
  140. uploadResult.CipherKey = cipherKey
  141. }
  142. } else {
  143. // upload data
  144. uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
  145. _, err = w.Write(data)
  146. return
  147. }, filename, contentIsGzipped, 0, mtype, pairMap, jwt)
  148. }
  149. if uploadResult == nil {
  150. return
  151. }
  152. uploadResult.Size = uint32(clearDataLen)
  153. if contentIsGzipped {
  154. uploadResult.Gzip = 1
  155. }
  156. return uploadResult, err
  157. }
  158. func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
  159. body_buf := bytes.NewBufferString("")
  160. body_writer := multipart.NewWriter(body_buf)
  161. h := make(textproto.MIMEHeader)
  162. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
  163. if mtype == "" {
  164. mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
  165. }
  166. if mtype != "" {
  167. h.Set("Content-Type", mtype)
  168. }
  169. if isGzipped {
  170. h.Set("Content-Encoding", "gzip")
  171. }
  172. file_writer, cp_err := body_writer.CreatePart(h)
  173. if cp_err != nil {
  174. glog.V(0).Infoln("error creating form file", cp_err.Error())
  175. return nil, cp_err
  176. }
  177. if err := fillBufferFunction(file_writer); err != nil {
  178. glog.V(0).Infoln("error copying data", err)
  179. return nil, err
  180. }
  181. content_type := body_writer.FormDataContentType()
  182. if err := body_writer.Close(); err != nil {
  183. glog.V(0).Infoln("error closing body", err)
  184. return nil, err
  185. }
  186. req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
  187. if postErr != nil {
  188. glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr)
  189. return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr)
  190. }
  191. req.Header.Set("Content-Type", content_type)
  192. for k, v := range pairMap {
  193. req.Header.Set(k, v)
  194. }
  195. if jwt != "" {
  196. req.Header.Set("Authorization", "BEARER "+string(jwt))
  197. }
  198. resp, post_err := HttpClient.Do(req)
  199. if post_err != nil {
  200. glog.Errorf("upload to %v: %v", uploadUrl, post_err)
  201. return nil, fmt.Errorf("upload to %v: %v", uploadUrl, post_err)
  202. }
  203. defer util.CloseResponse(resp)
  204. var ret UploadResult
  205. etag := getEtag(resp)
  206. if resp.StatusCode == http.StatusNoContent {
  207. ret.ETag = etag
  208. return &ret, nil
  209. }
  210. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  211. if ra_err != nil {
  212. return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err)
  213. }
  214. unmarshal_err := json.Unmarshal(resp_body, &ret)
  215. if unmarshal_err != nil {
  216. glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body))
  217. return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err)
  218. }
  219. if ret.Error != "" {
  220. return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error)
  221. }
  222. ret.ETag = etag
  223. ret.ContentMd5 = resp.Header.Get("Content-MD5")
  224. return &ret, nil
  225. }
  226. func getEtag(r *http.Response) (etag string) {
  227. etag = r.Header.Get("ETag")
  228. if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
  229. etag = etag[1 : len(etag)-1]
  230. }
  231. return
  232. }