You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
3.6 KiB

7 years ago
7 years ago
7 years ago
  1. package s3api
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/gorilla/mux"
  7. "io/ioutil"
  8. "net/http"
  9. "io"
  10. )
  11. var (
  12. client *http.Client
  13. )
  14. func init() {
  15. client = &http.Client{Transport: &http.Transport{
  16. MaxIdleConnsPerHost: 1024,
  17. }}
  18. }
  19. type UploadResult struct {
  20. Name string `json:"name,omitempty"`
  21. Size uint32 `json:"size,omitempty"`
  22. Error string `json:"error,omitempty"`
  23. }
  24. func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
  25. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
  26. vars := mux.Vars(r)
  27. bucket := vars["bucket"]
  28. object := vars["object"]
  29. _, err := validateContentMd5(r.Header)
  30. if err != nil {
  31. writeErrorResponse(w, ErrInvalidDigest, r.URL)
  32. return
  33. }
  34. uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
  35. s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
  36. proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body)
  37. if err != nil {
  38. glog.Errorf("NewRequest %s: %v", uploadUrl, err)
  39. writeErrorResponse(w, ErrInternalError, r.URL)
  40. return
  41. }
  42. proxyReq.Header.Set("Host", s3a.option.Filer)
  43. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  44. for header, values := range r.Header {
  45. for _, value := range values {
  46. proxyReq.Header.Add(header, value)
  47. }
  48. }
  49. resp, postErr := client.Do(proxyReq)
  50. if postErr != nil {
  51. glog.Errorf("post to filer: %v", postErr)
  52. writeErrorResponse(w, ErrInternalError, r.URL)
  53. return
  54. }
  55. defer resp.Body.Close()
  56. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  57. if ra_err != nil {
  58. glog.Errorf("upload to filer response read: %v", ra_err)
  59. writeErrorResponse(w, ErrInternalError, r.URL)
  60. return
  61. }
  62. var ret UploadResult
  63. unmarshal_err := json.Unmarshal(resp_body, &ret)
  64. if unmarshal_err != nil {
  65. glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
  66. writeErrorResponse(w, ErrInternalError, r.URL)
  67. return
  68. }
  69. if ret.Error != "" {
  70. glog.Errorf("upload to filer error: %v", ret.Error)
  71. writeErrorResponse(w, ErrInternalError, r.URL)
  72. return
  73. }
  74. writeSuccessResponseEmpty(w)
  75. }
  76. func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
  77. destUrl := fmt.Sprintf("http://%s%s%s",
  78. s3a.option.Filer, s3a.option.BucketsPath, r.RequestURI)
  79. s3a.proxyToFiler(w, r, destUrl)
  80. }
  81. func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
  82. destUrl := fmt.Sprintf("http://%s%s%s",
  83. s3a.option.Filer, s3a.option.BucketsPath, r.RequestURI)
  84. s3a.proxyToFiler(w, r, destUrl)
  85. }
  86. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  87. destUrl := fmt.Sprintf("http://%s%s%s",
  88. s3a.option.Filer, s3a.option.BucketsPath, r.RequestURI)
  89. s3a.proxyToFiler(w, r, destUrl)
  90. }
  91. func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string) {
  92. glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
  93. proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
  94. if err != nil {
  95. glog.Errorf("NewRequest %s: %v", destUrl, err)
  96. writeErrorResponse(w, ErrInternalError, r.URL)
  97. return
  98. }
  99. proxyReq.Header.Set("Host", s3a.option.Filer)
  100. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  101. for header, values := range r.Header {
  102. for _, value := range values {
  103. proxyReq.Header.Add(header, value)
  104. }
  105. }
  106. resp, postErr := client.Do(proxyReq)
  107. if postErr != nil {
  108. glog.Errorf("post to filer: %v", postErr)
  109. writeErrorResponse(w, ErrInternalError, r.URL)
  110. return
  111. }
  112. defer resp.Body.Close()
  113. for k, v := range resp.Header {
  114. w.Header()[k] = v
  115. }
  116. w.WriteHeader(resp.StatusCode)
  117. io.Copy(w, resp.Body)
  118. }