Browse Source

multipart boundaries

pull/7481/head
chrislu 3 weeks ago
parent
commit
47cd650ab6
  1. 3
      .github/workflows/test-s3-over-https-using-awscli.yml
  2. 39
      weed/s3api/filer_multipart.go
  3. 1
      weed/s3api/s3_constants/header.go
  4. 180
      weed/s3api/s3api_object_handlers.go

3
.github/workflows/test-s3-over-https-using-awscli.yml

@ -83,7 +83,8 @@ jobs:
set -e
dd if=/dev/urandom of=generated bs=1M count=32
ETAG=$(aws --no-verify-ssl s3api put-object --bucket bucket --key test-get-obj --body generated | jq -r .ETag)
aws --no-verify-ssl s3api get-object --bucket bucket --key test-get-obj --if-match ${ETAG:1:32} downloaded
# jq -r already removes quotes, so use ETAG directly (handles both simple and multipart ETags)
aws --no-verify-ssl s3api get-object --bucket bucket --key test-get-obj --if-match "$ETAG" downloaded
diff -q generated downloaded
rm -f generated downloaded

39
weed/s3api/filer_multipart.go

@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"encoding/xml"
"fmt"
"math"
@ -261,6 +262,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk
var offset int64
// Track part boundaries for later retrieval with PartNumber parameter
type PartBoundary struct {
PartNumber int `json:"part"`
StartChunk int `json:"start"`
EndChunk int `json:"end"` // exclusive
ETag string `json:"etag"`
}
var partBoundaries []PartBoundary
for _, partNumber := range completedPartNumbers {
partEntriesByNumber, ok := partEntries[partNumber]
if !ok {
@ -281,6 +292,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
continue
}
// Record the start chunk index for this part
partStartChunk := len(finalParts)
// Calculate the part's ETag (for GetObject with PartNumber)
partETag := filer.ETag(entry)
for _, chunk := range entry.GetChunks() {
// CRITICAL: Do NOT modify SSE metadata offsets during assembly!
// The encrypted data was created with the offset stored in chunk.SseMetadata.
@ -303,6 +320,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
finalParts = append(finalParts, p)
offset += int64(chunk.Size)
}
// Record the part boundary
partEndChunk := len(finalParts)
partBoundaries = append(partBoundaries, PartBoundary{
PartNumber: partNumber,
StartChunk: partStartChunk,
EndChunk: partEndChunk,
ETag: partETag,
})
found = true
}
}
@ -334,6 +361,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
@ -393,6 +424,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
// Store parts count for x-amz-mp-parts-count header
entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, jsonErr := json.Marshal(partBoundaries); jsonErr == nil {
entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for suspended versioning multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
@ -442,6 +477,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for non-versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)

1
weed/s3api/s3_constants/header.go

@ -43,6 +43,7 @@ const (
SeaweedFSPartNumber = "X-Seaweedfs-Part-Number"
SeaweedFSUploadId = "X-Seaweedfs-Upload-Id"
SeaweedFSMultipartPartsCount = "X-Seaweedfs-Multipart-Parts-Count"
SeaweedFSMultipartPartBoundaries = "X-Seaweedfs-Multipart-Part-Boundaries" // JSON: [{part:1,start:0,end:2,etag:"abc"},{part:2,start:2,end:3,etag:"def"}]
SeaweedFSExpiresS3 = "X-Seaweedfs-Expires-S3"
AmzMpPartsCount = "x-amz-mp-parts-count"

180
weed/s3api/s3api_object_handlers.go

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
@ -490,34 +491,56 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// This replicates the filer handler logic
if partNumberStr != "" {
if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 {
// Validate part number (1-based)
if partNumber > len(objectEntryForSSE.Chunks) {
glog.Warningf("GetObject: Invalid part number %d, object has %d chunks", partNumber, len(objectEntryForSSE.Chunks))
// Get actual parts count from metadata (not chunk count)
partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber)
// Validate part number
if partNumber > partsCount {
glog.Warningf("GetObject: Invalid part number %d, object has %d parts", partNumber, partsCount)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
return
}
// Set parts count header (use actual chunk count like filer does)
w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(len(objectEntryForSSE.Chunks)))
glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", len(objectEntryForSSE.Chunks), partNumber)
// Get the specific part chunk
chunkIndex := partNumber - 1
partChunk := objectEntryForSSE.Chunks[chunkIndex]
// Set parts count header
w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount))
glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", partsCount, partNumber)
// Override ETag with the specific part's ETag
if partChunk.ETag != "" {
// chunk.ETag is base64-encoded, convert to hex for S3 compatibility
if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
partETag := fmt.Sprintf("%x", md5Bytes)
w.Header().Set("ETag", "\""+partETag+"\"")
glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s", partNumber, partETag)
// Calculate the byte range for this part
var startOffset, endOffset int64
if partInfo != nil {
// Use part boundaries from metadata (accurate for multi-chunk parts)
startOffset = objectEntryForSSE.Chunks[partInfo.StartChunk].Offset
lastChunk := objectEntryForSSE.Chunks[partInfo.EndChunk-1]
endOffset = lastChunk.Offset + int64(lastChunk.Size) - 1
// Override ETag with the part's ETag from metadata
w.Header().Set("ETag", "\""+partInfo.ETag+"\"")
glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag)
} else {
// Fallback: assume 1:1 part-to-chunk mapping (backward compatibility)
chunkIndex := partNumber - 1
if chunkIndex >= len(objectEntryForSSE.Chunks) {
glog.Warningf("GetObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks))
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
return
}
partChunk := objectEntryForSSE.Chunks[chunkIndex]
startOffset = partChunk.Offset
endOffset = partChunk.Offset + int64(partChunk.Size) - 1
// Override ETag with chunk's ETag (fallback)
if partChunk.ETag != "" {
if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
partETag := fmt.Sprintf("%x", md5Bytes)
w.Header().Set("ETag", "\""+partETag+"\"")
glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag)
}
}
}
// CRITICAL: Set Range header to read only this part's bytes (matches filer logic)
// This ensures we stream only the specific part, not the entire object
rangeHeader := fmt.Sprintf("bytes=%d-%d", partChunk.Offset, uint64(partChunk.Offset)+partChunk.Size-1)
rangeHeader := fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)
r.Header.Set("Range", rangeHeader)
glog.V(3).Infof("GetObject: Set Range header for part %d: %s", partNumber, rangeHeader)
}
@ -1155,10 +1178,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io
}
// Copy the decrypted chunk data
glog.V(2).Infof("streamDecryptedRangeFromChunks: about to copy decrypted chunk %s, expected ViewSize=%d", chunkView.FileId, chunkView.ViewSize)
written, copyErr := io.Copy(w, decryptedChunkReader)
glog.V(2).Infof("streamDecryptedRangeFromChunks: io.Copy completed for chunk %s, written=%d, expected=%d, copyErr=%v",
chunkView.FileId, written, chunkView.ViewSize, copyErr)
if closer, ok := decryptedChunkReader.(io.Closer); ok {
closeErr := closer.Close()
if closeErr != nil {
@ -1222,27 +1242,18 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil
// CRITICAL: Fetch FULL encrypted chunk (not just the range we need)
// CTR mode is a stream cipher - we must decrypt from the beginning and skip to the position
// Enterprise approach: decrypt full chunk, then skip in decrypted stream
glog.V(2).Infof("decryptSSECChunkView: fetching FULL chunk %s (size=%d)", chunkView.FileId, fileChunk.Size)
// Fetch full chunk instead of using chunkView range
fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
if err != nil {
return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
}
// Calculate IV using PartOffset (NOT ViewOffset!)
// Calculate IV using PartOffset
// PartOffset is the position of this chunk within its part's encrypted stream
// Enterprise uses: if PartOffset > 0 { IV = calculateIVWithOffset(baseIV, PartOffset) }
var adjustedIV []byte
if ssecMetadata.PartOffset > 0 {
adjustedIV = adjustCTRIV(chunkIV, ssecMetadata.PartOffset)
glog.V(2).Infof("SSE-C DECRYPT: chunk=%s, adjusted IV for PartOffset=%d, baseIV=%x, adjustedIV=%x",
chunkView.FileId, ssecMetadata.PartOffset, chunkIV[:8], adjustedIV[:8])
} else {
// PartOffset == 0, use base IV as-is
adjustedIV = chunkIV
glog.V(2).Infof("SSE-C DECRYPT: chunk=%s, using base IV (PartOffset=0)", chunkView.FileId)
}
// Decrypt the full chunk
@ -1252,10 +1263,8 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil
return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr)
}
// Now skip to the position we need in the decrypted stream
// chunkView.OffsetInChunk tells us how many bytes to skip
// Skip to the position we need in the decrypted stream
if chunkView.OffsetInChunk > 0 {
glog.V(2).Infof("SSE-C DECRYPT: Skipping %d bytes in decrypted stream", chunkView.OffsetInChunk)
_, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
if err != nil {
if closer, ok := decryptedReader.(io.Closer); ok {
@ -1267,13 +1276,10 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil
// Return a reader that only reads ViewSize bytes
limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
glog.V(2).Infof("SSE-C DECRYPT: chunk=%s, returning reader for %d bytes at offset %d",
chunkView.FileId, chunkView.ViewSize, chunkView.OffsetInChunk)
return io.NopCloser(limitedReader), nil
}
// Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it)
glog.Warningf("decryptSSECChunkView: chunk=%s has no SSE-C metadata, returning raw encrypted reader", chunkView.FileId)
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil {
return nil, err
@ -1313,7 +1319,6 @@ func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *f
ChunkOffset: sseKMSKey.ChunkOffset,
}
glog.V(3).Infof("decryptSSEKMSChunkView: chunk=%s, ChunkOffset=%d", chunkView.FileId, sseKMSKey.ChunkOffset)
decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey)
if decryptErr != nil {
fullChunkReader.Close()
@ -1362,7 +1367,6 @@ func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *fi
return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
}
glog.V(2).Infof("decryptSSES3ChunkView: chunk=%s, using base IV", chunkView.FileId)
decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv)
if decryptErr != nil {
fullChunkReader.Close()
@ -1452,9 +1456,6 @@ func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.R
return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, fileId)
}
glog.V(2).Infof("fetchFullChunk: chunk=%s, status=%d, Content-Length=%d",
fileId, resp.StatusCode, resp.ContentLength)
return resp.Body, nil
}
@ -1484,9 +1485,6 @@ func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer
if !chunkView.IsFullChunk() {
rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd))
glog.V(2).Infof("fetchChunkViewData: chunk=%s, isFullChunk=false, Range header=bytes=%d-%d", chunkView.FileId, chunkView.OffsetInChunk, rangeEnd)
} else {
glog.V(2).Infof("fetchChunkViewData: chunk=%s, isFullChunk=true, no Range header", chunkView.FileId)
}
// Set JWT for authentication
@ -1505,9 +1503,6 @@ func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer
return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId)
}
glog.V(2).Infof("fetchChunkViewData: chunk=%s, status=%d, Content-Length=%d, expected ViewSize=%d, Content-Range=%s",
chunkView.FileId, resp.StatusCode, resp.ContentLength, chunkView.ViewSize, resp.Header.Get("Content-Range"))
return resp.Body, nil
}
@ -1999,28 +1994,40 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
// If PartNumber is specified, set headers (matching filer logic)
if partNumberStr != "" {
if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 {
// Validate part number (1-based)
if partNumber > len(objectEntryForSSE.Chunks) {
glog.Warningf("HeadObject: Invalid part number %d, object has %d chunks", partNumber, len(objectEntryForSSE.Chunks))
// Get actual parts count from metadata (not chunk count)
partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber)
// Validate part number
if partNumber > partsCount {
glog.Warningf("HeadObject: Invalid part number %d, object has %d parts", partNumber, partsCount)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
return
}
// Set parts count header (use actual chunk count like filer does)
w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(len(objectEntryForSSE.Chunks)))
glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", len(objectEntryForSSE.Chunks), partNumber)
// Set parts count header
w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount))
glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", partsCount, partNumber)
// Get the specific part chunk
chunkIndex := partNumber - 1
partChunk := objectEntryForSSE.Chunks[chunkIndex]
// Override ETag with the specific part's ETag
if partChunk.ETag != "" {
// chunk.ETag is base64-encoded, convert to hex for S3 compatibility
if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
partETag := fmt.Sprintf("%x", md5Bytes)
w.Header().Set("ETag", "\""+partETag+"\"")
glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s", partNumber, partETag)
// Override ETag with the part's ETag
if partInfo != nil {
// Use part ETag from metadata (accurate for multi-chunk parts)
w.Header().Set("ETag", "\""+partInfo.ETag+"\"")
glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag)
} else {
// Fallback: use chunk's ETag (backward compatibility)
chunkIndex := partNumber - 1
if chunkIndex >= len(objectEntryForSSE.Chunks) {
glog.Warningf("HeadObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks))
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
return
}
partChunk := objectEntryForSSE.Chunks[chunkIndex]
if partChunk.ETag != "" {
if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
partETag := fmt.Sprintf("%x", md5Bytes)
w.Header().Set("ETag", "\""+partETag+"\"")
glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag)
}
}
}
}
@ -3334,3 +3341,46 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
return multiReader, nil
}
// PartBoundaryInfo holds information about a part's chunk boundaries
type PartBoundaryInfo struct {
PartNumber int `json:"part"`
StartChunk int `json:"start"`
EndChunk int `json:"end"` // exclusive
ETag string `json:"etag"`
}
// getMultipartInfo retrieves multipart metadata for a given part number
// Returns: (partsCount, partInfo)
// - partsCount: total number of parts in the multipart object
// - partInfo: boundary information for the requested part (nil if not found or not a multipart object)
func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) (int, *PartBoundaryInfo) {
if entry == nil || entry.Extended == nil {
// Not a multipart object or no metadata
return len(entry.GetChunks()), nil
}
// Try to get parts count from metadata
partsCount := len(entry.GetChunks()) // default fallback
if partsCountBytes, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartsCount]; exists {
if count, err := strconv.Atoi(string(partsCountBytes)); err == nil && count > 0 {
partsCount = count
}
}
// Try to get part boundaries from metadata
if boundariesJSON, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries]; exists {
var boundaries []PartBoundaryInfo
if err := json.Unmarshal(boundariesJSON, &boundaries); err == nil {
// Find the requested part
for i := range boundaries {
if boundaries[i].PartNumber == partNumber {
return partsCount, &boundaries[i]
}
}
}
}
// No part boundaries metadata or part not found
return partsCount, nil
}
Loading…
Cancel
Save