Browse Source

fix: filer download pass 429 to S3

pull/7482/head
Konstantin Lebedev 3 weeks ago
parent
commit
9c81ff354e
  1. 29
      weed/s3api/s3api_object_handlers.go
  2. 38
      weed/server/common.go

29
weed/s3api/s3api_object_handlers.go

@ -589,26 +589,26 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
} }
defer util_http.CloseResponse(resp) defer util_http.CloseResponse(resp)
if resp.StatusCode == http.StatusPreconditionFailed {
switch resp.StatusCode {
case http.StatusPreconditionFailed:
s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed) s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
return return
}
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
case http.StatusRequestedRangeNotSatisfiable:
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
return return
}
case http.StatusNotFound:
if r.Method == http.MethodDelete { if r.Method == http.MethodDelete {
if resp.StatusCode == http.StatusNotFound {
// this is normal
responseStatusCode, _ := responseFn(resp, w) responseStatusCode, _ := responseFn(resp, w)
s3err.PostLog(r, responseStatusCode, s3err.ErrNone) s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
return
}
}
if resp.StatusCode == http.StatusNotFound {
} else {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
}
return
case http.StatusTooManyRequests:
s3err.WriteErrorResponse(w, r, s3err.ErrRequestBytesExceed)
return
case http.StatusInternalServerError:
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return return
} }
@ -619,11 +619,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
return return
} }
if resp.StatusCode == http.StatusInternalServerError {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// when HEAD a directory, it should be reported as no such key // when HEAD a directory, it should be reported as no such key
// https://github.com/seaweedfs/seaweedfs/issues/3457 // https://github.com/seaweedfs/seaweedfs/issues/3457
if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified { if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {

38
weed/server/common.go

@ -19,12 +19,12 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util/request_id" "github.com/seaweedfs/seaweedfs/weed/util/request_id"
"github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/util/version"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -297,6 +297,18 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
} }
} }
func ProcessHttpError(w http.ResponseWriter, err error) error {
glog.Errorf("ProcessRangeRequest headers: %+v err: %v", w.Header(), err)
w.Header().Del("Content-Length")
switch {
case strings.Contains(err.Error(), "Too Many Requests"):
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
default:
http.Error(w, "Internal Error", http.StatusInternalServerError)
}
return err
}
func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error { func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error {
rangeReq := r.Header.Get("Range") rangeReq := r.Header.Get("Range")
bufferedWriter := writePool.Get().(*bufio.Writer) bufferedWriter := writePool.Get().(*bufio.Writer)
@ -310,16 +322,10 @@ func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
writeFn, err := prepareWriteFn(0, totalSize) writeFn, err := prepareWriteFn(0, totalSize)
if err != nil { if err != nil {
glog.Errorf("ProcessRangeRequest: %v", err)
w.Header().Del("Content-Length")
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("ProcessRangeRequest: %w", err)
return ProcessHttpError(w, err)
} }
if err = writeFn(bufferedWriter); err != nil { if err = writeFn(bufferedWriter); err != nil {
glog.Errorf("ProcessRangeRequest: %v", err)
w.Header().Del("Content-Length")
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("ProcessRangeRequest: %w", err)
return ProcessHttpError(w, err)
} }
return nil return nil
} }
@ -330,7 +336,7 @@ func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if err != nil { if err != nil {
glog.Errorf("ProcessRangeRequest headers: %+v err: %v", w.Header(), err) glog.Errorf("ProcessRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("ProcessRangeRequest header: %w", err)
return fmt.Errorf("ProcessRangeRequest header: %v", err)
} }
if sumRangesSize(ranges) > totalSize { if sumRangesSize(ranges) > totalSize {
// The total number of bytes in all the ranges // The total number of bytes in all the ranges
@ -360,14 +366,12 @@ func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
writeFn, err := prepareWriteFn(ra.start, ra.length) writeFn, err := prepareWriteFn(ra.start, ra.length)
if err != nil { if err != nil {
glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err)
w.Header().Del("Content-Length")
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("ProcessRangeRequest: %w", err)
return ProcessHttpError(w, fmt.Errorf("range[0]: %v", err))
} }
w.WriteHeader(http.StatusPartialContent) w.WriteHeader(http.StatusPartialContent)
err = writeFn(bufferedWriter) err = writeFn(bufferedWriter)
if err != nil { if err != nil {
return ProcessHttpError(w, fmt.Errorf("range[0]: %v", err))
glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err) glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err)
// Cannot call http.Error() here because WriteHeader was already called // Cannot call http.Error() here because WriteHeader was already called
return fmt.Errorf("ProcessRangeRequest range[0]: %w", err) return fmt.Errorf("ProcessRangeRequest range[0]: %w", err)
@ -381,13 +385,11 @@ func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
for i, ra := range ranges { for i, ra := range ranges {
if ra.start > totalSize { if ra.start > totalSize {
http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("out of range: %w", err)
return fmt.Errorf("out of range: %v", err)
} }
writeFn, err := prepareWriteFn(ra.start, ra.length) writeFn, err := prepareWriteFn(ra.start, ra.length)
if err != nil { if err != nil {
glog.Errorf("ProcessRangeRequest range[%d] err: %v", i, err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return fmt.Errorf("ProcessRangeRequest range[%d] err: %v", i, err)
return ProcessHttpError(w, fmt.Errorf("range[%d] err: %v", i, err))
} }
writeFnByRange[i] = writeFn writeFnByRange[i] = writeFn
} }

Loading…
Cancel
Save