You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

232 lines
5.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
6 years ago
6 years ago
6 years ago
  1. package s3api
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "strings"
  10. "github.com/gorilla/mux"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/server"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  // client is the HTTP client this file uses for every request it sends to the
  // filer. It is assigned exactly once, in init.
  var client *http.Client

  // init builds the shared client on a transport that keeps up to 1024 idle
  // connections per host.
  func init() {
  	transport := &http.Transport{MaxIdleConnsPerHost: 1024}
  	client = &http.Client{Transport: transport}
  }
  23. func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
  24. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
  25. vars := mux.Vars(r)
  26. bucket := vars["bucket"]
  27. object := getObject(vars)
  28. _, err := validateContentMd5(r.Header)
  29. if err != nil {
  30. writeErrorResponse(w, ErrInvalidDigest, r.URL)
  31. return
  32. }
  33. rAuthType := getRequestAuthType(r)
  34. dataReader := r.Body
  35. var s3ErrCode ErrorCode
  36. if rAuthType == authTypeStreamingSigned {
  37. dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  38. }
  39. if s3ErrCode != ErrNone {
  40. writeErrorResponse(w, s3ErrCode, r.URL)
  41. return
  42. }
  43. defer dataReader.Close()
  44. uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  45. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
  46. if errCode != ErrNone {
  47. writeErrorResponse(w, errCode, r.URL)
  48. return
  49. }
  50. setEtag(w, etag)
  51. writeSuccessResponseEmpty(w)
  52. }
  53. func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
  54. vars := mux.Vars(r)
  55. bucket := vars["bucket"]
  56. object := getObject(vars)
  57. if strings.HasSuffix(r.URL.Path, "/") {
  58. writeErrorResponse(w, ErrNotImplemented, r.URL)
  59. return
  60. }
  61. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  62. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  63. s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
  64. }
  65. func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
  66. vars := mux.Vars(r)
  67. bucket := vars["bucket"]
  68. object := getObject(vars)
  69. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  70. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  71. s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
  72. }
  73. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  74. vars := mux.Vars(r)
  75. bucket := vars["bucket"]
  76. object := getObject(vars)
  77. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  78. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  79. s3a.proxyToFiler(w, r, destUrl, func(proxyResonse *http.Response, w http.ResponseWriter) {
  80. for k, v := range proxyResonse.Header {
  81. w.Header()[k] = v
  82. }
  83. w.WriteHeader(http.StatusNoContent)
  84. })
  85. }
  86. // DeleteMultipleObjectsHandler - Delete multiple objects
  87. func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
  88. // TODO
  89. writeErrorResponse(w, ErrNotImplemented, r.URL)
  90. }
  91. func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
  92. glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
  93. proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
  94. if err != nil {
  95. glog.Errorf("NewRequest %s: %v", destUrl, err)
  96. writeErrorResponse(w, ErrInternalError, r.URL)
  97. return
  98. }
  99. proxyReq.Header.Set("Host", s3a.option.Filer)
  100. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  101. proxyReq.Header.Set("Etag-MD5", "True")
  102. for header, values := range r.Header {
  103. for _, value := range values {
  104. proxyReq.Header.Add(header, value)
  105. }
  106. }
  107. resp, postErr := client.Do(proxyReq)
  108. if postErr != nil {
  109. glog.Errorf("post to filer: %v", postErr)
  110. writeErrorResponse(w, ErrInternalError, r.URL)
  111. return
  112. }
  113. defer util.CloseResponse(resp)
  114. responseFn(resp, w)
  115. }
  // passThroughResponse relays a filer response to the client unchanged: it
  // copies every response header onto w, writes the same status code, and then
  // streams the response body. The result of the body copy is not checked.
  func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) {
  	dst := w.Header()
  	for name := range proxyResonse.Header {
  		dst[name] = proxyResonse.Header[name]
  	}

  	w.WriteHeader(proxyResonse.StatusCode)
  	io.Copy(w, proxyResonse.Body)
  }
  123. func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code ErrorCode) {
  124. hash := md5.New()
  125. var body = io.TeeReader(dataReader, hash)
  126. proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
  127. if err != nil {
  128. glog.Errorf("NewRequest %s: %v", uploadUrl, err)
  129. return "", ErrInternalError
  130. }
  131. proxyReq.Header.Set("Host", s3a.option.Filer)
  132. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  133. for header, values := range r.Header {
  134. for _, value := range values {
  135. proxyReq.Header.Add(header, value)
  136. }
  137. }
  138. resp, postErr := client.Do(proxyReq)
  139. if postErr != nil {
  140. glog.Errorf("post to filer: %v", postErr)
  141. return "", ErrInternalError
  142. }
  143. defer resp.Body.Close()
  144. etag = fmt.Sprintf("%x", hash.Sum(nil))
  145. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  146. if ra_err != nil {
  147. glog.Errorf("upload to filer response read: %v", ra_err)
  148. return etag, ErrInternalError
  149. }
  150. var ret weed_server.FilerPostResult
  151. unmarshal_err := json.Unmarshal(resp_body, &ret)
  152. if unmarshal_err != nil {
  153. glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
  154. return "", ErrInternalError
  155. }
  156. if ret.Error != "" {
  157. glog.Errorf("upload to filer error: %v", ret.Error)
  158. return "", ErrInternalError
  159. }
  160. return etag, ErrNone
  161. }
  // setEtag writes etag to the ETag response header of w, wrapping it in double
  // quotes unless it already starts with one. An empty etag leaves the headers
  // untouched.
  func setEtag(w http.ResponseWriter, etag string) {
  	if etag == "" {
  		return
  	}

  	value := etag
  	if !strings.HasPrefix(etag, "\"") {
  		value = "\"" + etag + "\""
  	}
  	w.Header().Set("ETag", value)
  }
  // getObject returns the "object" entry of vars, prefixed with "/" when it
  // does not already start with one. A missing entry yields "/".
  func getObject(vars map[string]string) string {
  	key := vars["object"]
  	if strings.HasPrefix(key, "/") {
  		return key
  	}
  	return "/" + key
  }