diff --git a/.github/workflows/test-s3-over-https-using-awscli.yml b/.github/workflows/test-s3-over-https-using-awscli.yml index a0279c42a..0a5cd7072 100644 --- a/.github/workflows/test-s3-over-https-using-awscli.yml +++ b/.github/workflows/test-s3-over-https-using-awscli.yml @@ -83,7 +83,8 @@ jobs: set -e dd if=/dev/urandom of=generated bs=1M count=32 ETAG=$(aws --no-verify-ssl s3api put-object --bucket bucket --key test-get-obj --body generated | jq -r .ETag) - aws --no-verify-ssl s3api get-object --bucket bucket --key test-get-obj --if-match ${ETAG:1:32} downloaded + # jq -r strips the JSON-level quoting, but the ETag value keeps its own surrounding double quotes, which --if-match expects; passing it as-is handles both simple and multipart ("hex-N") ETags + aws --no-verify-ssl s3api get-object --bucket bucket --key test-get-obj --if-match "$ETAG" downloaded diff -q generated downloaded rm -f generated downloaded diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index e52364441..465694147 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/base64" "encoding/hex" + "encoding/json" "encoding/xml" "fmt" "math" @@ -261,6 +262,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl mime := pentry.Attributes.Mime var finalParts []*filer_pb.FileChunk var offset int64 + + // Track part boundaries for later retrieval with PartNumber parameter + type PartBoundary struct { + PartNumber int `json:"part"` + StartChunk int `json:"start"` + EndChunk int `json:"end"` // exclusive + ETag string `json:"etag"` + } + var partBoundaries []PartBoundary + for _, partNumber := range completedPartNumbers { partEntriesByNumber, ok := partEntries[partNumber] if !ok { @@ -281,6 +292,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl continue } + // Record the start chunk index for this part + partStartChunk := len(finalParts) + + // Calculate the part's ETag (for GetObject with PartNumber) + partETag := filer.ETag(entry) 
+ for _, chunk := range entry.GetChunks() { // CRITICAL: Do NOT modify SSE metadata offsets during assembly! // The encrypted data was created with the offset stored in chunk.SseMetadata. @@ -303,6 +320,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl finalParts = append(finalParts, p) offset += int64(chunk.Size) } + + // Record the part boundary + partEndChunk := len(finalParts) + partBoundaries = append(partBoundaries, PartBoundary{ + PartNumber: partNumber, + StartChunk: partStartChunk, + EndChunk: partEndChunk, + ETag: partETag, + }) + found = true } } @@ -334,6 +361,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) // Store parts count for x-amz-mp-parts-count header versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil { + versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } // Set object owner for versioned multipart objects amzAccountId := r.Header.Get(s3_constants.AmzAccountId) @@ -393,6 +424,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") // Store parts count for x-amz-mp-parts-count header entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, jsonErr := json.Marshal(partBoundaries); jsonErr == nil { + entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } // Set object owner for suspended versioning multipart objects amzAccountId := r.Header.Get(s3_constants.AmzAccountId) @@ -442,6 +477,10 @@ func 
(s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) // Store parts count for x-amz-mp-parts-count header entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil { + entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } // Set object owner for non-versioned multipart objects amzAccountId := r.Header.Get(s3_constants.AmzAccountId) diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go index 019ac63ed..757fcf77f 100644 --- a/weed/s3api/s3_constants/header.go +++ b/weed/s3api/s3_constants/header.go @@ -43,6 +43,7 @@ const ( SeaweedFSPartNumber = "X-Seaweedfs-Part-Number" SeaweedFSUploadId = "X-Seaweedfs-Upload-Id" SeaweedFSMultipartPartsCount = "X-Seaweedfs-Multipart-Parts-Count" + SeaweedFSMultipartPartBoundaries = "X-Seaweedfs-Multipart-Part-Boundaries" // JSON, e.g. [{"part":1,"start":0,"end":2,"etag":"abc"},{"part":2,"start":2,"end":3,"etag":"def"}] SeaweedFSExpiresS3 = "X-Seaweedfs-Expires-S3" AmzMpPartsCount = "x-amz-mp-parts-count" diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 0862b2b54..5619718b6 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" @@ -490,34 +491,56 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // This replicates the filer handler logic if partNumberStr != "" { if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { - // Validate part number (1-based) - if partNumber > len(objectEntryForSSE.Chunks) { - glog.Warningf("GetObject: Invalid part number %d, object has %d 
chunks", partNumber, len(objectEntryForSSE.Chunks)) + // Get actual parts count from metadata (not chunk count) + partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) + + // Validate part number + if partNumber > partsCount { + glog.Warningf("GetObject: Invalid part number %d, object has %d parts", partNumber, partsCount) s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) return } - // Set parts count header (use actual chunk count like filer does) - w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(len(objectEntryForSSE.Chunks))) - glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", len(objectEntryForSSE.Chunks), partNumber) - - // Get the specific part chunk - chunkIndex := partNumber - 1 - partChunk := objectEntryForSSE.Chunks[chunkIndex] + // Set parts count header + w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) + glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", partsCount, partNumber) - // Override ETag with the specific part's ETag - if partChunk.ETag != "" { - // chunk.ETag is base64-encoded, convert to hex for S3 compatibility - if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { - partETag := fmt.Sprintf("%x", md5Bytes) - w.Header().Set("ETag", "\""+partETag+"\"") - glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s", partNumber, partETag) + // Calculate the byte range for this part + var startOffset, endOffset int64 + if partInfo != nil { + // Use part boundaries from metadata (accurate for multi-chunk parts) + startOffset = objectEntryForSSE.Chunks[partInfo.StartChunk].Offset + lastChunk := objectEntryForSSE.Chunks[partInfo.EndChunk-1] + endOffset = lastChunk.Offset + int64(lastChunk.Size) - 1 + + // Override ETag with the part's ETag from metadata + w.Header().Set("ETag", "\""+partInfo.ETag+"\"") + glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (from 
metadata)", partNumber, partInfo.ETag) + } else { + // Fallback: assume 1:1 part-to-chunk mapping (backward compatibility) + chunkIndex := partNumber - 1 + if chunkIndex >= len(objectEntryForSSE.Chunks) { + glog.Warningf("GetObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } + partChunk := objectEntryForSSE.Chunks[chunkIndex] + startOffset = partChunk.Offset + endOffset = partChunk.Offset + int64(partChunk.Size) - 1 + + // Override ETag with chunk's ETag (fallback) + if partChunk.ETag != "" { + if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { + partETag := fmt.Sprintf("%x", md5Bytes) + w.Header().Set("ETag", "\""+partETag+"\"") + glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) + } } } // CRITICAL: Set Range header to read only this part's bytes (matches filer logic) // This ensures we stream only the specific part, not the entire object - rangeHeader := fmt.Sprintf("bytes=%d-%d", partChunk.Offset, uint64(partChunk.Offset)+partChunk.Size-1) + rangeHeader := fmt.Sprintf("bytes=%d-%d", startOffset, endOffset) r.Header.Set("Range", rangeHeader) glog.V(3).Infof("GetObject: Set Range header for part %d: %s", partNumber, rangeHeader) } @@ -1155,10 +1178,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io } // Copy the decrypted chunk data - glog.V(2).Infof("streamDecryptedRangeFromChunks: about to copy decrypted chunk %s, expected ViewSize=%d", chunkView.FileId, chunkView.ViewSize) written, copyErr := io.Copy(w, decryptedChunkReader) - glog.V(2).Infof("streamDecryptedRangeFromChunks: io.Copy completed for chunk %s, written=%d, expected=%d, copyErr=%v", - chunkView.FileId, written, chunkView.ViewSize, copyErr) if closer, ok := decryptedChunkReader.(io.Closer); ok { closeErr := closer.Close() if closeErr != 
nil { @@ -1222,27 +1242,18 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil // CRITICAL: Fetch FULL encrypted chunk (not just the range we need) // CTR mode is a stream cipher - we must decrypt from the beginning and skip to the position - // Enterprise approach: decrypt full chunk, then skip in decrypted stream - glog.V(2).Infof("decryptSSECChunkView: fetching FULL chunk %s (size=%d)", chunkView.FileId, fileChunk.Size) - - // Fetch full chunk instead of using chunkView range fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) if err != nil { return nil, fmt.Errorf("failed to fetch full chunk: %w", err) } - // Calculate IV using PartOffset (NOT ViewOffset!) + // Calculate IV using PartOffset // PartOffset is the position of this chunk within its part's encrypted stream - // Enterprise uses: if PartOffset > 0 { IV = calculateIVWithOffset(baseIV, PartOffset) } var adjustedIV []byte if ssecMetadata.PartOffset > 0 { adjustedIV = adjustCTRIV(chunkIV, ssecMetadata.PartOffset) - glog.V(2).Infof("SSE-C DECRYPT: chunk=%s, adjusted IV for PartOffset=%d, baseIV=%x, adjustedIV=%x", - chunkView.FileId, ssecMetadata.PartOffset, chunkIV[:8], adjustedIV[:8]) } else { - // PartOffset == 0, use base IV as-is adjustedIV = chunkIV - glog.V(2).Infof("SSE-C DECRYPT: chunk=%s, using base IV (PartOffset=0)", chunkView.FileId) } // Decrypt the full chunk @@ -1252,10 +1263,8 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr) } - // Now skip to the position we need in the decrypted stream - // chunkView.OffsetInChunk tells us how many bytes to skip + // Skip to the position we need in the decrypted stream if chunkView.OffsetInChunk > 0 { - glog.V(2).Infof("SSE-C DECRYPT: Skipping %d bytes in decrypted stream", chunkView.OffsetInChunk) _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) if err != nil { if closer, ok 
:= decryptedReader.(io.Closer); ok { @@ -1267,13 +1276,10 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil // Return a reader that only reads ViewSize bytes limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) - glog.V(2).Infof("SSE-C DECRYPT: chunk=%s, returning reader for %d bytes at offset %d", - chunkView.FileId, chunkView.ViewSize, chunkView.OffsetInChunk) return io.NopCloser(limitedReader), nil } // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it) - glog.Warningf("decryptSSECChunkView: chunk=%s has no SSE-C metadata, returning raw encrypted reader", chunkView.FileId) encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) if err != nil { return nil, err @@ -1313,7 +1319,6 @@ func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *f ChunkOffset: sseKMSKey.ChunkOffset, } - glog.V(3).Infof("decryptSSEKMSChunkView: chunk=%s, ChunkOffset=%d", chunkView.FileId, sseKMSKey.ChunkOffset) decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey) if decryptErr != nil { fullChunkReader.Close() @@ -1362,7 +1367,6 @@ func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *fi return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) } - glog.V(2).Infof("decryptSSES3ChunkView: chunk=%s, using base IV", chunkView.FileId) decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv) if decryptErr != nil { fullChunkReader.Close() @@ -1452,9 +1456,6 @@ func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.R return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, fileId) } - glog.V(2).Infof("fetchFullChunk: chunk=%s, status=%d, Content-Length=%d", - fileId, resp.StatusCode, resp.ContentLength) - return resp.Body, nil } @@ -1484,9 +1485,6 @@ func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer if 
!chunkView.IsFullChunk() { rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1 req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd)) - glog.V(2).Infof("fetchChunkViewData: chunk=%s, isFullChunk=false, Range header=bytes=%d-%d", chunkView.FileId, chunkView.OffsetInChunk, rangeEnd) - } else { - glog.V(2).Infof("fetchChunkViewData: chunk=%s, isFullChunk=true, no Range header", chunkView.FileId) } // Set JWT for authentication @@ -1505,9 +1503,6 @@ func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId) } - glog.V(2).Infof("fetchChunkViewData: chunk=%s, status=%d, Content-Length=%d, expected ViewSize=%d, Content-Range=%s", - chunkView.FileId, resp.StatusCode, resp.ContentLength, chunkView.ViewSize, resp.Header.Get("Content-Range")) - return resp.Body, nil } @@ -1999,28 +1994,40 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request // If PartNumber is specified, set headers (matching filer logic) if partNumberStr != "" { if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { - // Validate part number (1-based) - if partNumber > len(objectEntryForSSE.Chunks) { - glog.Warningf("HeadObject: Invalid part number %d, object has %d chunks", partNumber, len(objectEntryForSSE.Chunks)) + // Get actual parts count from metadata (not chunk count) + partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) + + // Validate part number + if partNumber > partsCount { + glog.Warningf("HeadObject: Invalid part number %d, object has %d parts", partNumber, partsCount) s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) return } - // Set parts count header (use actual chunk count like filer does) - w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(len(objectEntryForSSE.Chunks))) - glog.V(3).Infof("HeadObject: Set PartsCount=%d 
for part %d", len(objectEntryForSSE.Chunks), partNumber) + // Set parts count header + w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) + glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", partsCount, partNumber) - // Get the specific part chunk - chunkIndex := partNumber - 1 - partChunk := objectEntryForSSE.Chunks[chunkIndex] - - // Override ETag with the specific part's ETag - if partChunk.ETag != "" { - // chunk.ETag is base64-encoded, convert to hex for S3 compatibility - if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { - partETag := fmt.Sprintf("%x", md5Bytes) - w.Header().Set("ETag", "\""+partETag+"\"") - glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s", partNumber, partETag) + // Override ETag with the part's ETag + if partInfo != nil { + // Use part ETag from metadata (accurate for multi-chunk parts) + w.Header().Set("ETag", "\""+partInfo.ETag+"\"") + glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag) + } else { + // Fallback: use chunk's ETag (backward compatibility) + chunkIndex := partNumber - 1 + if chunkIndex >= len(objectEntryForSSE.Chunks) { + glog.Warningf("HeadObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } + partChunk := objectEntryForSSE.Chunks[chunkIndex] + if partChunk.ETag != "" { + if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { + partETag := fmt.Sprintf("%x", md5Bytes) + w.Header().Set("ETag", "\""+partETag+"\"") + glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) + } } } } @@ -3334,3 +3341,46 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox return multiReader, nil } + +// PartBoundaryInfo holds information about a 
part's chunk boundaries +type PartBoundaryInfo struct { + PartNumber int `json:"part"` + StartChunk int `json:"start"` + EndChunk int `json:"end"` // exclusive + ETag string `json:"etag"` +} + +// getMultipartInfo retrieves multipart metadata for a given part number +// Returns: (partsCount, partInfo) +// - partsCount: total number of parts in the multipart object +// - partInfo: boundary information for the requested part (nil if not found or not a multipart object) +func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) (int, *PartBoundaryInfo) { + if entry == nil || entry.Extended == nil { + // Not a multipart object or no metadata + return len(entry.GetChunks()), nil + } + + // Try to get parts count from metadata + partsCount := len(entry.GetChunks()) // default fallback + if partsCountBytes, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartsCount]; exists { + if count, err := strconv.Atoi(string(partsCountBytes)); err == nil && count > 0 { + partsCount = count + } + } + + // Try to get part boundaries from metadata + if boundariesJSON, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries]; exists { + var boundaries []PartBoundaryInfo + if err := json.Unmarshal(boundariesJSON, &boundaries); err == nil { + // Find the requested part + for i := range boundaries { + if boundaries[i].PartNumber == partNumber { + return partsCount, &boundaries[i] + } + } + } + } + + // No part boundaries metadata or part not found + return partsCount, nil +}