From 98714b9f70b8271974bf0c8649ce19115a561ce1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 13:11:04 -0700 Subject: [PATCH] fix(test): address flaky S3 distributed lock integration test (#8888) * fix(test): address flaky S3 distributed lock integration test Two root causes: 1. Lock ring convergence race: After waitForFilerCount(2) confirms the master sees both filers, there's a window where filer0's lock ring still only contains itself (master's LockRingUpdate broadcast is delayed by the 1s stabilization timer). During this window filer0 considers itself primary for ALL keys, so both filers can independently grant the same lock. Fix: Add waitForLockRingConverged() that acquires the same lock through both filers and verifies mutual exclusion before proceeding. 2. Hash function mismatch: ownerForObjectLock used util.HashStringToLong (MD5 + modulo) to predict lock owners, but the production DLM uses CRC32 consistent hashing via HashRing. This meant the test could pick keys that route to the same filer, not exercising the cross-filer coordination it intended to test. Fix: Use lock_manager.NewHashRing + GetPrimary() to match production routing exactly. * fix(test): verify lock denial reason in convergence check Ensure the convergence check only returns true when the second lock attempt is denied specifically because the lock is already owned, avoiding false positives from transient errors. * fix(test): check one key per primary filer in convergence wait A single arbitrary key can false-pass: if its real primary is the filer with the stale ring, mutual exclusion holds trivially because that filer IS the correct primary. Generate one test key per distinct primary using the same consistent-hash ring as production, so a stale ring on any filer is caught deterministically. --- .../distributed_lock_cluster_test.go | 127 ++++++++++++++++++ .../distributed_lock/distributed_lock_test.go | 21 +-- 2 files changed, 133 insertions(+), 15 deletions(-) diff --git a/test/s3/distributed_lock/distributed_lock_cluster_test.go b/test/s3/distributed_lock/distributed_lock_cluster_test.go index d7054b8ae..6cf1c6a4d 100644 --- a/test/s3/distributed_lock/distributed_lock_cluster_test.go +++ b/test/s3/distributed_lock/distributed_lock_cluster_test.go @@ -21,7 +21,9 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -139,6 +141,7 @@ func startDistributedLockCluster(t *testing.T) *distributedLockCluster { require.NoError(t, cluster.waitForTCP(cluster.filerGRPCAddress(i), 30*time.Second), "wait for filer %d grpc\n%s", i, cluster.tailLog(fmt.Sprintf("filer%d.log", i))) } require.NoError(t, cluster.waitForFilerCount(2, 30*time.Second), "wait for filer group registration") + require.NoError(t, cluster.waitForLockRingConverged(30*time.Second), "wait for lock ring convergence") for i := 0; i < 2; i++ { require.NoError(t, cluster.startS3(i), "start s3 %d", i) @@ -411,6 +414,130 @@ func (c *distributedLockCluster) waitForFilerCount(expected int, timeout time.Du return fmt.Errorf("timed out waiting for %d filers in group %q", expected, c.filerGroup) } +// waitForLockRingConverged verifies that both filers have a consistent view of the +// lock ring by acquiring the same lock through each filer and checking mutual exclusion. +// This guards against the window between master seeing both filers and the filers +// actually receiving the LockRingUpdate broadcast (delayed by the stabilization timer). +// +// A single arbitrary key could false-pass: if the key's real primary is filer0 and +// filer0 has a stale ring (only sees itself), it still grants correctly because it IS +// the primary. To catch the stale ring we must also test a key whose real primary is +// filer1. So we generate one test key per primary filer using the same consistent-hash +// ring as production, and require mutual exclusion for all of them. +func (c *distributedLockCluster) waitForLockRingConverged(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + + owners := make([]pb.ServerAddress, 0, len(c.filerPorts)) + for i := range c.filerPorts { + owners = append(owners, c.filerServerAddress(i)) + } + ring := lock_manager.NewHashRing(lock_manager.DefaultVnodeCount) + ring.SetServers(owners) + + attempt := 0 + for time.Now().Before(deadline) { + // Generate one unique test key per primary filer + testKeys := c.convergenceKeysPerPrimary(ring, owners, attempt) + attempt++ + + allConverged := true + for _, key := range testKeys { + converged, _ := c.checkLockMutualExclusion(key) + if !converged { + allConverged = false + break + } + } + if allConverged { + return nil + } + time.Sleep(500 * time.Millisecond) + } + return fmt.Errorf("lock ring did not converge: both filers independently grant the same lock") +} + +// convergenceKeysPerPrimary returns one lock key per distinct primary filer, +// using the same consistent-hash ring as production routing. +func (c *distributedLockCluster) convergenceKeysPerPrimary(ring *lock_manager.HashRing, owners []pb.ServerAddress, attempt int) []string { + keysByOwner := make(map[pb.ServerAddress]string, len(owners)) + for i := 0; len(keysByOwner) < len(owners) && i < 1024; i++ { + candidate := fmt.Sprintf("convergence-%d-%d", attempt, i) + primary := ring.GetPrimary(candidate) + if _, exists := keysByOwner[primary]; !exists { + keysByOwner[primary] = candidate + } + } + keys := make([]string, 0, len(keysByOwner)) + for _, k := range keysByOwner { + keys = append(keys, k) + } + return keys +} + +// checkLockMutualExclusion acquires a lock via filer0, then tries the same lock via filer1. +// Returns true if mutual exclusion holds (second attempt is denied). +func (c *distributedLockCluster) checkLockMutualExclusion(testKey string) (bool, error) { + conn0, err := grpc.NewClient(c.filerGRPCAddress(0), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return false, err + } + defer conn0.Close() + + conn1, err := grpc.NewClient(c.filerGRPCAddress(1), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return false, err + } + defer conn1.Close() + + client0 := filer_pb.NewSeaweedFilerClient(conn0) + client1 := filer_pb.NewSeaweedFilerClient(conn1) + + // Acquire lock via filer0 + ctx0, cancel0 := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel0() + resp0, err := client0.DistributedLock(ctx0, &filer_pb.LockRequest{ + Name: testKey, + SecondsToLock: 5, + Owner: "convergence-filer0", + }) + if err != nil || resp0.RenewToken == "" { + return false, fmt.Errorf("filer0 lock failed: err=%v resp=%v", err, resp0) + } + defer func() { + // Always release the lock we acquired + client0.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ + Name: testKey, + RenewToken: resp0.RenewToken, + }) + }() + + // Try the same lock via filer1 - should be denied + ctx1, cancel1 := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel1() + resp1, err := client1.DistributedLock(ctx1, &filer_pb.LockRequest{ + Name: testKey, + SecondsToLock: 5, + Owner: "convergence-filer1", + }) + if err != nil { + return false, err + } + if resp1.RenewToken != "" { + // Both filers granted the lock - ring not converged. Release the second lock. + client1.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ + Name: testKey, + RenewToken: resp1.RenewToken, + }) + return false, nil + } + // Verify the denial is specifically because the lock is already held, + // not due to a transient error that might give a false positive. + if !strings.Contains(resp1.Error, "lock already owned") { + return false, nil + } + return true, nil +} + func (c *distributedLockCluster) waitForHTTP(url string, timeout time.Duration) error { client := &net.Dialer{Timeout: time.Second} httpClient := &httpClientWithDialer{dialer: client} diff --git a/test/s3/distributed_lock/distributed_lock_test.go b/test/s3/distributed_lock/distributed_lock_test.go index 63adec185..97ebda1b1 100644 --- a/test/s3/distributed_lock/distributed_lock_test.go +++ b/test/s3/distributed_lock/distributed_lock_test.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "sort" "strings" "sync" "testing" @@ -15,9 +14,9 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/smithy-go" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -149,14 +148,15 @@ func (c *distributedLockCluster) findLockOwnerKeys(bucket, prefix string) map[pb for i := range c.filerPorts { owners = append(owners, c.filerServerAddress(i)) } - sort.Slice(owners, func(i, j int) bool { - return owners[i] < owners[j] - }) + + ring := lock_manager.NewHashRing(lock_manager.DefaultVnodeCount) + ring.SetServers(owners) keysByOwner := make(map[pb.ServerAddress]string, len(owners)) for i := 0; i < 1024 && len(keysByOwner) < len(owners); i++ { key := fmt.Sprintf("%s-%03d.txt", prefix, i) - lockOwner := ownerForObjectLock(bucket, key, owners) + lockKey := fmt.Sprintf("s3.object.write:/buckets/%s/%s", bucket, s3_constants.NormalizeObjectKey(key)) + lockOwner := ring.GetPrimary(lockKey) if _, exists := keysByOwner[lockOwner]; !exists { keysByOwner[lockOwner] = key } @@ -164,15 +164,6 @@ func (c *distributedLockCluster) findLockOwnerKeys(bucket, prefix string) map[pb return keysByOwner } -func ownerForObjectLock(bucket, object string, owners []pb.ServerAddress) pb.ServerAddress { - lockKey := fmt.Sprintf("s3.object.write:/buckets/%s/%s", bucket, s3_constants.NormalizeObjectKey(object)) - hash := util.HashStringToLong(lockKey) - if hash < 0 { - hash = -hash - } - return owners[hash%int64(len(owners))] -} - func lockOwnerLabel(owner pb.ServerAddress) string { replacer := strings.NewReplacer(":", "_", ".", "_") return "owner_" + replacer.Replace(string(owner))