From 88f1d32cc4a09d399966d7fa1039c1a714758e43 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 2 Sep 2018 14:20:47 -0700 Subject: [PATCH] add s3ChunkedReader fix https://github.com/chrislusf/seaweedfs/issues/718 --- weed/s3api/chunked_reader_v4.go | 276 ++++++++++++++++++++++++++++ weed/s3api/s3api_auth.go | 90 +++++++++ weed/s3api/s3api_object_handlers.go | 8 +- 3 files changed, 373 insertions(+), 1 deletion(-) create mode 100644 weed/s3api/chunked_reader_v4.go create mode 100644 weed/s3api/s3api_auth.go diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go new file mode 100644 index 000000000..35a97dffe --- /dev/null +++ b/weed/s3api/chunked_reader_v4.go @@ -0,0 +1,276 @@ +package s3api + +// the related code is copied and modified from minio source code + +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "bufio" + "bytes" + "errors" + "io" + "net/http" + "github.com/dustin/go-humanize" +) + +// Streaming AWS Signature Version '4' constants. +const ( + streamingContentSHA256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" +) + +const maxLineLength = 4 * humanize.KiByte // assumed <= bufio.defaultBufSize 4KiB + +// lineTooLong is generated as chunk header is bigger than 4KiB. +var errLineTooLong = errors.New("header line too long") + +// Malformed encoding is generated when chunk header is wrongly formed. +var errMalformedEncoding = errors.New("malformed chunked encoding") + +// newSignV4ChunkedReader returns a new s3ChunkedReader that translates the data read from r +// out of HTTP "chunked" format before returning it. +// The s3ChunkedReader returns io.EOF when the final 0-length chunk is read. +func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser) { + return &s3ChunkedReader{ + reader: bufio.NewReader(req.Body), + state: readChunkHeader, + } +} + +// Represents the overall state that is required for decoding a +// AWS Signature V4 chunked reader. +type s3ChunkedReader struct { + reader *bufio.Reader + state chunkState + lastChunk bool + chunkSignature string + n uint64 // Unread bytes in chunk + err error +} + +// Read chunk reads the chunk token signature portion. +func (cr *s3ChunkedReader) readS3ChunkHeader() { + // Read the first chunk line until CRLF. + var hexChunkSize, hexChunkSignature []byte + hexChunkSize, hexChunkSignature, cr.err = readChunkLine(cr.reader) + if cr.err != nil { + return + } + // ;token=value - converts the hex into its uint64 form. + cr.n, cr.err = parseHexUint(hexChunkSize) + if cr.err != nil { + return + } + if cr.n == 0 { + cr.err = io.EOF + } + // Save the incoming chunk signature. + cr.chunkSignature = string(hexChunkSignature) +} + +type chunkState int + +const ( + readChunkHeader chunkState = iota + readChunkTrailer + readChunk + verifyChunk + eofChunk +) + +func (cs chunkState) String() string { + stateString := "" + switch cs { + case readChunkHeader: + stateString = "readChunkHeader" + case readChunkTrailer: + stateString = "readChunkTrailer" + case readChunk: + stateString = "readChunk" + case verifyChunk: + stateString = "verifyChunk" + case eofChunk: + stateString = "eofChunk" + + } + return stateString +} + +func (cr *s3ChunkedReader) Close() (err error) { + return nil +} + +// Read - implements `io.Reader`, which transparently decodes +// the incoming AWS Signature V4 streaming signature. +func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { + for { + switch cr.state { + case readChunkHeader: + cr.readS3ChunkHeader() + // If we're at the end of a chunk. + if cr.n == 0 && cr.err == io.EOF { + cr.state = readChunkTrailer + cr.lastChunk = true + continue + } + if cr.err != nil { + return 0, cr.err + } + cr.state = readChunk + case readChunkTrailer: + cr.err = readCRLF(cr.reader) + if cr.err != nil { + return 0, errMalformedEncoding + } + cr.state = verifyChunk + case readChunk: + // There is no more space left in the request buffer. + if len(buf) == 0 { + return n, nil + } + rbuf := buf + // The request buffer is larger than the current chunk size. + // Read only the current chunk from the underlying reader. + if uint64(len(rbuf)) > cr.n { + rbuf = rbuf[:cr.n] + } + var n0 int + n0, cr.err = cr.reader.Read(rbuf) + if cr.err != nil { + // We have lesser than chunk size advertised in chunkHeader, this is 'unexpected'. + if cr.err == io.EOF { + cr.err = io.ErrUnexpectedEOF + } + return 0, cr.err + } + + // Update the bytes read into request buffer so far. + n += n0 + buf = buf[n0:] + // Update bytes to be read of the current chunk before verifying chunk's signature. + cr.n -= uint64(n0) + + // If we're at the end of a chunk. + if cr.n == 0 { + cr.state = readChunkTrailer + continue + } + case verifyChunk: + if cr.lastChunk { + cr.state = eofChunk + } else { + cr.state = readChunkHeader + } + case eofChunk: + return n, io.EOF + } + } +} + +// readCRLF - check if reader only has '\r\n' CRLF character. +// returns malformed encoding if it doesn't. +func readCRLF(reader io.Reader) error { + buf := make([]byte, 2) + _, err := io.ReadFull(reader, buf[:2]) + if err != nil { + return err + } + if buf[0] != '\r' || buf[1] != '\n' { + return errMalformedEncoding + } + return nil +} + +// Read a line of bytes (up to \n) from b. +// Give up if the line exceeds maxLineLength. +// The returned bytes are owned by the bufio.Reader +// so they are only valid until the next bufio read. +func readChunkLine(b *bufio.Reader) ([]byte, []byte, error) { + buf, err := b.ReadSlice('\n') + if err != nil { + // We always know when EOF is coming. + // If the caller asked for a line, there should be a line. + if err == io.EOF { + err = io.ErrUnexpectedEOF + } else if err == bufio.ErrBufferFull { + err = errLineTooLong + } + return nil, nil, err + } + if len(buf) >= maxLineLength { + return nil, nil, errLineTooLong + } + // Parse s3 specific chunk extension and fetch the values. + hexChunkSize, hexChunkSignature := parseS3ChunkExtension(buf) + return hexChunkSize, hexChunkSignature, nil +} + +// trimTrailingWhitespace - trim trailing white space. +func trimTrailingWhitespace(b []byte) []byte { + for len(b) > 0 && isASCIISpace(b[len(b)-1]) { + b = b[:len(b)-1] + } + return b +} + +// isASCIISpace - is ascii space? +func isASCIISpace(b byte) bool { + return b == ' ' || b == '\t' || b == '\n' || b == '\r' +} + +// Constant s3 chunk encoding signature. +const s3ChunkSignatureStr = ";chunk-signature=" + +// parses3ChunkExtension removes any s3 specific chunk-extension from buf. +// For example, +// "10000;chunk-signature=..." => "10000", "chunk-signature=..." +func parseS3ChunkExtension(buf []byte) ([]byte, []byte) { + buf = trimTrailingWhitespace(buf) + semi := bytes.Index(buf, []byte(s3ChunkSignatureStr)) + // Chunk signature not found, return the whole buffer. + if semi == -1 { + return buf, nil + } + return buf[:semi], parseChunkSignature(buf[semi:]) +} + +// parseChunkSignature - parse chunk signature. +func parseChunkSignature(chunk []byte) []byte { + chunkSplits := bytes.SplitN(chunk, []byte(s3ChunkSignatureStr), 2) + return chunkSplits[1] +} + +// parse hex to uint64. +func parseHexUint(v []byte) (n uint64, err error) { + for i, b := range v { + switch { + case '0' <= b && b <= '9': + b = b - '0' + case 'a' <= b && b <= 'f': + b = b - 'a' + 10 + case 'A' <= b && b <= 'F': + b = b - 'A' + 10 + default: + return 0, errors.New("invalid byte in chunk length") + } + if i == 16 { + return 0, errors.New("http chunk length too large") + } + n <<= 4 + n |= uint64(b) + } + return +} diff --git a/weed/s3api/s3api_auth.go b/weed/s3api/s3api_auth.go new file mode 100644 index 000000000..f429ad28d --- /dev/null +++ b/weed/s3api/s3api_auth.go @@ -0,0 +1,90 @@ +package s3api + +import ( + "net/http" + "strings" +) + +// AWS Signature Version '4' constants. +const ( + signV4Algorithm = "AWS4-HMAC-SHA256" + signV2Algorithm = "AWS" +) + +// Verify if request has JWT. +func isRequestJWT(r *http.Request) bool { + return strings.HasPrefix(r.Header.Get("Authorization"), "Bearer") +} + +// Verify if request has AWS Signature Version '4'. +func isRequestSignatureV4(r *http.Request) bool { + return strings.HasPrefix(r.Header.Get("Authorization"), signV4Algorithm) +} + +// Verify if request has AWS Signature Version '2'. +func isRequestSignatureV2(r *http.Request) bool { + return (!strings.HasPrefix(r.Header.Get("Authorization"), signV4Algorithm) && + strings.HasPrefix(r.Header.Get("Authorization"), signV2Algorithm)) +} + +// Verify if request has AWS PreSign Version '4'. +func isRequestPresignedSignatureV4(r *http.Request) bool { + _, ok := r.URL.Query()["X-Amz-Credential"] + return ok +} + +// Verify request has AWS PreSign Version '2'. +func isRequestPresignedSignatureV2(r *http.Request) bool { + _, ok := r.URL.Query()["AWSAccessKeyId"] + return ok +} + +// Verify if request has AWS Post policy Signature Version '4'. +func isRequestPostPolicySignatureV4(r *http.Request) bool { + return strings.Contains(r.Header.Get("Content-Type"), "multipart/form-data") && + r.Method == http.MethodPost +} + +// Verify if the request has AWS Streaming Signature Version '4'. This is only valid for 'PUT' operation. +func isRequestSignStreamingV4(r *http.Request) bool { + return r.Header.Get("x-amz-content-sha256") == streamingContentSHA256 && + r.Method == http.MethodPut +} + +// Authorization type. +type authType int + +// List of all supported auth types. +const ( + authTypeUnknown authType = iota + authTypeAnonymous + authTypePresigned + authTypePresignedV2 + authTypePostPolicy + authTypeStreamingSigned + authTypeSigned + authTypeSignedV2 + authTypeJWT +) + +// Get request authentication type. +func getRequestAuthType(r *http.Request) authType { + if isRequestSignatureV2(r) { + return authTypeSignedV2 + } else if isRequestPresignedSignatureV2(r) { + return authTypePresignedV2 + } else if isRequestSignStreamingV4(r) { + return authTypeStreamingSigned + } else if isRequestSignatureV4(r) { + return authTypeSigned + } else if isRequestPresignedSignatureV4(r) { + return authTypePresigned + } else if isRequestJWT(r) { + return authTypeJWT + } else if isRequestPostPolicySignatureV4(r) { + return authTypePostPolicy + } else if _, ok := r.Header["Authorization"]; !ok { + return authTypeAnonymous + } + return authTypeUnknown +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index f0d13af05..bd41a48fe 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -41,9 +41,15 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } + rAuthType := getRequestAuthType(r) + dataReader := r.Body + if rAuthType == authTypeStreamingSigned{ + dataReader = newSignV4ChunkedReader(r) + } + uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket) - proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body) + proxyReq, err := http.NewRequest("PUT", uploadUrl, dataReader) if err != nil { glog.Errorf("NewRequest %s: %v", uploadUrl, err)