diff --git a/test/s3/distributed_lock/distributed_lock_cluster_test.go b/test/s3/distributed_lock/distributed_lock_cluster_test.go index d7054b8ae..6cf1c6a4d 100644 --- a/test/s3/distributed_lock/distributed_lock_cluster_test.go +++ b/test/s3/distributed_lock/distributed_lock_cluster_test.go @@ -21,7 +21,9 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -139,6 +141,7 @@ func startDistributedLockCluster(t *testing.T) *distributedLockCluster { require.NoError(t, cluster.waitForTCP(cluster.filerGRPCAddress(i), 30*time.Second), "wait for filer %d grpc\n%s", i, cluster.tailLog(fmt.Sprintf("filer%d.log", i))) } require.NoError(t, cluster.waitForFilerCount(2, 30*time.Second), "wait for filer group registration") + require.NoError(t, cluster.waitForLockRingConverged(30*time.Second), "wait for lock ring convergence") for i := 0; i < 2; i++ { require.NoError(t, cluster.startS3(i), "start s3 %d", i) @@ -411,6 +414,130 @@ func (c *distributedLockCluster) waitForFilerCount(expected int, timeout time.Du return fmt.Errorf("timed out waiting for %d filers in group %q", expected, c.filerGroup) } +// waitForLockRingConverged verifies that both filers have a consistent view of the +// lock ring by acquiring the same lock through each filer and checking mutual exclusion. +// This guards against the window between master seeing both filers and the filers +// actually receiving the LockRingUpdate broadcast (delayed by the stabilization timer). 
+//
+// A single arbitrary key could false-pass: if the key's real primary is filer0 and
+// filer0 has a stale ring (only sees itself), it still grants correctly because it IS
+// the primary. To catch the stale ring we must also test a key whose real primary is
+// filer1. So we generate one test key per primary filer using the same consistent-hash
+// ring as production, and require mutual exclusion for all of them.
+func (c *distributedLockCluster) waitForLockRingConverged(timeout time.Duration) error {
+	deadline := time.Now().Add(timeout)
+
+	owners := make([]pb.ServerAddress, 0, len(c.filerPorts))
+	for i := range c.filerPorts {
+		owners = append(owners, c.filerServerAddress(i))
+	}
+	ring := lock_manager.NewHashRing(lock_manager.DefaultVnodeCount)
+	ring.SetServers(owners)
+
+	var lastErr error // why the latest round failed; reported if we time out
+	for attempt := 0; time.Now().Before(deadline); attempt++ {
+		// Key names embed the attempt number; fewer keys than owners would leave a primary untested.
+		testKeys := c.convergenceKeysPerPrimary(ring, owners, attempt)
+		allConverged := len(testKeys) == len(owners)
+		if !allConverged {
+			lastErr = fmt.Errorf("found test keys for only %d of %d primaries", len(testKeys), len(owners))
+		}
+		for i := 0; allConverged && i < len(testKeys); i++ {
+			allConverged, lastErr = c.checkLockMutualExclusion(testKeys[i])
+		}
+		if allConverged {
+			return nil
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+	if lastErr != nil {
+		return fmt.Errorf("lock ring did not converge within %v: %w", timeout, lastErr)
+	}
+	return fmt.Errorf("lock ring did not converge: both filers independently grant the same lock")
+}
+
+// convergenceKeysPerPrimary returns one lock key per distinct primary filer,
+// using the same consistent-hash ring as production routing.
+func (c *distributedLockCluster) convergenceKeysPerPrimary(ring *lock_manager.HashRing, owners []pb.ServerAddress, attempt int) []string {
+	// Keys are collected in discovery order so the result is deterministic (map iteration is not).
+	seen := make(map[pb.ServerAddress]struct{}, len(owners))
+	keys := make([]string, 0, len(owners))
+	for i := 0; len(keys) < len(owners) && i < 1024; i++ {
+		candidate := fmt.Sprintf("convergence-%d-%d", attempt, i)
+		primary := ring.GetPrimary(candidate)
+		if _, dup := seen[primary]; dup {
+			continue
+		}
+		seen[primary] = struct{}{}
+		keys = append(keys, candidate)
+	}
+	return keys
+}
+
+// checkLockMutualExclusion acquires a lock via filer0, then tries the same lock via filer1.
+// Returns true if mutual exclusion holds (second attempt is denied as "lock already owned").
+func (c *distributedLockCluster) checkLockMutualExclusion(testKey string) (bool, error) {
+	conn0, err := grpc.NewClient(c.filerGRPCAddress(0), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return false, fmt.Errorf("dial filer0: %w", err)
+	}
+	defer conn0.Close()
+
+	conn1, err := grpc.NewClient(c.filerGRPCAddress(1), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return false, fmt.Errorf("dial filer1: %w", err)
+	}
+	defer conn1.Close()
+
+	client0 := filer_pb.NewSeaweedFilerClient(conn0)
+	client1 := filer_pb.NewSeaweedFilerClient(conn1)
+
+	// release unlocks under its own deadline so a wedged filer cannot hang the test (best effort).
+	release := func(client filer_pb.SeaweedFilerClient, renewToken string) {
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+		_, _ = client.DistributedUnlock(ctx, &filer_pb.UnlockRequest{
+			Name:       testKey,
+			RenewToken: renewToken,
+		})
+	}
+
+	// Acquire lock via filer0
+	ctx0, cancel0 := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel0()
+	resp0, err := client0.DistributedLock(ctx0, &filer_pb.LockRequest{
+		Name:          testKey,
+		SecondsToLock: 5,
+		Owner:         "convergence-filer0",
+	})
+	if err != nil || resp0.RenewToken == "" {
+		return false, fmt.Errorf("filer0 lock failed: err=%v resp=%v", err, resp0)
+	}
+	defer release(client0, resp0.RenewToken)
+
+	// Try the same lock via filer1 - should be denied
+	ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel1()
+	resp1, err := client1.DistributedLock(ctx1, &filer_pb.LockRequest{
+		Name:          testKey,
+		SecondsToLock: 5,
+		Owner:         "convergence-filer1",
+	})
+	if err != nil {
+		return false, fmt.Errorf("filer1 lock request failed: %w", err)
+	}
+	if resp1.RenewToken != "" {
+		// Both filers granted the lock - ring not converged. Release the second lock.
+		release(client1, resp1.RenewToken)
+		return false, nil
+	}
+	// Any other denial is surfaced as an error instead of being silently treated as "not converged".
+	if !strings.Contains(resp1.Error, "lock already owned") {
+		return false, fmt.Errorf("filer1 denied lock %q unexpectedly: %s", testKey, resp1.Error)
+	}
+	return true, nil
+}
+
 func (c *distributedLockCluster) waitForHTTP(url string, timeout time.Duration) error {
 	client := &net.Dialer{Timeout: time.Second}
 	httpClient := &httpClientWithDialer{dialer: client}
diff --git a/test/s3/distributed_lock/distributed_lock_test.go b/test/s3/distributed_lock/distributed_lock_test.go
index 63adec185..97ebda1b1 100644
--- a/test/s3/distributed_lock/distributed_lock_test.go
+++ b/test/s3/distributed_lock/distributed_lock_test.go
@@ -6,7 +6,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"sort"
 	"strings"
 	"sync"
 	"testing"
@@ -15,9 +14,9 @@ import (
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/aws/smithy-go"
+	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -149,14 +148,15 @@ func (c *distributedLockCluster) findLockOwnerKeys(bucket, prefix string) map[pb
 	for i := range c.filerPorts {
 		owners = append(owners, c.filerServerAddress(i))
 	}
-	sort.Slice(owners, func(i, j int) bool {
-		return owners[i] < owners[j]
-	})
+
+	ring := 
lock_manager.NewHashRing(lock_manager.DefaultVnodeCount) + ring.SetServers(owners) keysByOwner := make(map[pb.ServerAddress]string, len(owners)) for i := 0; i < 1024 && len(keysByOwner) < len(owners); i++ { key := fmt.Sprintf("%s-%03d.txt", prefix, i) - lockOwner := ownerForObjectLock(bucket, key, owners) + lockKey := fmt.Sprintf("s3.object.write:/buckets/%s/%s", bucket, s3_constants.NormalizeObjectKey(key)) + lockOwner := ring.GetPrimary(lockKey) if _, exists := keysByOwner[lockOwner]; !exists { keysByOwner[lockOwner] = key } @@ -164,15 +164,6 @@ func (c *distributedLockCluster) findLockOwnerKeys(bucket, prefix string) map[pb return keysByOwner } -func ownerForObjectLock(bucket, object string, owners []pb.ServerAddress) pb.ServerAddress { - lockKey := fmt.Sprintf("s3.object.write:/buckets/%s/%s", bucket, s3_constants.NormalizeObjectKey(object)) - hash := util.HashStringToLong(lockKey) - if hash < 0 { - hash = -hash - } - return owners[hash%int64(len(owners))] -} - func lockOwnerLabel(owner pb.ServerAddress) string { replacer := strings.NewReplacer(":", "_", ".", "_") return "owner_" + replacer.Replace(string(owner))