You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

233 lines
5.5 KiB

6 years ago
7 years ago
7 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package s3api
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "strings"
  10. "github.com/gorilla/mux"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/server"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  // client is the HTTP client shared by every handler in this file when talking
  // to the filer. Its transport keeps up to 1024 idle connections per host.
  var client = &http.Client{
  	Transport: &http.Transport{
  		MaxIdleConnsPerHost: 1024,
  	},
  }
  23. func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
  24. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
  25. vars := mux.Vars(r)
  26. bucket := vars["bucket"]
  27. object := getObject(vars)
  28. _, err := validateContentMd5(r.Header)
  29. if err != nil {
  30. writeErrorResponse(w, ErrInvalidDigest, r.URL)
  31. return
  32. }
  33. rAuthType := getRequestAuthType(r)
  34. dataReader := r.Body
  35. var s3ErrCode ErrorCode
  36. if rAuthType == authTypeStreamingSigned {
  37. dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  38. }
  39. if s3ErrCode != ErrNone {
  40. writeErrorResponse(w, s3ErrCode, r.URL)
  41. return
  42. }
  43. defer dataReader.Close()
  44. uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
  45. s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
  46. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
  47. if errCode != ErrNone {
  48. writeErrorResponse(w, errCode, r.URL)
  49. return
  50. }
  51. setEtag(w, etag)
  52. writeSuccessResponseEmpty(w)
  53. }
  54. func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
  55. vars := mux.Vars(r)
  56. bucket := vars["bucket"]
  57. object := getObject(vars)
  58. if strings.HasSuffix(r.URL.Path, "/") {
  59. writeErrorResponse(w, ErrNotImplemented, r.URL)
  60. return
  61. }
  62. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  63. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  64. s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
  65. }
  66. func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
  67. vars := mux.Vars(r)
  68. bucket := vars["bucket"]
  69. object := getObject(vars)
  70. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  71. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  72. s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
  73. }
  74. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  75. vars := mux.Vars(r)
  76. bucket := vars["bucket"]
  77. object := getObject(vars)
  78. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  79. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  80. s3a.proxyToFiler(w, r, destUrl, func(proxyResonse *http.Response, w http.ResponseWriter) {
  81. for k, v := range proxyResonse.Header {
  82. w.Header()[k] = v
  83. }
  84. w.WriteHeader(http.StatusNoContent)
  85. })
  86. }
  87. // DeleteMultipleObjectsHandler - Delete multiple objects
  88. func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
  89. // TODO
  90. writeErrorResponse(w, ErrNotImplemented, r.URL)
  91. }
  92. func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
  93. glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
  94. proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
  95. if err != nil {
  96. glog.Errorf("NewRequest %s: %v", destUrl, err)
  97. writeErrorResponse(w, ErrInternalError, r.URL)
  98. return
  99. }
  100. proxyReq.Header.Set("Host", s3a.option.Filer)
  101. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  102. proxyReq.Header.Set("Etag-MD5", "True")
  103. for header, values := range r.Header {
  104. for _, value := range values {
  105. proxyReq.Header.Add(header, value)
  106. }
  107. }
  108. resp, postErr := client.Do(proxyReq)
  109. if postErr != nil {
  110. glog.Errorf("post to filer: %v", postErr)
  111. writeErrorResponse(w, ErrInternalError, r.URL)
  112. return
  113. }
  114. defer util.CloseResponse(resp)
  115. responseFn(resp, w)
  116. }
  // passThroughResponse relays a filer response to the client verbatim: all
  // headers first, then the status code, then the body stream.
  func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) {
  	out := w.Header()
  	for name, values := range proxyResonse.Header {
  		out[name] = values
  	}
  	w.WriteHeader(proxyResonse.StatusCode)

  	// Best effort: the status line is already written, so a copy failure
  	// cannot be reported to the client anymore.
  	_, _ = io.Copy(w, proxyResonse.Body)
  }
  124. func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code ErrorCode) {
  125. hash := md5.New()
  126. var body io.Reader = io.TeeReader(dataReader, hash)
  127. proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
  128. if err != nil {
  129. glog.Errorf("NewRequest %s: %v", uploadUrl, err)
  130. return "", ErrInternalError
  131. }
  132. proxyReq.Header.Set("Host", s3a.option.Filer)
  133. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  134. for header, values := range r.Header {
  135. for _, value := range values {
  136. proxyReq.Header.Add(header, value)
  137. }
  138. }
  139. resp, postErr := client.Do(proxyReq)
  140. if postErr != nil {
  141. glog.Errorf("post to filer: %v", postErr)
  142. return "", ErrInternalError
  143. }
  144. defer resp.Body.Close()
  145. etag = fmt.Sprintf("%x", hash.Sum(nil))
  146. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  147. if ra_err != nil {
  148. glog.Errorf("upload to filer response read: %v", ra_err)
  149. return etag, ErrInternalError
  150. }
  151. var ret weed_server.FilerPostResult
  152. unmarshal_err := json.Unmarshal(resp_body, &ret)
  153. if unmarshal_err != nil {
  154. glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
  155. return "", ErrInternalError
  156. }
  157. if ret.Error != "" {
  158. glog.Errorf("upload to filer error: %v", ret.Error)
  159. return "", ErrInternalError
  160. }
  161. return etag, ErrNone
  162. }
  // setEtag writes etag to the response's ETag header, wrapping it in double
  // quotes unless it already starts with one. An empty etag leaves the response
  // untouched.
  func setEtag(w http.ResponseWriter, etag string) {
  	if etag == "" {
  		return
  	}

  	value := etag
  	if !strings.HasPrefix(value, "\"") {
  		value = "\"" + etag + "\""
  	}
  	w.Header().Set("ETag", value)
  }
  // getObject returns the "object" entry of vars as a path that always starts
  // with "/". A missing or empty entry yields "/".
  func getObject(vars map[string]string) string {
  	if key := vars["object"]; strings.HasPrefix(key, "/") {
  		return key
  	}
  	return "/" + vars["object"]
  }