
Merge branch 'master' into auto-create-bucket

pull/7549/head
Chris Lu 1 week ago
committed by GitHub
parent
commit 5d69a6a990
No known key found for this signature in database. GPG Key ID: B5690EEEBB952194
  1. 31 weed/s3api/s3api_object_handlers_copy.go
  2. 203 weed/s3api/s3api_object_handlers_copy_test.go
  3. 167 weed/shell/command_volume_check_disk.go

31 weed/s3api/s3api_object_handlers_copy.go

@@ -230,10 +230,11 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
}
// Check if destination bucket has versioning configured
dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket)
// Check if destination bucket has versioning enabled
// Only create versions if versioning is explicitly "Enabled", not "Suspended" or unconfigured
dstVersioningState, err := s3a.getVersioningState(dstBucket)
if err != nil {
glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err)
glog.Errorf("Error checking versioning state for destination bucket %s: %v", dstBucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
@@ -241,7 +242,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
var dstVersionId string
var etag string
if dstVersioningConfigured {
if shouldCreateVersionForCopy(dstVersioningState) {
// For versioned destination, create a new version
dstVersionId = generateVersionId()
glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject)
@@ -294,6 +295,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
w.Header().Set("x-amz-version-id", dstVersionId)
} else {
// For non-versioned destination, use regular copy
// Remove any versioning-related metadata from source that shouldn't carry over
cleanupVersioningMetadata(dstEntry.Extended)
dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dstDir, dstName := dstPath.DirAndName()
@@ -2327,6 +2331,25 @@ func (ctx *EncryptionHeaderContext) shouldSkipEncryptedToUnencryptedHeader() boo
return hasSourceEncryption && !hasDestinationEncryption && isAnyEncryptionHeader
}
// cleanupVersioningMetadata removes versioning-related metadata from Extended attributes
// when copying to non-versioned or suspended-versioning buckets.
// This prevents objects in non-versioned buckets from carrying invalid versioning metadata.
// It also removes the source ETag to prevent metadata inconsistency, as a new ETag will be
// calculated for the destination object.
func cleanupVersioningMetadata(metadata map[string][]byte) {
delete(metadata, s3_constants.ExtVersionIdKey)
delete(metadata, s3_constants.ExtDeleteMarkerKey)
delete(metadata, s3_constants.ExtIsLatestKey)
delete(metadata, s3_constants.ExtETagKey)
}
// shouldCreateVersionForCopy determines whether a version should be created during a copy operation
// based on the destination bucket's versioning state.
// Returns true only if versioning is explicitly "Enabled", not "Suspended" or unconfigured.
func shouldCreateVersionForCopy(versioningState string) bool {
return versioningState == s3_constants.VersioningEnabled
}
// shouldSkipEncryptionHeader determines if a header should be skipped when copying extended attributes
// based on the source and destination encryption types. This consolidates the repetitive logic for
// filtering encryption-related headers during copy operations.
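Taken together, the two helpers give CopyObjectHandler a single decision point: create a version only when versioning is explicitly Enabled, and strip stale versioning metadata otherwise. Below is a minimal, self-contained Go sketch of that branching; the constant value and the extended-attribute key strings are illustrative stand-ins, since the real ones live in the s3_constants package:

package main

import "fmt"

// Stand-in for s3_constants.VersioningEnabled; illustrative only.
const versioningEnabled = "Enabled"

// Stand-ins for ExtVersionIdKey, ExtDeleteMarkerKey, ExtIsLatestKey, ExtETagKey.
var versioningKeys = []string{"ext-version-id", "ext-delete-marker", "ext-is-latest", "ext-etag"}

func shouldCreateVersionForCopy(versioningState string) bool {
	return versioningState == versioningEnabled
}

func cleanupVersioningMetadata(metadata map[string][]byte) {
	for _, k := range versioningKeys {
		delete(metadata, k)
	}
}

func main() {
	for _, state := range []string{"Enabled", "Suspended", ""} {
		fmt.Printf("state=%q -> create version: %v\n", state, shouldCreateVersionForCopy(state))
	}
	meta := map[string][]byte{
		"ext-version-id":    []byte("v123"),
		"X-Amz-Meta-Custom": []byte("value"),
	}
	cleanupVersioningMetadata(meta) // non-versioned destination path
	fmt.Println(len(meta))          // 1: only the custom header survives
}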

203 weed/s3api/s3api_object_handlers_copy_test.go

@@ -436,3 +436,206 @@ func transferHeaderToH(data map[string][]string) H {
}
return m
}
// TestShouldCreateVersionForCopy tests the production function that determines
// whether a version should be created during a copy operation.
// This addresses issue #7505 where copies were incorrectly creating versions for non-versioned buckets.
func TestShouldCreateVersionForCopy(t *testing.T) {
testCases := []struct {
name string
versioningState string
expectedResult bool
description string
}{
{
name: "VersioningEnabled",
versioningState: s3_constants.VersioningEnabled,
expectedResult: true,
description: "Should create versions in .versions/ directory when versioning is Enabled",
},
{
name: "VersioningSuspended",
versioningState: s3_constants.VersioningSuspended,
expectedResult: false,
description: "Should NOT create versions when versioning is Suspended",
},
{
name: "VersioningNotConfigured",
versioningState: "",
expectedResult: false,
description: "Should NOT create versions when versioning is not configured",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Call the actual production function
result := shouldCreateVersionForCopy(tc.versioningState)
if result != tc.expectedResult {
t.Errorf("Test case %s failed: %s\nExpected shouldCreateVersionForCopy(%q)=%v, got %v",
tc.name, tc.description, tc.versioningState, tc.expectedResult, result)
}
})
}
}
// TestCleanupVersioningMetadata tests the production function that removes versioning metadata.
// This ensures objects copied to non-versioned buckets don't carry invalid versioning metadata
// or stale ETag values from the source.
func TestCleanupVersioningMetadata(t *testing.T) {
testCases := []struct {
name string
sourceMetadata map[string][]byte
expectedKeys []string // Keys that should be present after cleanup
removedKeys []string // Keys that should be removed
}{
{
name: "RemovesAllVersioningMetadata",
sourceMetadata: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("version-123"),
s3_constants.ExtDeleteMarkerKey: []byte("false"),
s3_constants.ExtIsLatestKey: []byte("true"),
s3_constants.ExtETagKey: []byte("\"abc123\""),
"X-Amz-Meta-Custom": []byte("value"),
},
expectedKeys: []string{"X-Amz-Meta-Custom"},
removedKeys: []string{s3_constants.ExtVersionIdKey, s3_constants.ExtDeleteMarkerKey, s3_constants.ExtIsLatestKey, s3_constants.ExtETagKey},
},
{
name: "HandlesEmptyMetadata",
sourceMetadata: map[string][]byte{},
expectedKeys: []string{},
removedKeys: []string{s3_constants.ExtVersionIdKey, s3_constants.ExtDeleteMarkerKey, s3_constants.ExtIsLatestKey, s3_constants.ExtETagKey},
},
{
name: "PreservesNonVersioningMetadata",
sourceMetadata: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("version-456"),
s3_constants.ExtETagKey: []byte("\"def456\""),
"X-Amz-Meta-Custom": []byte("value1"),
"X-Amz-Meta-Another": []byte("value2"),
s3_constants.ExtIsLatestKey: []byte("true"),
},
expectedKeys: []string{"X-Amz-Meta-Custom", "X-Amz-Meta-Another"},
removedKeys: []string{s3_constants.ExtVersionIdKey, s3_constants.ExtETagKey, s3_constants.ExtIsLatestKey},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create a copy of the source metadata
dstMetadata := make(map[string][]byte)
for k, v := range tc.sourceMetadata {
dstMetadata[k] = v
}
// Call the actual production function
cleanupVersioningMetadata(dstMetadata)
// Verify expected keys are present
for _, key := range tc.expectedKeys {
if _, exists := dstMetadata[key]; !exists {
t.Errorf("Expected key %s to be present in destination metadata", key)
}
}
// Verify removed keys are absent
for _, key := range tc.removedKeys {
if _, exists := dstMetadata[key]; exists {
t.Errorf("Expected key %s to be removed from destination metadata, but it's still present", key)
}
}
// Verify the count matches to ensure no extra keys are present
if len(dstMetadata) != len(tc.expectedKeys) {
t.Errorf("Expected %d metadata keys, but got %d. Extra keys might be present.", len(tc.expectedKeys), len(dstMetadata))
}
})
}
}
// TestCopyVersioningIntegration validates the interaction between
// shouldCreateVersionForCopy and cleanupVersioningMetadata functions.
// This integration test ensures the complete fix for issue #7505.
func TestCopyVersioningIntegration(t *testing.T) {
testCases := []struct {
name string
versioningState string
sourceMetadata map[string][]byte
expectVersionPath bool
expectMetadataKeys []string
}{
{
name: "EnabledPreservesMetadata",
versioningState: s3_constants.VersioningEnabled,
sourceMetadata: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("v123"),
"X-Amz-Meta-Custom": []byte("value"),
},
expectVersionPath: true,
expectMetadataKeys: []string{
s3_constants.ExtVersionIdKey,
"X-Amz-Meta-Custom",
},
},
{
name: "SuspendedCleansMetadata",
versioningState: s3_constants.VersioningSuspended,
sourceMetadata: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("v123"),
"X-Amz-Meta-Custom": []byte("value"),
},
expectVersionPath: false,
expectMetadataKeys: []string{
"X-Amz-Meta-Custom",
},
},
{
name: "NotConfiguredCleansMetadata",
versioningState: "",
sourceMetadata: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("v123"),
s3_constants.ExtDeleteMarkerKey: []byte("false"),
"X-Amz-Meta-Custom": []byte("value"),
},
expectVersionPath: false,
expectMetadataKeys: []string{
"X-Amz-Meta-Custom",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Test version creation decision using production function
shouldCreateVersion := shouldCreateVersionForCopy(tc.versioningState)
if shouldCreateVersion != tc.expectVersionPath {
t.Errorf("shouldCreateVersionForCopy(%q) = %v, expected %v",
tc.versioningState, shouldCreateVersion, tc.expectVersionPath)
}
// Test metadata cleanup using production function
metadata := make(map[string][]byte)
for k, v := range tc.sourceMetadata {
metadata[k] = v
}
if !shouldCreateVersion {
cleanupVersioningMetadata(metadata)
}
// Verify only expected keys remain
for _, expectedKey := range tc.expectMetadataKeys {
if _, exists := metadata[expectedKey]; !exists {
t.Errorf("Expected key %q to be present in metadata", expectedKey)
}
}
// Verify the count matches (no extra keys)
if len(metadata) != len(tc.expectMetadataKeys) {
t.Errorf("Expected %d metadata keys, got %d", len(tc.expectMetadataKeys), len(metadata))
}
})
}
}
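These are plain unit tests with no running server or filer dependency, so they should be runnable directly from the repository root with, for example, go test ./weed/s3api -run 'ShouldCreateVersionForCopy|CleanupVersioningMetadata|CopyVersioningIntegration'.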

167 weed/shell/command_volume_check_disk.go

@@ -37,6 +37,7 @@ type volumeCheckDisk struct {
verbose bool
applyChanges bool
syncDeletions bool
fixReadOnly bool
nonRepairThreshold float64
}
@@ -48,19 +49,27 @@ func (c *commandVolumeCheckDisk) Help() string {
return `check all replicated volumes to find and fix inconsistencies. It is optional and resource intensive.
How it works:
find all volumes that are replicated
for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count.
for the pair volume A and B
bi-directional sync (default): append entries in A and not in B to B, and entries in B and not in A to A
uni-directional sync (read-only repair): only sync from source to target without modifying source
for each writable volume ID, if there are more than 2 replicas, find one pair with the largest 2 in file count
for the pair volume A and B
append entries in A and not in B to B
append entries in B and not in A to A
optionally, for each non-writable volume replica A
if volume is not full
prune late volume entries not matching its index file
select a writable volume replica B
append missing entries from B into A
mark the volume as writable (healthy)
Options:
-slow: check all replicas even if file counts are the same
-v: verbose mode with detailed progress output
-volumeId: check only a specific volume ID (0 for all)
-apply: actually apply the fixes (default is simulation mode)
-force-readonly: also check and repair read-only volumes using uni-directional sync
-fixReadOnly: also check and repair read-only volumes using uni-directional sync
-syncDeleted: sync deletion records during repair
-nonRepairThreshold: maximum fraction of missing keys allowed for repair (default 0.3)
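For reference, a typical weed shell session with the new flag (flag names as defined in this diff; simulation mode is the default, -apply makes changes) might look like:
	volume.checkDisk -v -fixReadOnly
	volume.checkDisk -v -fixReadOnly -apply -syncDeleted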
@@ -80,7 +89,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
applyChanges := fsckCommand.Bool("apply", false, "apply the fix")
// TODO: remove this alias
applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
forceReadonly := fsckCommand.Bool("force-readonly", false, "apply the fix even on readonly volumes")
fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync deletion records during the fix")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair only when the fraction of missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
@@ -103,6 +112,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
verbose: *verbose,
applyChanges: *applyChanges,
syncDeletions: *syncDeletions,
fixReadOnly: *fixReadOnly,
nonRepairThreshold: *nonRepairThreshold,
}
@@ -123,24 +133,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
}
}
vcd.write("Pass #1 (writeable volumes)\n")
if err := vcd.checkWriteableVolumes(volumeReplicas); err != nil {
if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
return err
}
if *forceReadonly {
vcd.write("Pass #2 (read-only volumes)\n")
if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
return err
}
if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
return err
}
return nil
}
// checkWriteableVolumes fixes volume replicas which are not read-only.
func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
// pick 1 pairs of volume replica
// checkWritableVolumes fixes volume replicas which are not read-only.
func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
vcd.write("Pass #1 (writable volumes)\n")
for _, replicas := range volumeReplicas {
// filter readonly replica
var writableReplicas []*VolumeReplica
@@ -157,16 +163,14 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
})
for len(writableReplicas) >= 2 {
a, b := writableReplicas[0], writableReplicas[1]
if !vcd.slowMode {
shouldSkip, err := vcd.shouldSkipVolume(a, b)
if err != nil {
vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
continue
}
shouldSkip, err := vcd.shouldSkipVolume(a, b)
if err != nil {
vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
continue
}
if err := vcd.syncTwoReplicas(a, b, true); err != nil {
vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
@@ -183,9 +187,107 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
return nil
}
// checkReadOnlyVolumes fixes read-only volume replicas.
// makeVolumeWritable flags a volume as writable, by volume ID.
func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) error {
if !vcd.applyChanges {
return nil
}
err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(vr.location.dataNode), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, vsErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: vid,
})
return vsErr
})
if err != nil {
return err
}
vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id)
return nil
}
// makeVolumeReadonly flags a volume as read-only, by volume ID.
func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) error {
if !vcd.applyChanges {
return nil
}
err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(vr.location.dataNode), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, vsErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: vid,
})
return vsErr
})
if err != nil {
return err
}
vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id)
return nil
}
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)")
if !vcd.fixReadOnly {
return nil
}
vcd.write("Pass #2 (read-only volumes)\n")
for vid, replicas := range volumeReplicas {
var source *VolumeReplica = nil
roReplicas := []*VolumeReplica{}
for _, r := range replicas {
if r.info.ReadOnly {
roReplicas = append(roReplicas, r)
} else {
// we assume all writable replicas are identical by this point, after the checkWritableVolumes() pass.
source = r
}
}
if len(roReplicas) == 0 {
vcd.write("no read-only replicas for volume %d\n", vid)
continue
}
if source == nil {
vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid)
continue
}
// attempt to fix read-only replicas from the known good source
for _, r := range roReplicas {
// TODO: skip full readonly volumes.
skip, err := vcd.shouldSkipVolume(r, source)
if err != nil {
vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
continue
}
if skip {
continue
}
// make volume writable...
if err := vcd.makeVolumeWritable(vid, r); err != nil {
return err
}
// ...fix it...
// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
if err := vcd.syncTwoReplicas(source, r, false); err != nil {
vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
// ...or revert it back to read-only, if something went wrong.
if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
}
vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
return err
}
}
}
return nil
}
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
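The error path above follows a make-writable / sync / revert-on-failure pattern. Here is a standalone Go sketch of that idiom, with plain function values standing in for the gRPC-backed makeVolumeWritable, syncTwoReplicas, and makeVolumeReadonly:

package main

import (
	"errors"
	"fmt"
)

// repairReadOnlyReplica lifts the read-only flag, attempts the sync, and
// restores read-only if the sync fails, so a half-repaired replica is
// never left writable. The parameters are illustrative stand-ins for the
// methods in the diff above.
func repairReadOnlyReplica(markWritable, sync, markReadonly func() error) error {
	if err := markWritable(); err != nil {
		return err
	}
	if err := sync(); err != nil {
		if roErr := markReadonly(); roErr != nil {
			return fmt.Errorf("failed to restore read-only after sync error %v: %v", err, roErr)
		}
		return err
	}
	return nil
}

func main() {
	err := repairReadOnlyReplica(
		func() error { return nil },                       // mark writable: ok
		func() error { return errors.New("sync failed") }, // sync fails
		func() error { return nil },                       // revert to read-only: ok
	)
	fmt.Println(err) // sync failed
}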
@@ -260,6 +362,11 @@ func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool,
// Error Handling: Errors from eqVolumeFileCount are wrapped with context and propagated.
// The Do method logs these errors and continues processing to ensure other volumes are checked.
func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) {
if vcd.slowMode {
// never skip volumes in slow mode
return false, nil
}
pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix()
doSyncDeletedCount := false
if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
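With the slow-mode guard moved inside shouldSkipVolume, call sites such as checkWritableVolumes no longer branch on slowMode themselves (as seen in the earlier hunk). A small illustration of the simplified shape, with a file-count equality standing in for the real replica checks:

package main

import "fmt"

type checker struct{ slowMode bool }

// shouldSkip mirrors the refactor above: the slow-mode guard lives in the
// callee, so callers need no mode branching. The equality test is an
// illustrative stand-in for the real checks.
func (c *checker) shouldSkip(aFileCount, bFileCount uint64) bool {
	if c.slowMode {
		return false // never skip volumes in slow mode
	}
	return aFileCount == bFileCount
}

func main() {
	fmt.Println((&checker{slowMode: true}).shouldSkip(10, 10))  // false: slow mode checks everything
	fmt.Println((&checker{slowMode: false}).shouldSkip(10, 10)) // true: identical counts can be skipped
}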
