You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
4.0 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package s3api
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/gorilla/mux"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. )
  // client is the shared HTTP client the handlers in this file use to reach the
  // filer. It is populated once by init.
  var client *http.Client

  // init builds the shared client on top of a dedicated transport that keeps up
  // to 1024 idle connections per host.
  func init() {
	transport := &http.Transport{}
	transport.MaxIdleConnsPerHost = 1024

	client = &http.Client{}
	client.Transport = transport
  }
  // UploadResult is the JSON document decoded from the filer's reply to an
  // upload request.
  type UploadResult struct {
	// Name is read from the "name" key of the reply.
	Name string `json:"name,omitempty"`

	// Size is read from the "size" key of the reply.
	// NOTE(review): presumably the stored size in bytes - confirm against the filer.
	Size uint32 `json:"size,omitempty"`

	// Error is read from the "error" key; PutObjectHandler treats any
	// non-empty value as a failed upload.
	Error string `json:"error,omitempty"`
  }
  24. func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
  25. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
  26. vars := mux.Vars(r)
  27. bucket := vars["bucket"]
  28. object := vars["object"]
  29. _, err := validateContentMd5(r.Header)
  30. if err != nil {
  31. writeErrorResponse(w, ErrInvalidDigest, r.URL)
  32. return
  33. }
  34. uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
  35. s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
  36. proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body)
  37. if err != nil {
  38. glog.Errorf("NewRequest %s: %v", uploadUrl, err)
  39. writeErrorResponse(w, ErrInternalError, r.URL)
  40. return
  41. }
  42. proxyReq.Header.Set("Host", s3a.option.Filer)
  43. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  44. for header, values := range r.Header {
  45. for _, value := range values {
  46. proxyReq.Header.Add(header, value)
  47. }
  48. }
  49. resp, postErr := client.Do(proxyReq)
  50. if postErr != nil {
  51. glog.Errorf("post to filer: %v", postErr)
  52. writeErrorResponse(w, ErrInternalError, r.URL)
  53. return
  54. }
  55. defer resp.Body.Close()
  56. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  57. if ra_err != nil {
  58. glog.Errorf("upload to filer response read: %v", ra_err)
  59. writeErrorResponse(w, ErrInternalError, r.URL)
  60. return
  61. }
  62. var ret UploadResult
  63. unmarshal_err := json.Unmarshal(resp_body, &ret)
  64. if unmarshal_err != nil {
  65. glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
  66. writeErrorResponse(w, ErrInternalError, r.URL)
  67. return
  68. }
  69. if ret.Error != "" {
  70. glog.Errorf("upload to filer error: %v", ret.Error)
  71. writeErrorResponse(w, ErrInternalError, r.URL)
  72. return
  73. }
  74. writeSuccessResponseEmpty(w)
  75. }
  76. func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
  77. destUrl := fmt.Sprintf("http://%s%s%s",
  78. s3a.option.Filer, s3a.option.BucketsPath, r.RequestURI)
  79. s3a.proxyToFiler(w, r, destUrl, passThroghResponse)
  80. }
  81. func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
  82. destUrl := fmt.Sprintf("http://%s%s%s",
  83. s3a.option.Filer, s3a.option.BucketsPath, r.RequestURI)
  84. s3a.proxyToFiler(w, r, destUrl, passThroghResponse)
  85. }
  86. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  87. destUrl := fmt.Sprintf("http://%s%s%s",
  88. s3a.option.Filer, s3a.option.BucketsPath, r.RequestURI)
  89. s3a.proxyToFiler(w, r, destUrl, func(proxyResonse *http.Response, w http.ResponseWriter) {
  90. for k, v := range proxyResonse.Header {
  91. w.Header()[k] = v
  92. }
  93. w.WriteHeader(http.StatusNoContent)
  94. })
  95. }
  96. func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
  97. glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
  98. proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
  99. if err != nil {
  100. glog.Errorf("NewRequest %s: %v", destUrl, err)
  101. writeErrorResponse(w, ErrInternalError, r.URL)
  102. return
  103. }
  104. proxyReq.Header.Set("Host", s3a.option.Filer)
  105. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  106. for header, values := range r.Header {
  107. for _, value := range values {
  108. proxyReq.Header.Add(header, value)
  109. }
  110. }
  111. resp, postErr := client.Do(proxyReq)
  112. if postErr != nil {
  113. glog.Errorf("post to filer: %v", postErr)
  114. writeErrorResponse(w, ErrInternalError, r.URL)
  115. return
  116. }
  117. defer resp.Body.Close()
  118. responseFn(resp, w)
  119. }
  // passThroghResponse relays a filer response to the caller as-is: every
  // response header, then the status code, then the body streamed with io.Copy.
  // A failure while copying the body is not reported.
  func passThroghResponse(proxyResonse *http.Response, w http.ResponseWriter) {
	out := w.Header()
	for name, values := range proxyResonse.Header {
		out[name] = values
	}

	w.WriteHeader(proxyResonse.StatusCode)

	// The status line is already written, so a copy error cannot be turned
	// into an error response here; the result is deliberately discarded.
	_, _ = io.Copy(w, proxyResonse.Body)
  }