From c284e51d201588f828081050a906e50610b66031 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Fri, 6 Feb 2026 21:54:43 -0800
Subject: [PATCH] fix: multipart upload ETag calculation (#8238)

* fix multipart etag

* address comments

* clean up

* clean up

* optimization

* address comments

* unquoted etag

* dedup

* upgrade

* clean

* etag

* return quoted tag

* quoted etag

* debug

* s3api: unify ETag retrieval and quoting across handlers

  Refactor newListEntry to take *S3ApiServer and use getObjectETag, and
  update setResponseHeaders to use the same logic. This ensures consistent
  ETags are returned for both listing and direct access.

* s3api: implement ListObjects deduplication for versioned buckets

  Handle duplicate entries between the main path and the .versions
  directory by prioritizing the latest version when bucket versioning is
  enabled.

* s3api: cleanup stale main file entries during versioned uploads

  Add explicit deletion of pre-existing "main" files when creating new
  versions in versioned buckets. This prevents stale entries from
  appearing in bucket listings and ensures consistency.

* s3api: fix cleanup code placement in versioned uploads

  Correct the placement of removal calls in completeMultipartUpload and
  putVersionedObject to ensure stale main files are properly deleted
  during versioned uploads.

* s3api: improve getObjectETag fallback for empty ExtETagKey

  Ensure that when ExtETagKey exists but contains an empty value, the
  function falls through to MD5/chunk-based calculation instead of
  returning an empty string.

* s3api: fix test files for new newListEntry signature

  Update test files to use the new newListEntry signature where the first
  parameter is *S3ApiServer. Created mockS3ApiServer to properly test
  owner display name lookup functionality.

* s3api: use filer.ETag for consistent Md5 handling in getEtagFromEntry

  Change the getEtagFromEntry fallback to use filer.ETag(entry) instead of
  filer.ETagChunks so that legacy entries with Attributes.Md5 are handled
  consistently with the rest of the codebase.

* s3api: optimize list logic and fix conditional header logging

  - Hoist the bucket versioning check out of the per-entry callback to
    avoid repeated getVersioningState calls
  - Extract an appendOrDedup helper function to eliminate duplicate
    dedup/append logic across multiple code paths
  - Change If-Match mismatch logging from glog.Errorf to glog.V(3).Infof
    and remove the DEBUG prefix for consistency

* s3api: fix test mock to properly initialize IAM accounts

  Fixed a nil pointer dereference in TestNewListEntryOwnerDisplayName by
  directly initializing the IdentityAccessManagement.accounts map in the
  test setup. This ensures newListEntry can look up account display names
  without panicking.

* cleanup

* s3api: remove premature main file cleanup in versioned uploads

  Removed incorrect cleanup logic that was deleting main files during
  versioned uploads. This was causing test failures because it deleted
  objects that should have been preserved as null versions when versioning
  was first enabled. The deduplication logic in listing is sufficient to
  handle duplicate entries without deleting files during upload.

* s3api: add empty-value guard to getEtagFromEntry

  Added the same empty-value guard used in getObjectETag to prevent
  returning quoted empty strings. When ExtETagKey exists but is empty, the
  function now falls through to the filer.ETag calculation instead of
  returning "".
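  As a rough sketch of the normalization rule these commits converge on
  (simplified, standalone names; not the exact code in this patch):

      package main

      import (
          "fmt"
          "strings"
      )

      // normalizeETag quotes an unquoted stored ETag and falls back to a
      // computed value when the stored one is empty.
      func normalizeETag(stored string, fallback func() string) string {
          if stored != "" {
              if !strings.HasPrefix(stored, "\"") {
                  return "\"" + stored + "\""
              }
              return stored
          }
          return "\"" + fallback() + "\""
      }

      func main() {
          computed := func() string { return "d41d8cd98f00b204e9800998ecf8427e" }
          fmt.Println(normalizeETag("abc", computed)) // unquoted value gains quotes
          fmt.Println(normalizeETag("", computed))    // empty value falls back
      }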
* s3api: fix listing of directory key objects with matching prefix

  Revert the prefix handling logic to use strings.TrimPrefix instead of
  checking HasPrefix with an empty-string result. This ensures that when a
  directory key object exactly matches the prefix (e.g. prefix="dir/",
  object="dir/"), it is correctly handled as a regular entry instead of
  being skipped or incorrectly processed as a common prefix. Also fixed a
  missing variable definition.

* s3api: refactor list inline dedup to use appendOrDedup helper

  Refactored the inline deduplication logic in listFilerEntries to use the
  shared appendOrDedup helper function. This ensures consistent behavior
  and reduces code duplication.

* test: fix port allocation race in s3tables integration test

  Updated startMiniCluster to find all required ports simultaneously using
  findAvailablePorts instead of sequentially. This prevents race conditions
  where the OS reallocates a port that was just released, causing multiple
  services (e.g. Filer and Volume) to be assigned the same port and fail
  to start.
---
 .github/workflows/helm_ci.yml                 |   2 +-
 test/s3/etag/s3_etag_test.go                  | 104 ++++++++++-
 .../s3tables_integration_test.go              |  74 ++++----
 weed/admin/view/app/maintenance_queue.templ   |   5 -
 .../admin/view/app/maintenance_queue_templ.go |   6 +-
 weed/filer/filechunks.go                      |   3 +-
 weed/operation/upload_chunked.go              |   7 +-
 weed/s3api/filer_multipart.go                 |  94 ++++++++--
 weed/s3api/s3api_etag_quoting_test.go         | 167 ++++++++++++++++++
 weed/s3api/s3api_object_handlers.go           |  18 +-
 weed/s3api/s3api_object_handlers_list.go      |  45 +++--
 weed/s3api/s3api_object_handlers_put.go       |  21 ++-
 weed/s3api/s3api_object_handlers_test.go      |  20 ++-
 weed/s3api/s3api_object_versioning.go         |   3 +
 .../filer_server_handlers_write_upload.go     |  11 +-
 weed/server/volume_server_handlers_write.go   |   1 +
 weed/storage/needle/needle_parse_upload.go    |   9 +-
 17 files changed, 479 insertions(+), 111 deletions(-)
 create mode 100644 weed/s3api/s3api_etag_quoting_test.go

diff --git a/.github/workflows/helm_ci.yml b/.github/workflows/helm_ci.yml
index 268052a5a..f6df4819e 100644
--- a/.github/workflows/helm_ci.yml
+++ b/.github/workflows/helm_ci.yml
@@ -27,7 +27,7 @@ jobs:
       - uses: actions/setup-python@v6
         with:
-          python-version: '3.9'
+          python-version: '3.10'
           check-latest: true

       - name: Set up chart-testing
diff --git a/test/s3/etag/s3_etag_test.go b/test/s3/etag/s3_etag_test.go
index 9b1e948b0..5643afc18 100644
--- a/test/s3/etag/s3_etag_test.go
+++ b/test/s3/etag/s3_etag_test.go
@@ -25,6 +25,7 @@ import (
     "fmt"
     "io"
     mathrand "math/rand"
+    "os"
     "regexp"
     "strings"
     "testing"
@@ -85,6 +86,12 @@ func init() {

 // getS3Client creates an AWS S3 v2 client for testing
 func getS3Client(t *testing.T) *s3.Client {
+    endpoint := os.Getenv("S3_ENDPOINT")
+    if endpoint == "" {
+        endpoint = defaultConfig.Endpoint
+    }
+    t.Logf("Using S3 endpoint: %s", endpoint)
+
     cfg, err := config.LoadDefaultConfig(context.TODO(),
         config.WithRegion(defaultConfig.Region),
         config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
@@ -95,7 +102,7 @@ func getS3Client(t *testing.T) *s3.Client {
         config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
             func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                 return aws.Endpoint{
-                    URL:               defaultConfig.Endpoint,
+                    URL:               endpoint,
                     SigningRegion:     defaultConfig.Region,
                     HostnameImmutable: true,
                 }, nil
@@ -403,6 +410,101 @@ func TestMultipartUploadETagFormat(t *testing.T) {
         "Part count in ETag should match number of parts uploaded")
 }

+func TestMultipartUploadETagVerification(t *testing.T) {
+    ctx := context.Background()
+    client := getS3Client(t)
+
+    bucketName := getNewBucketName()
+    err := createTestBucket(ctx, client, bucketName)
+    require.NoError(t, err, "Failed to create test bucket")
+    defer cleanupTestBucket(ctx, client, bucketName)
+
+    // Create test data for multipart upload (11MB = 2 parts: 5MB + 6MB)
+    // Using parts of different sizes to ensure correct calculation
+    part1Size := 5 * 1024 * 1024
+    part2Size := 6 * 1024 * 1024
+    data1 := generateRandomData(part1Size)
+    data2 := generateRandomData(part2Size)
+
+    objectKey := "verify-etag-multipart.bin"
+
+    // Pre-calculate expected ETag
+    md1 := md5.Sum(data1)
+    md2 := md5.Sum(data2)
+    concatenatedMD5s := append(md1[:], md2[:]...)
+    finalMD5 := md5.Sum(concatenatedMD5s)
+    expectedETagValue := fmt.Sprintf("%x-2", finalMD5)
+
+    t.Logf("Expected multipart ETag: %s", expectedETagValue)
+
+    // 1. CreateMultipartUpload
+    createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
+        Bucket: aws.String(bucketName),
+        Key:    aws.String(objectKey),
+    })
+    require.NoError(t, err)
+    uploadId := createResp.UploadId
+
+    // 2. UploadPart 1
+    putPart1, err := client.UploadPart(ctx, &s3.UploadPartInput{
+        Bucket:     aws.String(bucketName),
+        Key:        aws.String(objectKey),
+        UploadId:   uploadId,
+        PartNumber: aws.Int32(1),
+        Body:       bytes.NewReader(data1),
+    })
+    require.NoError(t, err)
+    assert.Equal(t, "\""+calculateMD5(data1)+"\"", aws.ToString(putPart1.ETag))
+
+    // 3. UploadPart 2
+    putPart2, err := client.UploadPart(ctx, &s3.UploadPartInput{
+        Bucket:     aws.String(bucketName),
+        Key:        aws.String(objectKey),
+        UploadId:   uploadId,
+        PartNumber: aws.Int32(2),
+        Body:       bytes.NewReader(data2),
+    })
+    require.NoError(t, err)
+    assert.Equal(t, "\""+calculateMD5(data2)+"\"", aws.ToString(putPart2.ETag))
+
+    // 4. CompleteMultipartUpload
+    completeResp, err := client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
+        Bucket:   aws.String(bucketName),
+        Key:      aws.String(objectKey),
+        UploadId: uploadId,
+        MultipartUpload: &types.CompletedMultipartUpload{
+            Parts: []types.CompletedPart{
+                {ETag: putPart1.ETag, PartNumber: aws.Int32(1)},
+                {ETag: putPart2.ETag, PartNumber: aws.Int32(2)},
+            },
+        },
+    })
+    require.NoError(t, err)
+
+    completeETag := cleanETag(aws.ToString(completeResp.ETag))
+    t.Logf("CompleteMultipartUpload ETag: %s", completeETag)
+    assert.Equal(t, expectedETagValue, completeETag, "CompleteMultipartUpload ETag mismatch")
+
+    // 5. HeadObject
+    headResp, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+        Bucket: aws.String(bucketName),
+        Key:    aws.String(objectKey),
+    })
+    require.NoError(t, err)
+    headETag := cleanETag(aws.ToString(headResp.ETag))
+    assert.Equal(t, expectedETagValue, headETag, "HeadObject ETag mismatch")
+
+    // 6. GetObject
+    getResp, err := client.GetObject(ctx, &s3.GetObjectInput{
+        Bucket: aws.String(bucketName),
+        Key:    aws.String(objectKey),
+    })
+    require.NoError(t, err)
+    getETag := cleanETag(aws.ToString(getResp.ETag))
+    assert.Equal(t, expectedETagValue, getETag, "GetObject ETag mismatch")
+    getResp.Body.Close()
+}
+
 // TestPutObjectETagConsistency verifies ETag consistency between PUT and GET
 func TestPutObjectETagConsistency(t *testing.T) {
     ctx := context.Background()
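For reference, the expected value the test above computes follows the de facto
S3 multipart ETag rule: MD5 over the concatenation of the parts' raw MD5
digests, suffixed with the part count. A standalone sketch of the same
calculation (illustrative helper name, standard library only):

    package main

    import (
        "crypto/md5"
        "fmt"
    )

    // multipartETag computes md5(md5(part1) || md5(part2) || ...) followed
    // by "-<number of parts>", matching the check in the test above.
    func multipartETag(parts [][]byte) string {
        var concatenated []byte
        for _, part := range parts {
            sum := md5.Sum(part)
            concatenated = append(concatenated, sum[:]...)
        }
        return fmt.Sprintf("%x-%d", md5.Sum(concatenated), len(parts))
    }

    func main() {
        parts := [][]byte{[]byte("hello "), []byte("world")}
        fmt.Println(multipartETag(parts)) // prints "<hex digest>-2"
    }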
diff --git a/test/s3tables/table-buckets/s3tables_integration_test.go b/test/s3tables/table-buckets/s3tables_integration_test.go
index 26d7606a2..2d3ef8677 100644
--- a/test/s3tables/table-buckets/s3tables_integration_test.go
+++ b/test/s3tables/table-buckets/s3tables_integration_test.go
@@ -480,53 +480,51 @@ func testTargetOperations(t *testing.T, client *S3TablesClient) {

 // Helper functions

-// findAvailablePort finds an available port by binding to port 0
-func findAvailablePort() (int, error) {
-    listener, err := net.Listen("tcp", "127.0.0.1:0")
-    if err != nil {
-        return 0, err
+// findAvailablePorts finds n available ports by binding to port 0 multiple times
+// It keeps the listeners open until all ports are found to ensure uniqueness
+func findAvailablePorts(n int) ([]int, error) {
+    listeners := make([]*net.TCPListener, n)
+    ports := make([]int, n)
+
+    // Open all listeners to ensure we get unique ports
+    for i := 0; i < n; i++ {
+        listener, err := net.Listen("tcp", "127.0.0.1:0")
+        if err != nil {
+            // Close valid listeners before returning error
+            for j := 0; j < i; j++ {
+                listeners[j].Close()
+            }
+            return nil, err
+        }
+        listeners[i] = listener.(*net.TCPListener)
+        ports[i] = listeners[i].Addr().(*net.TCPAddr).Port
+    }
+
+    // Close all listeners
+    for _, l := range listeners {
+        l.Close()
     }
-    defer listener.Close()
-    addr := listener.Addr().(*net.TCPAddr)
-    return addr.Port, nil
+    return ports, nil
 }

 // startMiniCluster starts a weed mini instance directly without exec
 func startMiniCluster(t *testing.T) (*TestCluster, error) {
     // Find available ports
-    masterPort, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find master port: %v", err)
-    }
-    masterGrpcPort, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find master grpc port: %v", err)
-    }
-    volumePort, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find volume port: %v", err)
-    }
-    volumeGrpcPort, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find volume grpc port: %v", err)
-    }
-    filerPort, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find filer port: %v", err)
-    }
-    filerGrpcPort, err := findAvailablePort()
+    // We need 8 unique ports: Master(2), Volume(2), Filer(2), S3(2)
+    ports, err := findAvailablePorts(8)
     if err != nil {
-        return nil, fmt.Errorf("failed to find filer grpc port: %v", err)
-    }
-    s3Port, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find s3 port: %v", err)
-    }
-    s3GrpcPort, err := findAvailablePort()
-    if err != nil {
-        return nil, fmt.Errorf("failed to find s3 grpc port: %v", err)
+        return nil, fmt.Errorf("failed to find available ports: %v", err)
     }
+
+    masterPort := ports[0]
+    masterGrpcPort := ports[1]
+    volumePort := ports[2]
+    volumeGrpcPort := ports[3]
+    filerPort := ports[4]
+    filerGrpcPort := ports[5]
+    s3Port := ports[6]
+    s3GrpcPort := ports[7]

     // Create temporary directory for test data
     testDir := t.TempDir()
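The race the new helper avoids is easy to reproduce: once a probe listener is
closed, the OS is free to hand the same ephemeral port to the next probe. A
minimal demonstration (hypothetical demo program, not part of the patch):

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        // Sequential probing: close, then re-listen. The OS may return the
        // same port, which is exactly the collision findAvailablePorts
        // prevents by holding all listeners open until every port is known.
        l1, _ := net.Listen("tcp", "127.0.0.1:0")
        p1 := l1.Addr().(*net.TCPAddr).Port
        l1.Close()

        l2, _ := net.Listen("tcp", "127.0.0.1:0")
        p2 := l2.Addr().(*net.TCPAddr).Port
        l2.Close()

        fmt.Println(p1, p2) // may print the same port twice
    }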
diff --git a/weed/admin/view/app/maintenance_queue.templ b/weed/admin/view/app/maintenance_queue.templ
index d67ab56c2..00650f80a 100644
--- a/weed/admin/view/app/maintenance_queue.templ
+++ b/weed/admin/view/app/maintenance_queue.templ
@@ -299,11 +299,6 @@ templ MaintenanceQueue(data *maintenance.MaintenanceQueueData) {
diff --git a/weed/admin/view/app/maintenance_queue_templ.go b/weed/admin/view/app/maintenance_queue_templ.go
--- a/weed/admin/view/app/maintenance_queue_templ.go
+++ b/weed/admin/view/app/maintenance_queue_templ.go
 ")
+    templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 63, "")
     if templ_7745c5c3_Err != nil {
         return templ_7745c5c3_Err
     }
@@ -809,7 +809,7 @@ func ProgressBar(progress float64, status maintenance.MaintenanceTaskStatus) tem
     var templ_7745c5c3_Var35 string
     templ_7745c5c3_Var35, templ_7745c5c3_Err = templruntime.SanitizeStyleAttributeValues(fmt.Sprintf("width: %.1f%%", progress))
     if templ_7745c5c3_Err != nil {
-        return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 385, Col: 102}
+        return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 380, Col: 102}
     }
     _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var35))
     if templ_7745c5c3_Err != nil {
@@ -822,7 +822,7 @@ func ProgressBar(progress float64, status maintenance.MaintenanceTaskStatus) tem
     var templ_7745c5c3_Var36 string
     templ_7745c5c3_Var36, templ_7745c5c3_Err = templ.JoinStringErrs(fmt.Sprintf("%.1f%%", progress))
     if templ_7745c5c3_Err != nil {
-        return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 388, Col: 66}
+        return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 383, Col: 66}
     }
     _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var36))
     if templ_7745c5c3_Err != nil {
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 8a82470c8..6e9ec3f21 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -59,7 +59,8 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
     for _, c := range chunks {
         md5Digests = append(md5Digests, util.Base64Md5ToBytes(c.ETag))
     }
-    return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks))
+    finalETag := fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks))
+    return finalETag
 }

 func CompactFileChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
index 394bf4362..7d8d2b522 100644
--- a/weed/operation/upload_chunked.go
+++ b/weed/operation/upload_chunked.go
@@ -4,6 +4,7 @@ import (
     "bytes"
     "context"
     "crypto/md5"
+    "encoding/base64"
     "fmt"
     "hash"
     "io"
@@ -171,6 +172,10 @@ uploadLoop:
             jwt = assignResult.Auth
         }

+        // Calculate MD5 for the chunk
+        chunkMd5 := md5.Sum(buf.Bytes())
+        chunkMd5B64 := base64.StdEncoding.EncodeToString(chunkMd5[:])
+
         uploadOption := &UploadOption{
             UploadUrl: uploadUrl,
             Cipher:    opt.Cipher,
@@ -178,6 +183,7 @@ uploadLoop:
             MimeType:  opt.MimeType,
             PairMap:   nil,
             Jwt:       jwt,
+            Md5:       chunkMd5B64,
         }

         var uploadResult *UploadResult
@@ -225,7 +231,6 @@ uploadLoop:
             }
             fileChunksLock.Lock()
             fileChunks = append(fileChunks, chunk)
-            glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
             fileChunksLock.Unlock()
         }(chunkOffset, bytesBuffer)
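For reference, the value attached above is an RFC 1864-style Content-MD5: the
base64 encoding of the raw 16-byte digest, not the hex string used in ETags. A
standalone sketch showing both encodings of the same digest:

    package main

    import (
        "crypto/md5"
        "encoding/base64"
        "fmt"
    )

    func main() {
        chunk := []byte("example chunk payload")
        sum := md5.Sum(chunk)

        // What the uploader sends in UploadOption.Md5 / Content-MD5:
        fmt.Println(base64.StdEncoding.EncodeToString(sum[:]))
        // The hex form of the same digest, as it would appear in an ETag:
        fmt.Printf("%x\n", sum)
    }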
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index fe875b08c..e787bf18d 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -2,6 +2,7 @@ package s3api

 import (
     "cmp"
+    "crypto/md5"
     "crypto/rand"
     "encoding/base64"
     "encoding/hex"
@@ -206,7 +207,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
         return &CompleteMultipartUploadResult{
             Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
             Bucket:   input.Bucket,
-            ETag:     aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
+            ETag:     aws.String(getEtagFromEntry(entry)),
             Key:      objectKey(input.Key),
         }, s3err.ErrNone
     }
@@ -301,10 +302,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
             return nil, s3err.ErrInvalidPart
         }
         found := false
+
         if len(partEntriesByNumber) > 1 {
-            slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int {
-                return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs)
-            })
+            sortEntriesByLatestChunk(partEntriesByNumber)
         }
         for _, entry := range partEntriesByNumber {
             if found {
@@ -357,6 +357,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl

     entryName, dirName := s3a.getEntryNameAndDir(input)

+    // Precompute ETag once for consistency across all paths
+    multipartETag := calculateMultipartETag(partEntries, completedPartNumbers)
+    etagQuote := "\"" + multipartETag + "\""
+
     // Check if versioning is configured for this bucket BEFORE creating any files
     versioningState, vErr := s3a.getVersioningState(*input.Bucket)
     if vErr == nil && versioningState == s3_constants.VersioningEnabled {
@@ -396,6 +400,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
             }
         }

+        // Persist ETag to ensure subsequent HEAD/GET uses the same value
+        versionEntry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag)
+
         // Preserve ALL SSE metadata from the first part (if any)
         // SSE metadata is stored in individual parts, not the upload directory
         if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
@@ -418,14 +425,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl

         // Construct entry with metadata for caching in .versions directory
         // Reuse versionMtime to keep list vs. HEAD timestamps aligned
-        etag := "\"" + filer.ETagChunks(finalParts) + "\""
+        // multipartETag is precomputed
         versionEntryForCache := &filer_pb.Entry{
             Attributes: &filer_pb.FuseAttributes{
                 FileSize: uint64(offset),
                 Mtime:    versionMtime,
             },
             Extended: map[string][]byte{
-                s3_constants.ExtETagKey: []byte(etag),
+                s3_constants.ExtETagKey: []byte(multipartETag),
             },
         }
         if amzAccountId != "" {
@@ -440,13 +447,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
             return nil, s3err.ErrInternalError
         }

-        // For versioned buckets, don't create a main object file - all content is stored in .versions directory
+        // For versioned buckets, all content is stored in .versions directory
         // The latest version information is tracked in the .versions directory metadata
-
         output = &CompleteMultipartUploadResult{
             Location:  aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
             Bucket:    input.Bucket,
-            ETag:      aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+            ETag:      aws.String(etagQuote),
             Key:       objectKey(input.Key),
             VersionId: aws.String(versionId),
         }
@@ -482,6 +488,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
             firstPartEntry := partEntries[completedPartNumbers[0]][0]
             copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning")
         }
+        // Persist ETag to ensure subsequent HEAD/GET uses the same value
+        entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag)
         if pentry.Attributes.Mime != "" {
             entry.Attributes.Mime = pentry.Attributes.Mime
         } else if mime != "" {
@@ -499,7 +507,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
         output = &CompleteMultipartUploadResult{
             Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
             Bucket:   input.Bucket,
-            ETag:     aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+            ETag:     aws.String(etagQuote),
             Key:      objectKey(input.Key),
             // VersionId field intentionally omitted for suspended versioning
         }
@@ -535,6 +543,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
             firstPartEntry := partEntries[completedPartNumbers[0]][0]
             copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned")
         }
+        // Persist ETag to ensure subsequent HEAD/GET uses the same value
+        entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag)
         if pentry.Attributes.Mime != "" {
             entry.Attributes.Mime = pentry.Attributes.Mime
         } else if mime != "" {
@@ -556,7 +566,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
         output = &CompleteMultipartUploadResult{
             Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
             Bucket:   input.Bucket,
-            ETag:     aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+            ETag:     aws.String(etagQuote),
             Key:      objectKey(input.Key),
         }
     }
@@ -929,3 +939,65 @@ func (s3a *S3ApiServer) applyMultipartEncryptionConfig(entry *filer_pb.Entry, co
         glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-S3 settings")
     }
 }
+
+func sortEntriesByLatestChunk(entries []*filer_pb.Entry) {
+    slices.SortFunc(entries, func(a, b *filer_pb.Entry) int {
+        var aTs, bTs int64
+        if len(a.Chunks) > 0 {
+            aTs = a.Chunks[0].ModifiedTsNs
+        }
+        if len(b.Chunks) > 0 {
+            bTs = b.Chunks[0].ModifiedTsNs
+        }
+        return cmp.Compare(bTs, aTs)
+    })
+}
+
+func calculateMultipartETag(partEntries map[int][]*filer_pb.Entry, completedPartNumbers []int) string {
+    var etags []byte
+    for _, partNumber := range completedPartNumbers {
+        entries, ok := partEntries[partNumber]
+        if !ok || len(entries) == 0 {
+            continue
+        }
+        if len(entries) > 1 {
+            sortEntriesByLatestChunk(entries)
+        }
+        entry := entries[0]
+        etag := getEtagFromEntry(entry)
+        glog.V(4).Infof("calculateMultipartETag: part %d, entry %s, getEtagFromEntry result: %s", partNumber, entry.Name, etag)
+        etag = strings.Trim(etag, "\"")
+        if before, _, found := strings.Cut(etag, "-"); found {
+            etag = before
+        }
+        if etagBytes, err := hex.DecodeString(etag); err == nil {
+            etags = append(etags, etagBytes...)
+        } else {
+            glog.Warningf("calculateMultipartETag: failed to decode etag '%s' for part %d: %v", etag, partNumber, err)
+        }
+    }
+    return fmt.Sprintf("%x-%d", md5.Sum(etags), len(completedPartNumbers))
+}
+
+func getEtagFromEntry(entry *filer_pb.Entry) string {
+    if entry.Extended != nil {
+        if etagBytes, ok := entry.Extended[s3_constants.ExtETagKey]; ok {
+            etag := string(etagBytes)
+            if len(etag) > 0 {
+                if !strings.HasPrefix(etag, "\"") {
+                    return "\"" + etag + "\""
+                }
+                return etag
+            }
+            // Empty stored ETag - fall through to filer.ETag calculation
+        }
+    }
+    // Fallback to filer.ETag which handles Attributes.Md5 consistently
+    etag := filer.ETag(entry)
+    entryName := entry.Name
+    if entryName == "" {
+        entryName = "entry"
+    }
+    glog.V(4).Infof("getEtagFromEntry: fallback to filer.ETag for %s: %s, chunkCount: %d", entryName, etag, len(entry.Chunks))
+    return "\"" + etag + "\""
+}
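calculateMultipartETag above normalizes each stored part ETag before hashing:
strip surrounding quotes, drop any existing "-N" suffix, hex-decode, then
concatenate. A worked standalone version of just that normalize-and-hash step
(simplified types, illustrative only; combinePartETags is an invented name):

    package main

    import (
        "crypto/md5"
        "encoding/hex"
        "fmt"
        "strings"
    )

    // combinePartETags mirrors the aggregation step: each part ETag may
    // arrive quoted ("\"abc\"") or suffixed ("abc-3"); both are normalized.
    func combinePartETags(partETags []string) string {
        var digests []byte
        for _, etag := range partETags {
            etag = strings.Trim(etag, "\"")
            if before, _, found := strings.Cut(etag, "-"); found {
                etag = before
            }
            raw, err := hex.DecodeString(etag)
            if err != nil {
                continue // skip undecodable parts, as the patch logs and does
            }
            digests = append(digests, raw...)
        }
        return fmt.Sprintf("%x-%d", md5.Sum(digests), len(partETags))
    }

    func main() {
        fmt.Println(combinePartETags([]string{
            "\"5d41402abc4b2a76b9719d911017c592\"",
            "7d793037a0760186574b0282f2f435e7",
        }))
    }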
Internal ETag: %s", result.ErrorCode, string(entry.Extended[s3_constants.ExtETagKey])) + } + }) + + // Scenario 2: Internal ETag is QUOTED (stored in Extended), Client sends QUOTED If-Match + // This handles legacy or mixed content + t.Run("QuotedInternal_QuotedHeader", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("\"" + etagValue + "\""), // Quoted + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + }, + } + + getter := &MockEntryGetter{mockEntry: entry} + req := createTestGetRequest(bucket, object) + req.Header.Set(s3_constants.IfMatch, "\""+etagValue+"\"") + + s3a := NewS3ApiServerForTest() + result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object) + + if result.ErrorCode != s3err.ErrNone { + t.Errorf("Expected success (ErrNone) for quoted internal ETag and quoted header, got %v", result.ErrorCode) + } + }) + + // Scenario 3: Internal ETag is from Md5 (QUOTED by getObjectETag), Client sends QUOTED If-Match + t.Run("Md5Internal_QuotedHeader", func(t *testing.T) { + // Mock Md5 attribute (16 bytes) + md5Bytes := make([]byte, 16) + copy(md5Bytes, []byte("1234567890123456")) // This doesn't match the hex string below, but getObjectETag formats it as hex + + // Expected ETag from Md5 is hex string of bytes + expectedHex := fmt.Sprintf("%x", md5Bytes) + + entry := &filer_pb.Entry{ + Name: "test-key", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + Md5: md5Bytes, + }, + } + + getter := &MockEntryGetter{mockEntry: entry} + req := createTestGetRequest(bucket, object) + req.Header.Set(s3_constants.IfMatch, "\""+expectedHex+"\"") + + s3a := NewS3ApiServerForTest() + result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object) + + if result.ErrorCode != s3err.ErrNone { + t.Errorf("Expected success (ErrNone) for Md5 internal ETag and quoted header, got %v", result.ErrorCode) + } + }) + + // Test getObjectETag specifically ensuring it returns quoted strings + t.Run("getObjectETag_ShouldReturnQuoted", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("unquoted-etag"), + }, + } + + s3a := NewS3ApiServerForTest() + etag := s3a.getObjectETag(entry) + + expected := "\"unquoted-etag\"" + if etag != expected { + t.Errorf("Expected quoted ETag %s, got %s", expected, etag) + } + }) + + // Test getObjectETag fallback when Extended ETag is present but empty + t.Run("getObjectETag_EmptyExtended_ShouldFallback", func(t *testing.T) { + md5Bytes := []byte("1234567890123456") + expectedHex := fmt.Sprintf("\"%x\"", md5Bytes) + + entry := &filer_pb.Entry{ + Name: "test-key-fallback", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte(""), // Present but empty + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + Md5: md5Bytes, + }, + } + + s3a := NewS3ApiServerForTest() + etag := s3a.getObjectETag(entry) + + if etag != expectedHex { + t.Errorf("Expected fallback ETag %s, got %s", expectedHex, etag) + } + }) + + // Test newListEntry ETag behavior + t.Run("newListEntry_ShouldReturnQuoted", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("unquoted-etag"), + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + }, + } + + s3a := 
+
+    // Test newListEntry ETag behavior
+    t.Run("newListEntry_ShouldReturnQuoted", func(t *testing.T) {
+        entry := &filer_pb.Entry{
+            Name: "test-key",
+            Extended: map[string][]byte{
+                s3_constants.ExtETagKey: []byte("unquoted-etag"),
+            },
+            Attributes: &filer_pb.FuseAttributes{
+                Mtime:    time.Now().Unix(),
+                FileSize: 1024,
+            },
+        }
+
+        s3a := NewS3ApiServerForTest()
+        listEntry := newListEntry(s3a, entry, "", "bucket/dir", "test-key", "bucket/", false, false, false)
+
+        expected := "\"unquoted-etag\""
+        if listEntry.ETag != expected {
+            t.Errorf("Expected quoted ETag %s, got %s", expected, listEntry.ETag)
+        }
+    })
+}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 97291e7d6..39cde56ac 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -483,7 +483,7 @@ func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *h
     return false // Not a directory object, continue with normal processing
 }

-func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) {
+func newListEntry(s3a *S3ApiServer, entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool) (listEntry ListEntry) {
     storageClass := "STANDARD"
     if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
         storageClass = string(v)
@@ -500,15 +500,7 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu
     }
     // Determine ETag: prioritize ExtETagKey for versioned objects (supports multipart ETags),
     // then fall back to filer.ETag() which uses Md5 attribute or calculates from chunks
-    var etag string
-    if entry.Extended != nil {
-        if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
-            etag = string(etagBytes)
-        }
-    }
-    if etag == "" {
-        etag = "\"" + filer.ETag(entry) + "\""
-    }
+    etag := s3a.getObjectETag(entry)
     listEntry = ListEntry{
         Key:          key,
         LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
@@ -531,7 +523,7 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu
             displayName = "anonymous"
         } else {
             // Get the proper display name from IAM system
-            displayName = iam.GetAccountNameById(ownerID)
+            displayName = s3a.iam.GetAccountNameById(ownerID)
             // Fallback to ownerID if no display name found
             if displayName == "" {
                 displayName = ownerID
@@ -1968,9 +1960,9 @@ func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, r *http.Reques

     // Set ETag (but don't overwrite if already set, e.g., for part-specific GET requests)
     if w.Header().Get("ETag") == "" {
-        etag := filer.ETag(entry)
+        etag := s3a.getObjectETag(entry)
         if etag != "" {
-            w.Header().Set("ETag", "\""+etag+"\"")
+            w.Header().Set("ETag", etag)
         }
     }
diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go
index 3f90404ef..beda56e69 100644
--- a/weed/s3api/s3api_object_handlers_list.go
+++ b/weed/s3api/s3api_object_handlers_list.go
@@ -202,6 +202,27 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
     var lastEntryWasCommonPrefix bool
     var lastCommonPrefixName string

+    // Hoist versioning check out of per-entry callback
+    versioningState, _ := s3a.getVersioningState(bucket)
+    versioningEnabled := versioningState == "Enabled"
+
+    // Helper function to handle dedup/append logic
+    appendOrDedup := func(newEntry ListEntry) {
+        if versioningEnabled {
+            // For versioned buckets, we need to handle duplicates between the main file and the .versions directory
+            if len(contents) > 0 && contents[len(contents)-1].Key == newEntry.Key {
+                glog.V(3).Infof("listFilerEntries deduplicating versioned entry: %s", newEntry.Key)
+                contents[len(contents)-1] = newEntry
+            } else {
+                contents = append(contents, newEntry)
+                cursor.maxKeys--
+            }
+        } else {
+            contents = append(contents, newEntry)
+            cursor.maxKeys--
+        }
+    }
+
     for {
         empty := true
@@ -221,19 +242,13 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
                 undelimitedPath = strings.TrimPrefix(undelimitedPath, originalPrefix)

                 delimitedPath := strings.SplitN(undelimitedPath, delimiter, 2)
-
                 if len(delimitedPath) == 2 {
                     // S3 clients expect the delimited prefix to contain the delimiter and prefix.
                     delimitedPrefix := originalPrefix + delimitedPath[0] + delimiter
-                    for i := range commonPrefixes {
-                        if commonPrefixes[i].Prefix == delimitedPrefix {
-                            delimiterFound = true
-                            break
-                        }
-                    }
-
-                    if !delimiterFound {
+                    // Check if this CommonPrefix already exists
+                    if !lastEntryWasCommonPrefix || lastCommonPrefixName != delimitedPath[0] {
+                        // New CommonPrefix found
                         commonPrefixes = append(commonPrefixes, PrefixEntry{
                             Prefix: delimitedPrefix,
                         })
@@ -249,14 +264,14 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m

                     // If no delimiter found in the directory object name, treat it as a regular key
                     if !delimiterFound {
-                        contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false, s3a.iam))
-                        cursor.maxKeys--
+                        newEntry := newListEntry(s3a, entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false)
+                        appendOrDedup(newEntry)
                         lastEntryWasCommonPrefix = false
                     }
                 } else if entry.IsDirectoryKeyObject() {
                     // No delimiter specified, or delimiter doesn't apply - treat as regular key
-                    contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false, s3a.iam))
-                    cursor.maxKeys--
+                    newEntry := newListEntry(s3a, entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false)
+                    appendOrDedup(newEntry)
                     lastEntryWasCommonPrefix = false
                     // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
                 } else if delimiter != "" { // A response can contain CommonPrefixes only if you specify a delimiter.
@@ -310,8 +325,8 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
                 }
                 if !delimiterFound {
                     glog.V(4).Infof("Adding file to contents: %s", entryName)
-                    contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false, s3a.iam))
-                    cursor.maxKeys--
+                    newEntry := newListEntry(s3a, entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false)
+                    appendOrDedup(newEntry)
                     lastEntryWasCommonPrefix = false
                 }
             }
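The effect of appendOrDedup on a versioned bucket: when the plain object file
and its .versions entry both surface for the same key, the later entry replaces
the earlier one instead of producing a duplicate. A toy simulation (local
types, not the real ListEntry; the maxKeys bookkeeping is omitted):

    package main

    import "fmt"

    type entry struct{ Key, ETag string }

    func main() {
        var contents []entry
        appendOrDedup := func(e entry, versioningEnabled bool) {
            if versioningEnabled && len(contents) > 0 && contents[len(contents)-1].Key == e.Key {
                contents[len(contents)-1] = e // latest version wins
                return
            }
            contents = append(contents, e)
        }

        appendOrDedup(entry{"dir/obj", "\"old\""}, true)
        appendOrDedup(entry{"dir/obj", "\"new\""}, true) // dedups with previous
        appendOrDedup(entry{"dir/other", "\"x\""}, true)
        fmt.Println(contents) // [{dir/obj "new"} {dir/other "x"}]
    }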
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index a65d2332c..6941a1b9a 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -540,6 +540,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
     etag = filer.ETag(entry)
     glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks))

+    // Store ETag in Extended attribute for future retrieval (e.g. multipart parts)
+    entry.Extended[s3_constants.ExtETagKey] = []byte(etag)
+
     // Set object owner
     amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
     if amzAccountId != "" {
@@ -1072,10 +1075,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
     }
     versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)

-    // Store ETag with quotes for S3 compatibility
-    if !strings.HasPrefix(etag, "\"") {
-        etag = "\"" + etag + "\""
-    }
+    // Store ETag (unquoted) in Extended attribute
     versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)

     // Set object owner for versioned objects
@@ -1594,7 +1594,14 @@ func parseConditionalHeaders(r *http.Request) (conditionalHeaders, s3err.ErrorCo
 func (s3a *S3ApiServer) getObjectETag(entry *filer_pb.Entry) string {
     // Try to get ETag from Extended attributes first
     if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
-        return string(etagBytes)
+        etag := string(etagBytes)
+        if len(etag) > 0 {
+            if !strings.HasPrefix(etag, "\"") {
+                return "\"" + etag + "\""
+            }
+            return etag
+        }
+        // Empty stored ETag - fall through to Md5/chunk-based calculation
     }
     // Check for Md5 in Attributes (matches filer.ETag behavior)
     // Note: len(nil slice) == 0 in Go, so no need for explicit nil check
@@ -1635,7 +1642,6 @@ func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers cond
     // 1. Check If-Match
     if headers.ifMatch != "" {
         if !objectExists {
-            glog.V(3).Infof("validateConditionalHeaders: If-Match failed - object %s/%s does not exist", bucket, object)
             return s3err.ErrPreconditionFailed
         }
         // If `ifMatch` is "*", the condition is met if the object exists.
@@ -1645,7 +1651,6 @@ func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers cond
         objectETag := s3a.getObjectETag(entry)
         // Use production etagMatches method
         if !s3a.etagMatches(headers.ifMatch, objectETag) {
-            glog.V(3).Infof("validateConditionalHeaders: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
             return s3err.ErrPreconditionFailed
         }
     }
@@ -1787,7 +1792,7 @@ func (s3a *S3ApiServer) validateConditionalHeadersForReads(r *http.Request, head
         objectETag := s3a.getObjectETag(entry)
         // Use production etagMatches method
         if !s3a.etagMatches(headers.ifMatch, objectETag) {
-            glog.V(3).Infof("validateConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
+            glog.V(3).Infof("validateConditionalHeadersForReads: If-Match failed for object %s/%s - header If-Match: [%s], object ETag: [%s]", bucket, object, headers.ifMatch, objectETag)
             return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: entry}
         }
     }
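etagMatches itself is not shown in this diff; the comparison it needs has to
tolerate quotes on either side, since stored values are unquoted while If-Match
headers usually are quoted. An illustrative stand-in (not SeaweedFS's actual
implementation):

    package main

    import (
        "fmt"
        "strings"
    )

    // etagEqual compares two ETags while ignoring surrounding quotes, so an
    // unquoted stored value still matches a quoted If-Match header.
    func etagEqual(a, b string) bool {
        return strings.Trim(a, "\"") == strings.Trim(b, "\"")
    }

    func main() {
        fmt.Println(etagEqual("\"37b51d194a7513e45b56f6524f2d51f2\"",
            "37b51d194a7513e45b56f6524f2d51f2")) // true
    }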
Tester"}, + "userid123": {Id: "userid123", DisplayName: "John Doe"}, + }, }, } @@ -47,7 +49,7 @@ func TestNewListEntryOwnerDisplayName(t *testing.T) { } // Test that display name is correctly looked up from IAM - listEntry := newListEntry(entry, "", "dir", "test-object", "/buckets/test/", true, false, false, iam) + listEntry := newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", true, false, false) assert.NotNil(t, listEntry.Owner, "Owner should be set when fetchOwner is true") assert.Equal(t, "testid", listEntry.Owner.ID, "Owner ID should match stored owner") @@ -55,20 +57,20 @@ func TestNewListEntryOwnerDisplayName(t *testing.T) { // Test with owner that doesn't exist in IAM (should fallback to ID) entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte("unknown-user") - listEntry = newListEntry(entry, "", "dir", "test-object", "/buckets/test/", true, false, false, iam) + listEntry = newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", true, false, false) assert.Equal(t, "unknown-user", listEntry.Owner.ID, "Owner ID should match stored owner") assert.Equal(t, "unknown-user", listEntry.Owner.DisplayName, "Display name should fallback to ID when not found in IAM") // Test with no owner metadata (should use anonymous) entry.Extended = make(map[string][]byte) - listEntry = newListEntry(entry, "", "dir", "test-object", "/buckets/test/", true, false, false, iam) + listEntry = newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", true, false, false) assert.Equal(t, s3_constants.AccountAnonymousId, listEntry.Owner.ID, "Should use anonymous ID when no owner metadata") assert.Equal(t, "anonymous", listEntry.Owner.DisplayName, "Should use anonymous display name when no owner metadata") // Test with fetchOwner false (should not set owner) - listEntry = newListEntry(entry, "", "dir", "test-object", "/buckets/test/", false, false, false, iam) + listEntry = newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", false, false, false) assert.Nil(t, listEntry.Owner, "Owner should not be set when fetchOwner is false") } diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index 9147ebbdf..c319ce4fe 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -818,6 +818,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe // Try to get ETag from Extended attributes first if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag { version.ETag = string(etagBytes) + if !strings.HasPrefix(version.ETag, "\"") { + version.ETag = "\"" + version.ETag + "\"" + } } else { // Fallback: calculate ETag from chunks version.ETag = s3a.calculateETagFromChunks(entry.Chunks) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index dd1a1e557..2f67e7860 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/md5" + "encoding/base64" "fmt" "hash" "io" @@ -128,16 +129,15 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, r *http.Request } if chunks != nil { fileChunksLock.Lock() - fileChunksSize := len(fileChunks) + len(chunks) for _, chunk := range chunks { fileChunks = append(fileChunks, chunk) - glog.V(4).InfofCtx(ctx, "uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size)) } fileChunksLock.Unlock() 
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index dd1a1e557..2f67e7860 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -4,6 +4,7 @@ import (
     "bytes"
     "context"
     "crypto/md5"
+    "encoding/base64"
     "fmt"
     "hash"
     "io"
@@ -128,16 +129,15 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, r *http.Request
             }
             if chunks != nil {
                 fileChunksLock.Lock()
-                fileChunksSize := len(fileChunks) + len(chunks)
                 for _, chunk := range chunks {
                     fileChunks = append(fileChunks, chunk)
-                    glog.V(4).InfofCtx(ctx, "uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
                 }
                 fileChunksLock.Unlock()
             }
         }(chunkOffset, bytesBuffer)

         // reset variables for the next chunk
+        glog.V(4).Infof("uploadReaderToChunks read chunk at offset %d, size %d", chunkOffset, dataSize)
         chunkOffset = chunkOffset + dataSize

         // if last chunk was not at full chunk size, but already exhausted the reader
@@ -162,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, r *http.Request
     return fileChunks, md5Hash, chunkOffset, nil, smallContent
 }

-func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt, contentMd5 string) (*operation.UploadResult, error, []byte) {

     stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc()
     start := time.Now()
@@ -178,6 +178,7 @@ func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limited
         MimeType: contentType,
         PairMap:  pairMap,
         Jwt:      auth,
+        Md5:      contentMd5,
     }

     uploader, err := operation.NewUploader()
@@ -217,8 +218,10 @@ func (fs *FilerServer) dataToChunkWithSSE(ctx context.Context, r *http.Request,
             stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
             return uploadErr
         }
+        chunkMd5 := md5.Sum(data)
+        chunkMd5B64 := base64.StdEncoding.EncodeToString(chunkMd5[:])
         // upload the chunk to the volume server
-        uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth)
+        uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth, chunkMd5B64)
         if uploadErr != nil {
             glog.V(4).InfofCtx(ctx, "retry later due to upload error: %v", uploadErr)
             stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 164be7258..44a2abc34 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -68,6 +68,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
     ret.Size = uint32(originalSize)
     ret.ETag = reqNeedle.Etag()
     ret.Mime = string(reqNeedle.Mime)
+    ret.ContentMd5 = contentMd5
     SetEtag(w, ret.ETag)
     w.Header().Set("Content-MD5", contentMd5)
     writeJsonQuiet(w, r, httpStatus, ret)
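The verification counterpart in ParseUpload below recomputes the digest over
the uncompressed bytes and compares it with whatever checksum accompanied the
request (the Content-MD5 header or the multipart part header). Sketched
standalone (hypothetical helper, simplified error handling):

    package main

    import (
        "crypto/md5"
        "encoding/base64"
        "errors"
        "fmt"
    )

    // verifyContentMd5 mimics the check: recompute base64(md5(data)) and
    // compare against the expected checksum, if one was supplied.
    func verifyContentMd5(data []byte, expected string) error {
        if expected == "" {
            return nil // nothing to verify against
        }
        sum := md5.Sum(data)
        actual := base64.StdEncoding.EncodeToString(sum[:])
        if actual != expected {
            return errors.New("Content-MD5 mismatch: " + actual + " != " + expected)
        }
        return nil
    }

    func main() {
        data := []byte("payload")
        sum := md5.Sum(data)
        fmt.Println(verifyContentMd5(data, base64.StdEncoding.EncodeToString(sum[:]))) // <nil>
    }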
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index 5dd419b25..f7f62df11 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -42,6 +42,7 @@ func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (p
             pu.PairMap[k] = v[0]
         }
     }
+    // glog.V(4).Infof("ParseUpload: r.URL=%s, Content-MD5 header=%s", r.URL.String(), r.Header.Get("Content-MD5"))

     e = parseUpload(r, sizeLimit, pu)
@@ -84,7 +85,11 @@ func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (p
     }

     // verify Content-MD5
-    if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" {
+    expectedChecksum := r.Header.Get("Content-MD5")
+    if expectedChecksum == "" {
+        expectedChecksum = pu.ContentMd5
+    }
+    if expectedChecksum != "" {
         h := md5.New()
         h.Write(pu.UncompressedData)
         pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil))
@@ -143,6 +148,7 @@ func parseUpload(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
         pu.Data = pu.bytesBuffer.Bytes()
         contentType = part.Header.Get("Content-Type")
+        pu.ContentMd5 = part.Header.Get("Content-MD5")

         // if the filename is empty string, do a search on the other multi-part items
         for pu.FileName == "" {
@@ -171,6 +177,7 @@ func parseUpload(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
             pu.Data = pu.bytesBuffer.Bytes()
             pu.FileName = util.CleanWindowsPathBase(fName)
             contentType = part.Header.Get("Content-Type")
+            pu.ContentMd5 = part2.Header.Get("Content-MD5")
             part = part2
             break
         }