diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 91da98a0e..4dd31c8ce 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -230,10 +230,11 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
 		}
 	}
 
-	// Check if destination bucket has versioning configured
-	dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket)
+	// Check if destination bucket has versioning enabled
+	// Only create versions if versioning is explicitly "Enabled", not "Suspended" or unconfigured
+	dstVersioningState, err := s3a.getVersioningState(dstBucket)
 	if err != nil {
-		glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err)
+		glog.Errorf("Error checking versioning state for destination bucket %s: %v", dstBucket, err)
 		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
 		return
 	}
@@ -241,7 +242,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
 	var dstVersionId string
 	var etag string
 
-	if dstVersioningConfigured {
+	if shouldCreateVersionForCopy(dstVersioningState) {
 		// For versioned destination, create a new version
 		dstVersionId = generateVersionId()
 		glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject)
@@ -294,6 +295,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
 		w.Header().Set("x-amz-version-id", dstVersionId)
 	} else {
 		// For non-versioned destination, use regular copy
+		// Remove any versioning-related metadata from source that shouldn't carry over
+		cleanupVersioningMetadata(dstEntry.Extended)
+
 		dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
 		dstDir, dstName := dstPath.DirAndName()
 
@@ -2327,6 +2331,25 @@ func (ctx *EncryptionHeaderContext) shouldSkipEncryptedToUnencryptedHeader() boo
 	return hasSourceEncryption && !hasDestinationEncryption && isAnyEncryptionHeader
 }
 
+// cleanupVersioningMetadata removes versioning-related metadata from Extended attributes
+// when copying to non-versioned or suspended-versioning buckets.
+// This prevents objects in non-versioned buckets from carrying invalid versioning metadata.
+// It also removes the source ETag to prevent metadata inconsistency, as a new ETag will be
+// calculated for the destination object.
+func cleanupVersioningMetadata(metadata map[string][]byte) {
+	delete(metadata, s3_constants.ExtVersionIdKey)
+	delete(metadata, s3_constants.ExtDeleteMarkerKey)
+	delete(metadata, s3_constants.ExtIsLatestKey)
+	delete(metadata, s3_constants.ExtETagKey)
+}
+
+// shouldCreateVersionForCopy determines whether a version should be created during a copy operation
+// based on the destination bucket's versioning state.
+// Returns true only if versioning is explicitly "Enabled", not "Suspended" or unconfigured.
+func shouldCreateVersionForCopy(versioningState string) bool {
+	return versioningState == s3_constants.VersioningEnabled
+}
+
 // shouldSkipEncryptionHeader determines if a header should be skipped when copying extended attributes
 // based on the source and destination encryption types. This consolidates the repetitive logic for
 // filtering encryption-related headers during copy operations.
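Reviewer note: the fix hinges on the two small helpers above, so here is a self-contained sketch that exercises them outside the handler. The constant string values below are stand-ins (the real keys live in `weed/s3api/s3_constants` and their exact values are not shown in this diff); only the two function bodies mirror the patch.

```go
package main

import "fmt"

// Stand-in values; the real keys live in weed/s3api/s3_constants and
// their exact strings are assumptions here.
const (
	extVersionIdKey    = "Seaweed-X-Amz-Version-Id"
	extDeleteMarkerKey = "Seaweed-X-Amz-Delete-Marker"
	extIsLatestKey     = "Seaweed-X-Amz-Is-Latest"
	extETagKey         = "Seaweed-X-Amz-ETag"
	versioningEnabled  = "Enabled"
)

// Mirrors shouldCreateVersionForCopy: only an explicit "Enabled" state
// routes the copy through the versioned (.versions/) path.
func shouldCreateVersionForCopy(versioningState string) bool {
	return versioningState == versioningEnabled
}

// Mirrors cleanupVersioningMetadata: strip version bookkeeping (and the
// stale source ETag) before a plain, non-versioned copy.
func cleanupVersioningMetadata(metadata map[string][]byte) {
	delete(metadata, extVersionIdKey)
	delete(metadata, extDeleteMarkerKey)
	delete(metadata, extIsLatestKey)
	delete(metadata, extETagKey)
}

func main() {
	// Enabled -> true; Suspended and unconfigured -> false, which is the #7505 fix.
	for _, state := range []string{"Enabled", "Suspended", ""} {
		fmt.Printf("state=%q -> create version: %v\n", state, shouldCreateVersionForCopy(state))
	}

	meta := map[string][]byte{
		extVersionIdKey:     []byte("v123"),
		"X-Amz-Meta-Custom": []byte("kept"),
	}
	cleanupVersioningMetadata(meta)
	fmt.Printf("surviving keys: %d (only the user metadata)\n", len(meta))
}
```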
diff --git a/weed/s3api/s3api_object_handlers_copy_test.go b/weed/s3api/s3api_object_handlers_copy_test.go
index a537b6f3c..018c9f270 100644
--- a/weed/s3api/s3api_object_handlers_copy_test.go
+++ b/weed/s3api/s3api_object_handlers_copy_test.go
@@ -436,3 +436,206 @@ func transferHeaderToH(data map[string][]string) H {
 	}
 	return m
 }
+
+// TestShouldCreateVersionForCopy tests the production function that determines
+// whether a version should be created during a copy operation.
+// This addresses issue #7505 where copies were incorrectly creating versions for non-versioned buckets.
+func TestShouldCreateVersionForCopy(t *testing.T) {
+	testCases := []struct {
+		name            string
+		versioningState string
+		expectedResult  bool
+		description     string
+	}{
+		{
+			name:            "VersioningEnabled",
+			versioningState: s3_constants.VersioningEnabled,
+			expectedResult:  true,
+			description:     "Should create versions in .versions/ directory when versioning is Enabled",
+		},
+		{
+			name:            "VersioningSuspended",
+			versioningState: s3_constants.VersioningSuspended,
+			expectedResult:  false,
+			description:     "Should NOT create versions when versioning is Suspended",
+		},
+		{
+			name:            "VersioningNotConfigured",
+			versioningState: "",
+			expectedResult:  false,
+			description:     "Should NOT create versions when versioning is not configured",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Call the actual production function
+			result := shouldCreateVersionForCopy(tc.versioningState)
+
+			if result != tc.expectedResult {
+				t.Errorf("Test case %s failed: %s\nExpected shouldCreateVersionForCopy(%q)=%v, got %v",
+					tc.name, tc.description, tc.versioningState, tc.expectedResult, result)
+			}
+		})
+	}
+}
+
+// TestCleanupVersioningMetadata tests the production function that removes versioning metadata.
+// This ensures objects copied to non-versioned buckets don't carry invalid versioning metadata
+// or stale ETag values from the source.
+func TestCleanupVersioningMetadata(t *testing.T) {
+	testCases := []struct {
+		name           string
+		sourceMetadata map[string][]byte
+		expectedKeys   []string // Keys that should be present after cleanup
+		removedKeys    []string // Keys that should be removed
+	}{
+		{
+			name: "RemovesAllVersioningMetadata",
+			sourceMetadata: map[string][]byte{
+				s3_constants.ExtVersionIdKey:    []byte("version-123"),
+				s3_constants.ExtDeleteMarkerKey: []byte("false"),
+				s3_constants.ExtIsLatestKey:     []byte("true"),
+				s3_constants.ExtETagKey:         []byte("\"abc123\""),
+				"X-Amz-Meta-Custom":             []byte("value"),
+			},
+			expectedKeys: []string{"X-Amz-Meta-Custom"},
+			removedKeys:  []string{s3_constants.ExtVersionIdKey, s3_constants.ExtDeleteMarkerKey, s3_constants.ExtIsLatestKey, s3_constants.ExtETagKey},
+		},
+		{
+			name:           "HandlesEmptyMetadata",
+			sourceMetadata: map[string][]byte{},
+			expectedKeys:   []string{},
+			removedKeys:    []string{s3_constants.ExtVersionIdKey, s3_constants.ExtDeleteMarkerKey, s3_constants.ExtIsLatestKey, s3_constants.ExtETagKey},
+		},
+		{
+			name: "PreservesNonVersioningMetadata",
+			sourceMetadata: map[string][]byte{
+				s3_constants.ExtVersionIdKey: []byte("version-456"),
+				s3_constants.ExtETagKey:      []byte("\"def456\""),
+				"X-Amz-Meta-Custom":          []byte("value1"),
+				"X-Amz-Meta-Another":         []byte("value2"),
+				s3_constants.ExtIsLatestKey:  []byte("true"),
+			},
+			expectedKeys: []string{"X-Amz-Meta-Custom", "X-Amz-Meta-Another"},
+			removedKeys:  []string{s3_constants.ExtVersionIdKey, s3_constants.ExtETagKey, s3_constants.ExtIsLatestKey},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Create a copy of the source metadata
+			dstMetadata := make(map[string][]byte)
+			for k, v := range tc.sourceMetadata {
+				dstMetadata[k] = v
+			}
+
+			// Call the actual production function
+			cleanupVersioningMetadata(dstMetadata)
+
+			// Verify expected keys are present
+			for _, key := range tc.expectedKeys {
+				if _, exists := dstMetadata[key]; !exists {
+					t.Errorf("Expected key %s to be present in destination metadata", key)
+				}
+			}
+
+			// Verify removed keys are absent
+			for _, key := range tc.removedKeys {
+				if _, exists := dstMetadata[key]; exists {
+					t.Errorf("Expected key %s to be removed from destination metadata, but it's still present", key)
+				}
+			}
+
+			// Verify the count matches to ensure no extra keys are present
+			if len(dstMetadata) != len(tc.expectedKeys) {
+				t.Errorf("Expected %d metadata keys, but got %d. Extra keys might be present.", len(tc.expectedKeys), len(dstMetadata))
+			}
+		})
+	}
+}
+
+// TestCopyVersioningIntegration validates the interaction between
+// shouldCreateVersionForCopy and cleanupVersioningMetadata functions.
+// This integration test ensures the complete fix for issue #7505.
+func TestCopyVersioningIntegration(t *testing.T) {
+	testCases := []struct {
+		name               string
+		versioningState    string
+		sourceMetadata     map[string][]byte
+		expectVersionPath  bool
+		expectMetadataKeys []string
+	}{
+		{
+			name:            "EnabledPreservesMetadata",
+			versioningState: s3_constants.VersioningEnabled,
+			sourceMetadata: map[string][]byte{
+				s3_constants.ExtVersionIdKey: []byte("v123"),
+				"X-Amz-Meta-Custom":          []byte("value"),
+			},
+			expectVersionPath: true,
+			expectMetadataKeys: []string{
+				s3_constants.ExtVersionIdKey,
+				"X-Amz-Meta-Custom",
+			},
+		},
+		{
+			name:            "SuspendedCleansMetadata",
+			versioningState: s3_constants.VersioningSuspended,
+			sourceMetadata: map[string][]byte{
+				s3_constants.ExtVersionIdKey: []byte("v123"),
+				"X-Amz-Meta-Custom":          []byte("value"),
+			},
+			expectVersionPath: false,
+			expectMetadataKeys: []string{
+				"X-Amz-Meta-Custom",
+			},
+		},
+		{
+			name:            "NotConfiguredCleansMetadata",
+			versioningState: "",
+			sourceMetadata: map[string][]byte{
+				s3_constants.ExtVersionIdKey:    []byte("v123"),
+				s3_constants.ExtDeleteMarkerKey: []byte("false"),
+				"X-Amz-Meta-Custom":             []byte("value"),
+			},
+			expectVersionPath: false,
+			expectMetadataKeys: []string{
+				"X-Amz-Meta-Custom",
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Test version creation decision using production function
+			shouldCreateVersion := shouldCreateVersionForCopy(tc.versioningState)
+			if shouldCreateVersion != tc.expectVersionPath {
+				t.Errorf("shouldCreateVersionForCopy(%q) = %v, expected %v",
+					tc.versioningState, shouldCreateVersion, tc.expectVersionPath)
+			}
+
+			// Test metadata cleanup using production function
+			metadata := make(map[string][]byte)
+			for k, v := range tc.sourceMetadata {
+				metadata[k] = v
+			}
+
+			if !shouldCreateVersion {
+				cleanupVersioningMetadata(metadata)
+			}
+
+			// Verify only expected keys remain
+			for _, expectedKey := range tc.expectMetadataKeys {
+				if _, exists := metadata[expectedKey]; !exists {
+					t.Errorf("Expected key %q to be present in metadata", expectedKey)
+				}
+			}
+
+			// Verify the count matches (no extra keys)
+			if len(metadata) != len(tc.expectMetadataKeys) {
+				t.Errorf("Expected %d metadata keys, got %d", len(tc.expectMetadataKeys), len(metadata))
+			}
+		})
+	}
+}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 99d0fcf9a..054c0cb67 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -37,6 +37,7 @@ type volumeCheckDisk struct {
 	verbose            bool
 	applyChanges       bool
 	syncDeletions      bool
+	fixReadOnly        bool
 	nonRepairThreshold float64
 }
@@ -48,19 +49,27 @@ func (c *commandVolumeCheckDisk) Help() string {
 	return `check all replicated volumes to find and fix inconsistencies. It is optional and resource intensive.
 
 	How it works:
-	
+
 	find all volumes that are replicated
-	  for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count
-	  for the pair volume A and B
-	    - bi-directional sync (default): append entries in A and not in B to B, and entries in B and not in A to A
-	    - uni-directional sync (read-only repair): only sync from source to target without modifying source
+
+	for each writable volume ID, if there are more than 2 replicas, find one pair with the largest 2 in file count
+	  for the pair volume A and B
+	    append entries in A and not in B to B
+	    append entries in B and not in A to A
+
+	optionally, for each non-writable volume replica A
+	  if volume is not full
+	    prune late volume entries not matching its index file
+	    select a writable volume replica B
+	    append missing entries from B into A
+	    mark the volume as writable (healthy)
 
 	Options:
 	  -slow: check all replicas even if file counts are the same
 	  -v: verbose mode with detailed progress output
 	  -volumeId: check only a specific volume ID (0 for all)
 	  -apply: actually apply the fixes (default is simulation mode)
-	  -force-readonly: also check and repair read-only volumes using uni-directional sync
+	  -fixReadOnly: also check and repair read-only volumes using uni-directional sync
 	  -syncDeleted: sync deletion records during repair
 	  -nonRepairThreshold: maximum fraction of missing keys allowed for repair (default 0.3)
 
@@ -80,7 +89,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
 	applyChanges := fsckCommand.Bool("apply", false, "apply the fix")
 	// TODO: remove this alias
 	applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
-	forceReadonly := fsckCommand.Bool("force-readonly", false, "apply the fix even on readonly volumes")
+	fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
 	syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
 	nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
 	if err = fsckCommand.Parse(args); err != nil {
@@ -103,6 +112,7 @@
 		verbose:            *verbose,
 		applyChanges:       *applyChanges,
 		syncDeletions:      *syncDeletions,
+		fixReadOnly:        *fixReadOnly,
 		nonRepairThreshold: *nonRepairThreshold,
 	}
@@ -123,24 +133,20 @@
 		}
 	}
 
-	vcd.write("Pass #1 (writeable volumes)\n")
-	if err := vcd.checkWriteableVolumes(volumeReplicas); err != nil {
+	if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
 		return err
 	}
-	if *forceReadonly {
-		vcd.write("Pass #2 (read-only volumes)\n")
-		if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
-			return err
-		}
-
+	if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
+		return err
 	}
 
 	return nil
 }
 
-// checkWriteableVolumes fixes volume replicas which are not read-only.
-func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
-	// pick 1 pairs of volume replica
+// checkWritableVolumes fixes volume replicas which are not read-only.
+func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
+	vcd.write("Pass #1 (writable volumes)\n")
+
 	for _, replicas := range volumeReplicas {
 		// filter readonly replica
 		var writableReplicas []*VolumeReplica
@@ -157,16 +163,14 @@
 		})
 		for len(writableReplicas) >= 2 {
 			a, b := writableReplicas[0], writableReplicas[1]
-			if !vcd.slowMode {
-				shouldSkip, err := vcd.shouldSkipVolume(a, b)
-				if err != nil {
-					vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
-					// Continue with sync despite error to be safe
-				} else if shouldSkip {
-					// always choose the larger volume to be the source
-					writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
-					continue
-				}
+			shouldSkip, err := vcd.shouldSkipVolume(a, b)
+			if err != nil {
+				vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
+				// Continue with sync despite error to be safe
+			} else if shouldSkip {
+				// always choose the larger volume to be the source
+				writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
+				continue
 			}
 			if err := vcd.syncTwoReplicas(a, b, true); err != nil {
 				vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
@@ -183,9 +187,107 @@
 	return nil
 }
 
-// checkReadOnlyVolumes fixes read-only volume replicas.
+// makeVolumeWritable flags a volume as writable, by volume ID.
+func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) error {
+	if !vcd.applyChanges {
+		return nil
+	}
+
+	err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(vr.location.dataNode), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, vsErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
+			VolumeId: vid,
+		})
+		return vsErr
+	})
+	if err != nil {
+		return err
+	}
+
+	vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id)
+	return nil
+}
+
+// makeVolumeReadonly flags a volume as read-only, by volume ID.
+func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) error {
+	if !vcd.applyChanges {
+		return nil
+	}
+
+	err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(vr.location.dataNode), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, vsErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+			VolumeId: vid,
+		})
+		return vsErr
+	})
+	if err != nil {
+		return err
+	}
+
+	vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id)
+	return nil
+}
+
 func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
-	return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)")
+	if !vcd.fixReadOnly {
+		return nil
+	}
+	vcd.write("Pass #2 (read-only volumes)\n")
+
+	for vid, replicas := range volumeReplicas {
+		var source *VolumeReplica = nil
+		roReplicas := []*VolumeReplica{}
+
+		for _, r := range replicas {
+			if r.info.ReadOnly {
+				roReplicas = append(roReplicas, r)
+			} else {
+				// we assume all writable replicas are identical by this point, after the checkWritableVolumes() pass.
+				source = r
+			}
+		}
+		if len(roReplicas) == 0 {
+			vcd.write("no read-only replicas for volume %d\n", vid)
+			continue
+		}
+		if source == nil {
+			vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid)
+			continue
+		}
+
+		// attempt to fix read-only replicas from the known good source
+		for _, r := range roReplicas {
+			// TODO: skip full readonly volumes.
+			skip, err := vcd.shouldSkipVolume(r, source)
+			if err != nil {
+				vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
+				continue
+			}
+			if skip {
+				continue
+			}
+
+			// make volume writable...
+			if err := vcd.makeVolumeWritable(vid, r); err != nil {
+				return err
+			}
+
+			// ...fix it...
+			// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
+			if err := vcd.syncTwoReplicas(source, r, false); err != nil {
+				vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
+
+				// ...or revert it back to read-only, if something went wrong.
+				if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
+					return fmt.Errorf("failed to make volume %d on %s read-only after sync error %v: %v", vid, r.location.dataNode.Id, err, roErr)
+				}
+				vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
+
+				return err
+			}
+		}
+	}
+
+	return nil
 }
 
 func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
@@ -260,6 +362,11 @@ func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool,
 // Error Handling: Errors from eqVolumeFileCount are wrapped with context and propagated.
 // The Do method logs these errors and continues processing to ensure other volumes are checked.
 func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) {
+	if vcd.slowMode {
+		// never skip volumes in slow mode
+		return false, nil
+	}
+
 	pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix()
 	doSyncDeletedCount := false
 	if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
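Reviewer note on Pass #2: the per-replica control flow in `checkReadOnlyVolumes` reduces to "make writable, sync from a known-good source, revert on failure". A dependency-free sketch of that flow, where the `replica` struct, `repairReadOnly`, and the injected callbacks are illustrative stand-ins for `VolumeReplica`, `makeVolumeWritable`, `makeVolumeReadonly`, and `syncTwoReplicas` (none of them are SeaweedFS APIs):

```go
package main

import (
	"errors"
	"fmt"
)

// Stub replica; the real code operates on *VolumeReplica.
type replica struct {
	id   uint32
	node string
}

// repairReadOnly mirrors the per-replica flow added above: mark the broken
// replica writable, append the missing entries from a known-good source, and
// revert to read-only if the sync fails so a half-repaired replica is never
// left writable.
func repairReadOnly(vid uint32, source, target *replica,
	markWritable, markReadonly func(uint32, *replica) error,
	sync func(src, dst *replica) error) error {

	if err := markWritable(vid, target); err != nil {
		return err
	}
	if err := sync(source, target); err != nil {
		if roErr := markReadonly(vid, target); roErr != nil {
			return fmt.Errorf("revert volume %d on %s to read-only after sync error %v: %w", vid, target.node, err, roErr)
		}
		return err // reverted cleanly; surface the sync error
	}
	return nil
}

func main() {
	src := &replica{id: 7, node: "dn1"}
	dst := &replica{id: 7, node: "dn2"}

	noop := func(uint32, *replica) error { return nil }
	failSync := func(*replica, *replica) error { return errors.New("needle mismatch") }

	if err := repairReadOnly(7, src, dst, noop, noop, failSync); err != nil {
		fmt.Println("repair failed, replica reverted:", err)
	}
}
```

One behavioral note: because `makeVolumeWritable` and `makeVolumeReadonly` return early unless `-apply` is set, simulation mode leaves the volume servers untouched during this pass.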