diff --git a/.github/workflows/helm_ci.yml b/.github/workflows/helm_ci.yml index 268052a5a..f6df4819e 100644 --- a/.github/workflows/helm_ci.yml +++ b/.github/workflows/helm_ci.yml @@ -27,7 +27,7 @@ jobs: - uses: actions/setup-python@v6 with: - python-version: '3.9' + python-version: '3.10' check-latest: true - name: Set up chart-testing diff --git a/test/s3/etag/s3_etag_test.go b/test/s3/etag/s3_etag_test.go index 9b1e948b0..5643afc18 100644 --- a/test/s3/etag/s3_etag_test.go +++ b/test/s3/etag/s3_etag_test.go @@ -25,6 +25,7 @@ import ( "fmt" "io" mathrand "math/rand" + "os" "regexp" "strings" "testing" @@ -85,6 +86,12 @@ func init() { // getS3Client creates an AWS S3 v2 client for testing func getS3Client(t *testing.T) *s3.Client { + endpoint := os.Getenv("S3_ENDPOINT") + if endpoint == "" { + endpoint = defaultConfig.Endpoint + } + t.Logf("Using S3 endpoint: %s", endpoint) + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(defaultConfig.Region), config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( @@ -95,7 +102,7 @@ func getS3Client(t *testing.T) *s3.Client { config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc( func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ - URL: defaultConfig.Endpoint, + URL: endpoint, SigningRegion: defaultConfig.Region, HostnameImmutable: true, }, nil @@ -403,6 +410,101 @@ func TestMultipartUploadETagFormat(t *testing.T) { "Part count in ETag should match number of parts uploaded") } +func TestMultipartUploadETagVerification(t *testing.T) { + ctx := context.Background() + client := getS3Client(t) + + bucketName := getNewBucketName() + err := createTestBucket(ctx, client, bucketName) + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + // Create test data for multipart upload (11MB = 2 parts: 5MB + 6MB) + // Using parts of different sizes to ensure correct calculation + part1Size := 5 * 1024 * 1024 + part2Size := 6 * 1024 * 1024 + data1 := generateRandomData(part1Size) + data2 := generateRandomData(part2Size) + + objectKey := "verify-etag-multipart.bin" + + // Pre-calculate expected ETag + md1 := md5.Sum(data1) + md2 := md5.Sum(data2) + concatenatedMD5s := append(md1[:], md2[:]...) + finalMD5 := md5.Sum(concatenatedMD5s) + expectedETagValue := fmt.Sprintf("%x-2", finalMD5) + + t.Logf("Expected multipart ETag: %s", expectedETagValue) + + // 1. CreateMultipartUpload + createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + uploadId := createResp.UploadId + + // 2. UploadPart 1 + putPart1, err := client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadId, + PartNumber: aws.Int32(1), + Body: bytes.NewReader(data1), + }) + require.NoError(t, err) + assert.Equal(t, "\""+calculateMD5(data1)+"\"", aws.ToString(putPart1.ETag)) + + // 3. UploadPart 2 + putPart2, err := client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadId, + PartNumber: aws.Int32(2), + Body: bytes.NewReader(data2), + }) + require.NoError(t, err) + assert.Equal(t, "\""+calculateMD5(data2)+"\"", aws.ToString(putPart2.ETag)) + + // 4. 
CompleteMultipartUpload + completeResp, err := client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: []types.CompletedPart{ + {ETag: putPart1.ETag, PartNumber: aws.Int32(1)}, + {ETag: putPart2.ETag, PartNumber: aws.Int32(2)}, + }, + }, + }) + require.NoError(t, err) + + completeETag := cleanETag(aws.ToString(completeResp.ETag)) + t.Logf("CompleteMultipartUpload ETag: %s", completeETag) + assert.Equal(t, expectedETagValue, completeETag, "CompleteMultipartUpload ETag mismatch") + + // 5. HeadObject + headResp, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + headETag := cleanETag(aws.ToString(headResp.ETag)) + assert.Equal(t, expectedETagValue, headETag, "HeadObject ETag mismatch") + + // 6. GetObject + getResp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + getETag := cleanETag(aws.ToString(getResp.ETag)) + assert.Equal(t, expectedETagValue, getETag, "GetObject ETag mismatch") + getResp.Body.Close() +} + // TestPutObjectETagConsistency verifies ETag consistency between PUT and GET func TestPutObjectETagConsistency(t *testing.T) { ctx := context.Background() diff --git a/test/s3tables/table-buckets/s3tables_integration_test.go b/test/s3tables/table-buckets/s3tables_integration_test.go index 26d7606a2..2d3ef8677 100644 --- a/test/s3tables/table-buckets/s3tables_integration_test.go +++ b/test/s3tables/table-buckets/s3tables_integration_test.go @@ -480,53 +480,51 @@ func testTargetOperations(t *testing.T, client *S3TablesClient) { // Helper functions -// findAvailablePort finds an available port by binding to port 0 -func findAvailablePort() (int, error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, err +// findAvailablePorts finds n available ports by binding to port 0 multiple times +// It keeps the listeners open until all ports are found to ensure uniqueness +func findAvailablePorts(n int) ([]int, error) { + listeners := make([]*net.TCPListener, n) + ports := make([]int, n) + + // Open all listeners to ensure we get unique ports + for i := 0; i < n; i++ { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + // Close valid listeners before returning error + for j := 0; j < i; j++ { + listeners[j].Close() + } + return nil, err + } + listeners[i] = listener.(*net.TCPListener) + ports[i] = listeners[i].Addr().(*net.TCPAddr).Port + } + + // Close all listeners + for _, l := range listeners { + l.Close() } - defer listener.Close() - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, nil + return ports, nil } // startMiniCluster starts a weed mini instance directly without exec func startMiniCluster(t *testing.T) (*TestCluster, error) { // Find available ports - masterPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find master port: %v", err) - } - masterGrpcPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find master grpc port: %v", err) - } - volumePort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find volume port: %v", err) - } - volumeGrpcPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find volume grpc port: %v", err) - } - 
filerPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find filer port: %v", err) - } - filerGrpcPort, err := findAvailablePort() + // We need 8 unique ports: Master(2), Volume(2), Filer(2), S3(2) + ports, err := findAvailablePorts(8) if err != nil { - return nil, fmt.Errorf("failed to find filer grpc port: %v", err) - } - s3Port, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find s3 port: %v", err) - } - s3GrpcPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find s3 grpc port: %v", err) + return nil, fmt.Errorf("failed to find available ports: %v", err) } + + masterPort := ports[0] + masterGrpcPort := ports[1] + volumePort := ports[2] + volumeGrpcPort := ports[3] + filerPort := ports[4] + filerGrpcPort := ports[5] + s3Port := ports[6] + s3GrpcPort := ports[7] // Create temporary directory for test data testDir := t.TempDir() diff --git a/weed/admin/view/app/maintenance_queue.templ b/weed/admin/view/app/maintenance_queue.templ index d67ab56c2..00650f80a 100644 --- a/weed/admin/view/app/maintenance_queue.templ +++ b/weed/admin/view/app/maintenance_queue.templ @@ -299,11 +299,6 @@ templ MaintenanceQueue(data *maintenance.MaintenanceQueueData) { ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 63, "") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } @@ -809,7 +809,7 @@ func ProgressBar(progress float64, status maintenance.MaintenanceTaskStatus) tem var templ_7745c5c3_Var35 string templ_7745c5c3_Var35, templ_7745c5c3_Err = templruntime.SanitizeStyleAttributeValues(fmt.Sprintf("width: %.1f%%", progress)) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 385, Col: 102} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 380, Col: 102} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var35)) if templ_7745c5c3_Err != nil { @@ -822,7 +822,7 @@ func ProgressBar(progress float64, status maintenance.MaintenanceTaskStatus) tem var templ_7745c5c3_Var36 string templ_7745c5c3_Var36, templ_7745c5c3_Err = templ.JoinStringErrs(fmt.Sprintf("%.1f%%", progress)) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 388, Col: 66} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/maintenance_queue.templ`, Line: 383, Col: 66} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var36)) if templ_7745c5c3_Err != nil { diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 8a82470c8..6e9ec3f21 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -59,7 +59,8 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { for _, c := range chunks { md5Digests = append(md5Digests, util.Base64Md5ToBytes(c.ETag)) } - return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks)) + finalETag := fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks)) + return finalETag } func CompactFileChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go index 394bf4362..7d8d2b522 100644 --- a/weed/operation/upload_chunked.go +++ 
b/weed/operation/upload_chunked.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/md5" + "encoding/base64" "fmt" "hash" "io" @@ -171,6 +172,10 @@ uploadLoop: jwt = assignResult.Auth } + // Calculate MD5 for the chunk + chunkMd5 := md5.Sum(buf.Bytes()) + chunkMd5B64 := base64.StdEncoding.EncodeToString(chunkMd5[:]) + uploadOption := &UploadOption{ UploadUrl: uploadUrl, Cipher: opt.Cipher, @@ -178,6 +183,7 @@ uploadLoop: MimeType: opt.MimeType, PairMap: nil, Jwt: jwt, + Md5: chunkMd5B64, } var uploadResult *UploadResult @@ -225,7 +231,6 @@ uploadLoop: } fileChunksLock.Lock() fileChunks = append(fileChunks, chunk) - glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) fileChunksLock.Unlock() }(chunkOffset, bytesBuffer) diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index fe875b08c..e787bf18d 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -2,6 +2,7 @@ package s3api import ( "cmp" + "crypto/md5" "crypto/rand" "encoding/base64" "encoding/hex" @@ -206,7 +207,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl return &CompleteMultipartUploadResult{ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), + ETag: aws.String(getEtagFromEntry(entry)), Key: objectKey(input.Key), }, s3err.ErrNone } @@ -301,10 +302,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl return nil, s3err.ErrInvalidPart } found := false + if len(partEntriesByNumber) > 1 { - slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int { - return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs) - }) + sortEntriesByLatestChunk(partEntriesByNumber) } for _, entry := range partEntriesByNumber { if found { @@ -357,6 +357,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entryName, dirName := s3a.getEntryNameAndDir(input) + // Precompute ETag once for consistency across all paths + multipartETag := calculateMultipartETag(partEntries, completedPartNumbers) + etagQuote := "\"" + multipartETag + "\"" + // Check if versioning is configured for this bucket BEFORE creating any files versioningState, vErr := s3a.getVersioningState(*input.Bucket) if vErr == nil && versioningState == s3_constants.VersioningEnabled { @@ -396,6 +400,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } } + // Persist ETag to ensure subsequent HEAD/GET uses the same value + versionEntry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag) + // Preserve ALL SSE metadata from the first part (if any) // SSE metadata is stored in individual parts, not the upload directory if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { @@ -418,14 +425,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Construct entry with metadata for caching in .versions directory // Reuse versionMtime to keep list vs. 
HEAD timestamps aligned - etag := "\"" + filer.ETagChunks(finalParts) + "\"" + // multipartETag is precomputed versionEntryForCache := &filer_pb.Entry{ Attributes: &filer_pb.FuseAttributes{ FileSize: uint64(offset), Mtime: versionMtime, }, Extended: map[string][]byte{ - s3_constants.ExtETagKey: []byte(etag), + s3_constants.ExtETagKey: []byte(multipartETag), }, } if amzAccountId != "" { @@ -440,13 +447,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl return nil, s3err.ErrInternalError } - // For versioned buckets, don't create a main object file - all content is stored in .versions directory + // For versioned buckets, all content is stored in .versions directory // The latest version information is tracked in the .versions directory metadata - output = &CompleteMultipartUploadResult{ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + ETag: aws.String(etagQuote), Key: objectKey(input.Key), VersionId: aws.String(versionId), } @@ -482,6 +488,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl firstPartEntry := partEntries[completedPartNumbers[0]][0] copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning") } + // Persist ETag to ensure subsequent HEAD/GET uses the same value + entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag) if pentry.Attributes.Mime != "" { entry.Attributes.Mime = pentry.Attributes.Mime } else if mime != "" { @@ -499,7 +507,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl output = &CompleteMultipartUploadResult{ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + ETag: aws.String(etagQuote), Key: objectKey(input.Key), // VersionId field intentionally omitted for suspended versioning } @@ -535,6 +543,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl firstPartEntry := partEntries[completedPartNumbers[0]][0] copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned") } + // Persist ETag to ensure subsequent HEAD/GET uses the same value + entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag) if pentry.Attributes.Mime != "" { entry.Attributes.Mime = pentry.Attributes.Mime } else if mime != "" { @@ -556,7 +566,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl output = &CompleteMultipartUploadResult{ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + ETag: aws.String(etagQuote), Key: objectKey(input.Key), } } @@ -929,3 +939,65 @@ func (s3a *S3ApiServer) applyMultipartEncryptionConfig(entry *filer_pb.Entry, co glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-S3 settings") } } + +func sortEntriesByLatestChunk(entries []*filer_pb.Entry) { + slices.SortFunc(entries, func(a, b *filer_pb.Entry) int { + var aTs, bTs int64 + if len(a.Chunks) > 0 { + aTs = a.Chunks[0].ModifiedTsNs + } + if len(b.Chunks) > 0 { + bTs = b.Chunks[0].ModifiedTsNs + } + return cmp.Compare(bTs, aTs) + }) +} + +func calculateMultipartETag(partEntries 
map[int][]*filer_pb.Entry, completedPartNumbers []int) string { + var etags []byte + for _, partNumber := range completedPartNumbers { + entries, ok := partEntries[partNumber] + if !ok || len(entries) == 0 { + continue + } + if len(entries) > 1 { + sortEntriesByLatestChunk(entries) + } + entry := entries[0] + etag := getEtagFromEntry(entry) + glog.V(4).Infof("calculateMultipartETag: part %d, entry %s, getEtagFromEntry result: %s", partNumber, entry.Name, etag) + etag = strings.Trim(etag, "\"") + if before, _, found := strings.Cut(etag, "-"); found { + etag = before + } + if etagBytes, err := hex.DecodeString(etag); err == nil { + etags = append(etags, etagBytes...) + } else { + glog.Warningf("calculateMultipartETag: failed to decode etag '%s' for part %d: %v", etag, partNumber, err) + } + } + return fmt.Sprintf("%x-%d", md5.Sum(etags), len(completedPartNumbers)) +} + +func getEtagFromEntry(entry *filer_pb.Entry) string { + if entry.Extended != nil { + if etagBytes, ok := entry.Extended[s3_constants.ExtETagKey]; ok { + etag := string(etagBytes) + if len(etag) > 0 { + if !strings.HasPrefix(etag, "\"") { + return "\"" + etag + "\"" + } + return etag + } + // Empty stored ETag — fall through to filer.ETag calculation + } + } + // Fallback to filer.ETag which handles Attributes.Md5 consistently + etag := filer.ETag(entry) + entryName := entry.Name + if entryName == "" { + entryName = "entry" + } + glog.V(4).Infof("getEtagFromEntry: fallback to filer.ETag for %s: %s, chunkCount: %d", entryName, etag, len(entry.Chunks)) + return "\"" + etag + "\"" +} diff --git a/weed/s3api/s3api_etag_quoting_test.go b/weed/s3api/s3api_etag_quoting_test.go new file mode 100644 index 000000000..89223c9b3 --- /dev/null +++ b/weed/s3api/s3api_etag_quoting_test.go @@ -0,0 +1,167 @@ +package s3api + +import ( + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +// TestReproIfMatchMismatch tests specifically for the scenario where internal ETag +// is unquoted (common in SeaweedFS) but client sends quoted ETag in If-Match. +func TestReproIfMatchMismatch(t *testing.T) { + bucket := "test-bucket" + object := "/test-key" + etagValue := "37b51d194a7513e45b56f6524f2d51f2" + + // Scenario 1: Internal ETag is UNQUOTED (stored in Extended), Client sends QUOTED If-Match + // This mirrors the behavior we enforced in filer_multipart.go + t.Run("UnquotedInternal_QuotedHeader", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte(etagValue), // Unquoted + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + }, + } + + getter := &MockEntryGetter{mockEntry: entry} + req := createTestGetRequest(bucket, object) + // Client sends quoted ETag + req.Header.Set(s3_constants.IfMatch, "\""+etagValue+"\"") + + s3a := NewS3ApiServerForTest() + result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object) + + if result.ErrorCode != s3err.ErrNone { + t.Errorf("Expected success (ErrNone) for unquoted internal ETag and quoted header, got %v. 
Internal ETag: %s", result.ErrorCode, string(entry.Extended[s3_constants.ExtETagKey])) + } + }) + + // Scenario 2: Internal ETag is QUOTED (stored in Extended), Client sends QUOTED If-Match + // This handles legacy or mixed content + t.Run("QuotedInternal_QuotedHeader", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("\"" + etagValue + "\""), // Quoted + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + }, + } + + getter := &MockEntryGetter{mockEntry: entry} + req := createTestGetRequest(bucket, object) + req.Header.Set(s3_constants.IfMatch, "\""+etagValue+"\"") + + s3a := NewS3ApiServerForTest() + result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object) + + if result.ErrorCode != s3err.ErrNone { + t.Errorf("Expected success (ErrNone) for quoted internal ETag and quoted header, got %v", result.ErrorCode) + } + }) + + // Scenario 3: Internal ETag is from Md5 (QUOTED by getObjectETag), Client sends QUOTED If-Match + t.Run("Md5Internal_QuotedHeader", func(t *testing.T) { + // Mock Md5 attribute (16 bytes) + md5Bytes := make([]byte, 16) + copy(md5Bytes, []byte("1234567890123456")) // This doesn't match the hex string below, but getObjectETag formats it as hex + + // Expected ETag from Md5 is hex string of bytes + expectedHex := fmt.Sprintf("%x", md5Bytes) + + entry := &filer_pb.Entry{ + Name: "test-key", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + Md5: md5Bytes, + }, + } + + getter := &MockEntryGetter{mockEntry: entry} + req := createTestGetRequest(bucket, object) + req.Header.Set(s3_constants.IfMatch, "\""+expectedHex+"\"") + + s3a := NewS3ApiServerForTest() + result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object) + + if result.ErrorCode != s3err.ErrNone { + t.Errorf("Expected success (ErrNone) for Md5 internal ETag and quoted header, got %v", result.ErrorCode) + } + }) + + // Test getObjectETag specifically ensuring it returns quoted strings + t.Run("getObjectETag_ShouldReturnQuoted", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("unquoted-etag"), + }, + } + + s3a := NewS3ApiServerForTest() + etag := s3a.getObjectETag(entry) + + expected := "\"unquoted-etag\"" + if etag != expected { + t.Errorf("Expected quoted ETag %s, got %s", expected, etag) + } + }) + + // Test getObjectETag fallback when Extended ETag is present but empty + t.Run("getObjectETag_EmptyExtended_ShouldFallback", func(t *testing.T) { + md5Bytes := []byte("1234567890123456") + expectedHex := fmt.Sprintf("\"%x\"", md5Bytes) + + entry := &filer_pb.Entry{ + Name: "test-key-fallback", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte(""), // Present but empty + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + Md5: md5Bytes, + }, + } + + s3a := NewS3ApiServerForTest() + etag := s3a.getObjectETag(entry) + + if etag != expectedHex { + t.Errorf("Expected fallback ETag %s, got %s", expectedHex, etag) + } + }) + + // Test newListEntry ETag behavior + t.Run("newListEntry_ShouldReturnQuoted", func(t *testing.T) { + entry := &filer_pb.Entry{ + Name: "test-key", + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("unquoted-etag"), + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: 1024, + }, + } + + s3a := 
NewS3ApiServerForTest() + listEntry := newListEntry(s3a, entry, "", "bucket/dir", "test-key", "bucket/", false, false, false) + + expected := "\"unquoted-etag\"" + if listEntry.ETag != expected { + t.Errorf("Expected quoted ETag %s, got %s", expected, listEntry.ETag) + } + }) +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 97291e7d6..39cde56ac 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -483,7 +483,7 @@ func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *h return false // Not a directory object, continue with normal processing } -func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) { +func newListEntry(s3a *S3ApiServer, entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool) (listEntry ListEntry) { storageClass := "STANDARD" if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { storageClass = string(v) @@ -500,15 +500,7 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu } // Determine ETag: prioritize ExtETagKey for versioned objects (supports multipart ETags), // then fall back to filer.ETag() which uses Md5 attribute or calculates from chunks - var etag string - if entry.Extended != nil { - if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag { - etag = string(etagBytes) - } - } - if etag == "" { - etag = "\"" + filer.ETag(entry) + "\"" - } + etag := s3a.getObjectETag(entry) listEntry = ListEntry{ Key: key, LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), @@ -531,7 +523,7 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu displayName = "anonymous" } else { // Get the proper display name from IAM system - displayName = iam.GetAccountNameById(ownerID) + displayName = s3a.iam.GetAccountNameById(ownerID) // Fallback to ownerID if no display name found if displayName == "" { displayName = ownerID @@ -1968,9 +1960,9 @@ func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, r *http.Reques // Set ETag (but don't overwrite if already set, e.g., for part-specific GET requests) if w.Header().Get("ETag") == "" { - etag := filer.ETag(entry) + etag := s3a.getObjectETag(entry) if etag != "" { - w.Header().Set("ETag", "\""+etag+"\"") + w.Header().Set("ETag", etag) } } diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index 3f90404ef..beda56e69 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -202,6 +202,27 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m var lastEntryWasCommonPrefix bool var lastCommonPrefixName string + // Hoist versioning check out of per-entry callback + versioningState, _ := s3a.getVersioningState(bucket) + versioningEnabled := versioningState == "Enabled" + + // Helper function to handle dedup/append logic + appendOrDedup := func(newEntry ListEntry) { + if versioningEnabled { + // For versioned buckets, we need to handle duplicates between the main file and the .versions directory + if len(contents) > 0 && contents[len(contents)-1].Key == newEntry.Key { + glog.V(3).Infof("listFilerEntries deduplicating versioned entry: %s", newEntry.Key) + contents[len(contents)-1] = newEntry + } else { + 
contents = append(contents, newEntry) + cursor.maxKeys-- + } + } else { + contents = append(contents, newEntry) + cursor.maxKeys-- + } + } + for { empty := true @@ -221,19 +242,13 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m undelimitedPath = strings.TrimPrefix(undelimitedPath, originalPrefix) delimitedPath := strings.SplitN(undelimitedPath, delimiter, 2) - if len(delimitedPath) == 2 { // S3 clients expect the delimited prefix to contain the delimiter and prefix. delimitedPrefix := originalPrefix + delimitedPath[0] + delimiter - for i := range commonPrefixes { - if commonPrefixes[i].Prefix == delimitedPrefix { - delimiterFound = true - break - } - } - - if !delimiterFound { + // Check if this CommonPrefix already exists + if !lastEntryWasCommonPrefix || lastCommonPrefixName != delimitedPath[0] { + // New CommonPrefix found commonPrefixes = append(commonPrefixes, PrefixEntry{ Prefix: delimitedPrefix, }) @@ -249,14 +264,14 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m // If no delimiter found in the directory object name, treat it as a regular key if !delimiterFound { - contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false, s3a.iam)) - cursor.maxKeys-- + newEntry := newListEntry(s3a, entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false) + appendOrDedup(newEntry) lastEntryWasCommonPrefix = false } } else if entry.IsDirectoryKeyObject() { // No delimiter specified, or delimiter doesn't apply - treat as regular key - contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false, s3a.iam)) - cursor.maxKeys-- + newEntry := newListEntry(s3a, entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false) + appendOrDedup(newEntry) lastEntryWasCommonPrefix = false // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html } else if delimiter != "" { // A response can contain CommonPrefixes only if you specify a delimiter. @@ -310,8 +325,8 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m } if !delimiterFound { glog.V(4).Infof("Adding file to contents: %s", entryName) - contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false, s3a.iam)) - cursor.maxKeys-- + newEntry := newListEntry(s3a, entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false) + appendOrDedup(newEntry) lastEntryWasCommonPrefix = false } } diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index a65d2332c..6941a1b9a 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -540,6 +540,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader etag = filer.ETag(entry) glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks)) + // Store ETag in Extended attribute for future retrieval (e.g. 
multipart parts) + entry.Extended[s3_constants.ExtETagKey] = []byte(etag) + // Set object owner amzAccountId := r.Header.Get(s3_constants.AmzAccountId) if amzAccountId != "" { @@ -1072,10 +1075,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin } versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) - // Store ETag with quotes for S3 compatibility - if !strings.HasPrefix(etag, "\"") { - etag = "\"" + etag + "\"" - } + // Store ETag (unquoted) in Extended attribute versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) // Set object owner for versioned objects @@ -1594,7 +1594,14 @@ func parseConditionalHeaders(r *http.Request) (conditionalHeaders, s3err.ErrorCo func (s3a *S3ApiServer) getObjectETag(entry *filer_pb.Entry) string { // Try to get ETag from Extended attributes first if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag { - return string(etagBytes) + etag := string(etagBytes) + if len(etag) > 0 { + if !strings.HasPrefix(etag, "\"") { + return "\"" + etag + "\"" + } + return etag + } + // Empty stored ETag — fall through to Md5/chunk-based calculation } // Check for Md5 in Attributes (matches filer.ETag behavior) // Note: len(nil slice) == 0 in Go, so no need for explicit nil check @@ -1635,7 +1642,6 @@ func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers cond // 1. Check If-Match if headers.ifMatch != "" { if !objectExists { - glog.V(3).Infof("validateConditionalHeaders: If-Match failed - object %s/%s does not exist", bucket, object) return s3err.ErrPreconditionFailed } // If `ifMatch` is "*", the condition is met if the object exists. @@ -1645,7 +1651,6 @@ func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers cond objectETag := s3a.getObjectETag(entry) // Use production etagMatches method if !s3a.etagMatches(headers.ifMatch, objectETag) { - glog.V(3).Infof("validateConditionalHeaders: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag) return s3err.ErrPreconditionFailed } } @@ -1787,7 +1792,7 @@ func (s3a *S3ApiServer) validateConditionalHeadersForReads(r *http.Request, head objectETag := s3a.getObjectETag(entry) // Use production etagMatches method if !s3a.etagMatches(headers.ifMatch, objectETag) { - glog.V(3).Infof("validateConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag) + glog.V(3).Infof("validateConditionalHeadersForReads: If-Match failed for object %s/%s - header If-Match: [%s], object ETag: [%s]", bucket, object, headers.ifMatch, objectETag) return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: entry} } } diff --git a/weed/s3api/s3api_object_handlers_test.go b/weed/s3api/s3api_object_handlers_test.go index a6592ad4b..90596962d 100644 --- a/weed/s3api/s3api_object_handlers_test.go +++ b/weed/s3api/s3api_object_handlers_test.go @@ -26,11 +26,13 @@ func (m *mockAccountManager) GetAccountIdByEmail(email string) string { } func TestNewListEntryOwnerDisplayName(t *testing.T) { - // Create mock IAM with test accounts - iam := &mockAccountManager{ - accounts: map[string]string{ - "testid": "M. Tester", - "userid123": "John Doe", + // Create S3ApiServer with a properly initialized IAM + s3a := &S3ApiServer{ + iam: &IdentityAccessManagement{ + accounts: map[string]*Account{ + "testid": {Id: "testid", DisplayName: "M. 
Tester"}, + "userid123": {Id: "userid123", DisplayName: "John Doe"}, + }, }, } @@ -47,7 +49,7 @@ func TestNewListEntryOwnerDisplayName(t *testing.T) { } // Test that display name is correctly looked up from IAM - listEntry := newListEntry(entry, "", "dir", "test-object", "/buckets/test/", true, false, false, iam) + listEntry := newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", true, false, false) assert.NotNil(t, listEntry.Owner, "Owner should be set when fetchOwner is true") assert.Equal(t, "testid", listEntry.Owner.ID, "Owner ID should match stored owner") @@ -55,20 +57,20 @@ func TestNewListEntryOwnerDisplayName(t *testing.T) { // Test with owner that doesn't exist in IAM (should fallback to ID) entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte("unknown-user") - listEntry = newListEntry(entry, "", "dir", "test-object", "/buckets/test/", true, false, false, iam) + listEntry = newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", true, false, false) assert.Equal(t, "unknown-user", listEntry.Owner.ID, "Owner ID should match stored owner") assert.Equal(t, "unknown-user", listEntry.Owner.DisplayName, "Display name should fallback to ID when not found in IAM") // Test with no owner metadata (should use anonymous) entry.Extended = make(map[string][]byte) - listEntry = newListEntry(entry, "", "dir", "test-object", "/buckets/test/", true, false, false, iam) + listEntry = newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", true, false, false) assert.Equal(t, s3_constants.AccountAnonymousId, listEntry.Owner.ID, "Should use anonymous ID when no owner metadata") assert.Equal(t, "anonymous", listEntry.Owner.DisplayName, "Should use anonymous display name when no owner metadata") // Test with fetchOwner false (should not set owner) - listEntry = newListEntry(entry, "", "dir", "test-object", "/buckets/test/", false, false, false, iam) + listEntry = newListEntry(s3a, entry, "", "dir", "test-object", "/buckets/test/", false, false, false) assert.Nil(t, listEntry.Owner, "Owner should not be set when fetchOwner is false") } diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index 9147ebbdf..c319ce4fe 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -818,6 +818,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe // Try to get ETag from Extended attributes first if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag { version.ETag = string(etagBytes) + if !strings.HasPrefix(version.ETag, "\"") { + version.ETag = "\"" + version.ETag + "\"" + } } else { // Fallback: calculate ETag from chunks version.ETag = s3a.calculateETagFromChunks(entry.Chunks) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index dd1a1e557..2f67e7860 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/md5" + "encoding/base64" "fmt" "hash" "io" @@ -128,16 +129,15 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, r *http.Request } if chunks != nil { fileChunksLock.Lock() - fileChunksSize := len(fileChunks) + len(chunks) for _, chunk := range chunks { fileChunks = append(fileChunks, chunk) - glog.V(4).InfofCtx(ctx, "uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size)) } fileChunksLock.Unlock() 
} }(chunkOffset, bytesBuffer) // reset variables for the next chunk + glog.V(4).Infof("uploadReaderToChunks read chunk at offset %d, size %d", chunkOffset, dataSize) chunkOffset = chunkOffset + dataSize // if last chunk was not at full chunk size, but already exhausted the reader @@ -162,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, r *http.Request return fileChunks, md5Hash, chunkOffset, nil, smallContent } -func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { +func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt, contentMd5 string) (*operation.UploadResult, error, []byte) { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc() start := time.Now() @@ -178,6 +178,7 @@ func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limited MimeType: contentType, PairMap: pairMap, Jwt: auth, + Md5: contentMd5, } uploader, err := operation.NewUploader() @@ -217,8 +218,10 @@ func (fs *FilerServer) dataToChunkWithSSE(ctx context.Context, r *http.Request, stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc() return uploadErr } + chunkMd5 := md5.Sum(data) + chunkMd5B64 := base64.StdEncoding.EncodeToString(chunkMd5[:]) // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth) + uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth, chunkMd5B64) if uploadErr != nil { glog.V(4).InfofCtx(ctx, "retry later due to upload error: %v", uploadErr) stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 164be7258..44a2abc34 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -68,6 +68,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret.Size = uint32(originalSize) ret.ETag = reqNeedle.Etag() ret.Mime = string(reqNeedle.Mime) + ret.ContentMd5 = contentMd5 SetEtag(w, ret.ETag) w.Header().Set("Content-MD5", contentMd5) writeJsonQuiet(w, r, httpStatus, ret) diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index 5dd419b25..f7f62df11 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -42,6 +42,7 @@ func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (p pu.PairMap[k] = v[0] } } + // glog.V(4).Infof("ParseUpload: r.URL=%s, Content-MD5 header=%s", r.URL.String(), r.Header.Get("Content-MD5")) e = parseUpload(r, sizeLimit, pu) @@ -84,7 +85,11 @@ func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (p } // verify Content-MD5 - if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" { + expectedChecksum := r.Header.Get("Content-MD5") + if expectedChecksum == "" { + expectedChecksum = pu.ContentMd5 + } + if expectedChecksum != "" { h := md5.New() h.Write(pu.UncompressedData) pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil)) @@ -143,6 +148,7 @@ func parseUpload(r *http.Request, sizeLimit int64, pu 
*ParsedUpload) (e error) { pu.Data = pu.bytesBuffer.Bytes() contentType = part.Header.Get("Content-Type") + pu.ContentMd5 = part.Header.Get("Content-MD5") // if the filename is empty string, do a search on the other multi-part items for pu.FileName == "" { @@ -171,6 +177,7 @@ func parseUpload(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { pu.Data = pu.bytesBuffer.Bytes() pu.FileName = util.CleanWindowsPathBase(fName) contentType = part.Header.Get("Content-Type") + pu.ContentMd5 = part2.Header.Get("Content-MD5") part = part2 break }
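
Reference sketch (not part of the patch): the ETag rule that both the new TestMultipartUploadETagVerification test and calculateMultipartETag implement is "hex MD5 of the concatenated binary part MD5s, suffixed with -<partCount>", and the per-chunk integrity value the filer now forwards is the base64 MD5 that ParseUpload re-derives for comparison. The multipartETag helper name and the main demo below are illustrative only; this is a minimal standalone Go sketch of those two computations, not SeaweedFS code.

package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

// multipartETag mirrors the rule exercised by the patch: concatenate the
// 16-byte binary MD5 of each part, MD5 the concatenation, render as hex,
// and append "-<partCount>".
func multipartETag(parts [][]byte) string {
	var concat []byte
	for _, p := range parts {
		sum := md5.Sum(p) // binary MD5 of the raw part bytes
		concat = append(concat, sum[:]...)
	}
	return fmt.Sprintf("%x-%d", md5.Sum(concat), len(parts))
}

func main() {
	part1 := make([]byte, 5*1024*1024) // 5MB part, zero-filled for determinism
	part2 := make([]byte, 6*1024*1024) // 6MB part
	fmt.Println(multipartETag([][]byte{part1, part2})) // prints "<hex>-2"

	// Per-chunk Content-MD5 as wired through this patch: the filer computes a
	// base64-encoded MD5 per chunk (chunkMd5B64) and the volume server's
	// ParseUpload recomputes it to verify the upload.
	sum := md5.Sum(part1)
	fmt.Println(base64.StdEncoding.EncodeToString(sum[:]))
}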