dlm: resilient distributed locks via consistent hashing + backup replication (#8860)
* dlm: replace modulo hashing with consistent hash ring

Introduce HashRing with virtual nodes (CRC32-based consistent hashing) to replace the modulo-based hashKeyToServer. When a filer node is removed, only keys that hashed to that node are remapped to the next server on the ring, leaving all other mappings stable. This is the foundation for backup replication: the successor on the ring is always the natural takeover node.

* dlm: add Generation and IsBackup fields to Lock

Lock now carries IsBackup (whether this node holds the lock as a backup replica) and Generation (a monotonic fencing token that increments on each fresh acquisition, stays the same on renewal). Add helper methods: AllLocks, PromoteLock, DemoteLock, InsertBackupLock, RemoveLock, GetLock.

* dlm: add ReplicateLock RPC and generation/is_backup proto fields

Add generation field to LockResponse for fencing tokens. Add generation and is_backup fields to Lock message. Add ReplicateLock RPC for primary-to-backup lock replication. Add ReplicateLockRequest/ReplicateLockResponse messages.

* dlm: add async backup replication to DistributedLockManager

Route lock/unlock via consistent hash ring's GetPrimaryAndBackup(). After a successful lock or unlock on the primary, asynchronously replicate the operation to the backup server via ReplicateFunc callback. Single-server deployments skip replication.

* dlm: add ReplicateLock handler and backup-aware topology changes

Add ReplicateLock gRPC handler for primary-to-backup replication. Revise OnDlmChangeSnapshot to handle three cases on topology change:
- Promote backup locks when this node becomes primary
- Demote primary locks when this node becomes backup
- Transfer locks when this node is neither primary nor backup
Wire up SetupDlmReplication during filer server initialization.

* dlm: expose generation fencing token in lock client

LiveLock now captures the generation from LockResponse and exposes it via Generation() method. Consumers can use this as a fencing token to detect stale lock holders.

* dlm: update empty folder cleaner to use consistent hash ring

Replace local modulo-based hashKeyToServer with LockRing.GetPrimary() which uses the shared consistent hash ring for folder ownership.

* dlm: add unit tests for consistent hash ring

Test basic operations, consistency on server removal (only keys from removed server move), backup-is-successor property (backup becomes new primary when primary is removed), and key distribution balance.

* dlm: add integration tests for lock replication failure scenarios

Test cases:
- Primary crash with backup promotion (backup has valid token)
- Backup crash with primary continuing
- Both primary and backup crash (lock lost, re-acquirable)
- Rolling restart across all nodes
- Generation fencing token increments on new acquisition
- Replication failure (primary still works independently)
- Unlock replicates deletion to backup
- Lock survives server addition (topology change)
- Consistent hashing minimal disruption (only removed server's keys move)

* dlm: address PR review findings

1. Causal replication ordering: Add per-lock sequence number (Seq) that increments on every mutation. Backup rejects incoming mutations with seq <= current seq, preventing stale async replications from overwriting newer state. Unlock replication also carries seq and is rejected if stale.
2. Demote-after-handoff: OnDlmChangeSnapshot now transfers the lock to the new primary first and only demotes to backup after a successful TransferLocks RPC. If the transfer fails, the lock stays as primary on this node.
3. SetSnapshot candidateServers leak: Replace the candidateServers map entirely instead of appending, so removed servers don't linger.
4. TransferLocks preserves Generation and Seq: InsertLock now accepts generation and seq parameters. After accepting a transferred lock, the receiving node re-replicates to its backup.
5. Rolling restart test: Add re-replication step after promotion and assert survivedCount > 0. Add TestDLM_StaleReplicationRejected.
6. Mixed-version upgrade note: Add comment on HashRing documenting that all filer nodes must be upgraded together.

* dlm: serve renewals locally during transfer window on node join

When a new node joins and steals hash ranges from surviving nodes, there's a window between ring update and lock transfer where the client gets redirected to a node that doesn't have the lock yet.

Fix: if the ring says primary != self but we still hold the lock locally (non-backup, matching token), serve the renewal/unlock here rather than redirecting. The lock will be transferred by OnDlmChangeSnapshot, and subsequent requests will go to the new primary once the transfer completes.

Add tests:
- TestDLM_NodeDropAndJoin_OwnershipDisruption: measures disruption when a node drops and a new one joins (14/100 surviving-node locks disrupted, all handled by transfer logic)
- TestDLM_RenewalDuringTransferWindow: verifies renewal succeeds on old primary during the transfer window

* dlm: master-managed lock ring with stabilization batching

The master now owns the lock ring membership. Instead of filers independently reacting to individual ClusterNodeUpdate add/remove events, the master:
1. Tracks filer membership in LockRingManager
2. Batches rapid changes with a 1-second stabilization timer (e.g., a node drop + join within 1 second → single ring update)
3. Broadcasts the complete ring snapshot atomically via the new LockRingUpdate message in KeepConnectedResponse

Filers receive the ring as a complete snapshot and apply it via SetSnapshot, ensuring all filers converge to the same ring state without intermediate churn.

This eliminates the double-churn problem where a rapid drop+join would fire two separate ring mutations, each triggering lock transfers and disrupting ownership on surviving nodes.

* dlm: track ring version, reject stale updates, remove dead code

SetSnapshot now takes a version parameter from the master. Stale updates (version < current) are rejected, preventing reordered messages from overwriting a newer ring state. Version 0 is always accepted for bootstrap.

Remove AddServer/RemoveServer from LockRing; the ring is now exclusively managed by the master via SetSnapshot. Remove the candidateServers map that was only used by those methods.

* dlm: fix SelectLocks data race, advance generation on backup insert

- SelectLocks: change RLock to Lock since the function deletes map entries, which is a write operation and causes a data race under RLock.
- InsertBackupLock: advance nextGeneration to at least the incoming generation so that after failover promotion, new lock acquisitions get a generation strictly greater than any replicated lock.
- Bump replication failure log from V(1) to Warningf for production visibility.

* dlm: fix SetSnapshot race, test reliability, timer edge cases

- SetSnapshot: hold LockRing lock through both version update and Ring.SetServers() so they're atomic. Prevents a concurrent caller from seeing the new version but applying stale servers.
- Transfer window test: search for a key that actually moves primary when filer4 joins, instead of relying on a fixed key that may not.
- renewLock redirect: pass the existing token to the new primary instead of empty string, so redirected renewals work correctly.
- scheduleBroadcast: check timer.Stop() return value. If the timer already fired, the callback picks up latest state.
- FlushPending: only broadcast if timer.Stop() returns true (timer was still pending). If false, the callback is already running.
- Fix test comment: "idempotent" → "accepted, state-changing".

* dlm: use wall-clock nanoseconds for lock ring version

The lock ring version was an in-memory counter that reset to 0 on master restart. A filer that had seen version 5 would reject version 1 from the restarted master.

Fix: use time.Now().UnixNano() as the version. This survives master restarts without persistence: the restarted master produces a version greater than any pre-restart value.

* dlm: treat expired lock owners as missing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* dlm: reject stale lock transfers

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* dlm: order replication by generation

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* dlm: bootstrap lock ring on reconnect

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

cautious-dinosaur
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 2955 additions and 612 deletions
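The review-findings entry in the commit message says the backup rejects replicated mutations whose seq is not newer than what it already holds, and a later entry orders replication by generation. A minimal sketch of that rule follows, assuming a lexicographic comparison on (generation, seq); the actual comparison inside InsertBackupLock and RemoveBackupLockIfSeq is not shown in this diff, and `backupStore`, `replica`, `newer`, and `apply` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// replica is the per-key state a backup keeps (hypothetical shape).
type replica struct {
	token      string
	generation int64
	seq        int64
}

// backupStore is a hypothetical stand-in for the backup side of replication.
type backupStore struct {
	mu    sync.Mutex
	locks map[string]replica
}

func newBackupStore() *backupStore {
	return &backupStore{locks: make(map[string]replica)}
}

// newer reports whether (generation, seq) is strictly ahead of cur.
// Assumption: generation is compared first, then seq.
func newer(cur replica, generation, seq int64) bool {
	if generation != cur.generation {
		return generation > cur.generation
	}
	return seq > cur.seq
}

// apply stores an upsert, or deletes on unlock, unless the mutation is
// stale. It returns false when the mutation was rejected.
// Limitation of this sketch: no tombstone is kept after an unlock, so a
// delayed older insert arriving afterwards would be accepted.
func (b *backupStore) apply(key, token string, generation, seq int64, isUnlock bool) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if cur, ok := b.locks[key]; ok && !newer(cur, generation, seq) {
		return false
	}
	if isUnlock {
		delete(b.locks, key)
	} else {
		b.locks[key] = replica{token: token, generation: generation, seq: seq}
	}
	return true
}

func main() {
	b := newBackupStore()
	fmt.Println("seq 2 accepted:", b.apply("lock-a", "tok-2", 1, 2, false))
	// An async replication for seq 1 arrives late and must not win.
	fmt.Println("seq 1 accepted:", b.apply("lock-a", "tok-1", 1, 1, false))
}
```

The same counter pair can serve as a fencing token on the consumer side: a resource that remembers the highest generation it has seen can refuse writes from a holder presenting a lower one.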
 17  other/java/client/src/main/proto/filer.proto
 12  weed/cluster/lock_client.go
158  weed/cluster/lock_manager/distributed_lock_manager.go
710  weed/cluster/lock_manager/distributed_lock_manager_test.go
198  weed/cluster/lock_manager/hash_ring.go
173  weed/cluster/lock_manager/hash_ring_test.go
200  weed/cluster/lock_manager/lock_manager.go
 86  weed/cluster/lock_manager/lock_manager_test.go
118  weed/cluster/lock_manager/lock_ring.go
 99  weed/cluster/lock_manager/lock_ring_test.go
164  weed/cluster/lock_ring_manager.go
229  weed/cluster/lock_ring_manager_test.go
 21  weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
 32  weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
 19  weed/filer/filer.go
 17  weed/pb/filer.proto
309  weed/pb/filer_pb/filer.pb.go
 38  weed/pb/filer_pb/filer_grpc.pb.go
 11  weed/pb/master.proto
694  weed/pb/master_pb/master.pb.go
134  weed/server/filer_grpc_server_dlm.go
 36  weed/server/filer_grpc_server_dlm_test.go
  1  weed/server/filer_server.go
 27  weed/server/master_grpc_server.go
 37  weed/server/master_grpc_server_test.go
  4  weed/server/master_server.go
 23  weed/wdclient/masterclient.go
@@ -0,0 +1,710 @@
package lock_manager

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// testCluster simulates a cluster of filer nodes with DLMs.
// It wires up ReplicateFn so that replication calls arrive at the
// correct peer's DLM, enabling end-to-end backup testing without gRPC.
type testCluster struct {
	mu    sync.Mutex
	nodes map[pb.ServerAddress]*DistributedLockManager
}

func newTestCluster(hosts ...pb.ServerAddress) *testCluster {
	c := &testCluster{nodes: make(map[pb.ServerAddress]*DistributedLockManager)}
	servers := make([]pb.ServerAddress, len(hosts))
	copy(servers, hosts)

	for _, host := range hosts {
		dlm := NewDistributedLockManager(host)
		dlm.LockRing.SetSnapshot(servers, 0)
		c.nodes[host] = dlm
	}

	// Wire up replication: each node's ReplicateFn calls the backup's DLM directly
	for _, dlm := range c.nodes {
		d := dlm // capture
		d.ReplicateFn = func(server pb.ServerAddress, key string, expiredAtNs int64, token string, owner string, generation int64, seq int64, isUnlock bool) {
			c.mu.Lock()
			target, ok := c.nodes[server]
			c.mu.Unlock()
			if !ok {
				return // server is down
			}
			if isUnlock {
				target.RemoveBackupLockIfSeq(key, generation, seq)
			} else {
				target.InsertBackupLock(key, expiredAtNs, token, owner, generation, seq)
			}
		}
	}

	return c
}

func (c *testCluster) removeNode(host pb.ServerAddress) {
	c.mu.Lock()
	delete(c.nodes, host)
	c.mu.Unlock()

	// Update all remaining nodes' rings
	remaining := c.getServers()
	for _, dlm := range c.getNodes() {
		dlm.LockRing.SetSnapshot(remaining, 0)
	}
}

func (c *testCluster) addNode(host pb.ServerAddress) {
	c.mu.Lock()
	dlm := NewDistributedLockManager(host)
	c.nodes[host] = dlm
	c.mu.Unlock()

	// Wire up replication
	dlm.ReplicateFn = func(server pb.ServerAddress, key string, expiredAtNs int64, token string, owner string, generation int64, seq int64, isUnlock bool) {
		c.mu.Lock()
		target, ok := c.nodes[server]
		c.mu.Unlock()
		if !ok {
			return
		}
		if isUnlock {
			target.RemoveBackupLockIfSeq(key, generation, seq)
		} else {
			target.InsertBackupLock(key, expiredAtNs, token, owner, generation, seq)
		}
	}

	servers := c.getServers()
	for _, n := range c.getNodes() {
		n.LockRing.SetSnapshot(servers, 0)
	}
}

func (c *testCluster) getNodes() map[pb.ServerAddress]*DistributedLockManager {
	c.mu.Lock()
	defer c.mu.Unlock()
	cp := make(map[pb.ServerAddress]*DistributedLockManager, len(c.nodes))
	for k, v := range c.nodes {
		cp[k] = v
	}
	return cp
}

func (c *testCluster) getServers() []pb.ServerAddress {
	c.mu.Lock()
	defer c.mu.Unlock()
	var servers []pb.ServerAddress
	for s := range c.nodes {
		servers = append(servers, s)
	}
	return servers
}

func (c *testCluster) get(host pb.ServerAddress) *DistributedLockManager {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.nodes[host]
}

// acquireLock tries to acquire a lock on the correct primary node.
// It follows redirects (movedTo) like a real client would.
func (c *testCluster) acquireLock(key, owner string, ttl time.Duration) (renewToken string, generation int64, primaryHost pb.ServerAddress, err error) {
	// Try any node first (simulates client connecting to seed filer)
	for _, dlm := range c.getNodes() {
		expiry := time.Now().Add(ttl).UnixNano()
		var movedTo pb.ServerAddress
		var lockErr error
		_, renewToken, generation, movedTo, lockErr = dlm.LockWithTimeout(key, expiry, "", owner)
		if movedTo != "" && movedTo != dlm.Host {
			// Follow redirect
			target := c.get(movedTo)
			if target == nil {
				err = fmt.Errorf("primary %s is down", movedTo)
				return
			}
			_, renewToken, generation, _, lockErr = target.LockWithTimeout(key, expiry, "", owner)
			if lockErr != nil {
				err = lockErr
				return
			}
			primaryHost = movedTo
			// Wait briefly for async replication to complete
			time.Sleep(10 * time.Millisecond)
			return
		}
		if lockErr != nil {
			err = lockErr
			return
		}
		primaryHost = dlm.Host
		time.Sleep(10 * time.Millisecond)
		return
	}
	err = fmt.Errorf("no nodes available")
	return
}

// renewLock renews a lock on the primary node
func (c *testCluster) renewLock(key, owner, token string, ttl time.Duration, primaryHost pb.ServerAddress) (newToken string, generation int64, err error) {
	target := c.get(primaryHost)
	if target == nil {
		err = fmt.Errorf("primary %s is down", primaryHost)
		return
	}
	expiry := time.Now().Add(ttl).UnixNano()
	var movedTo pb.ServerAddress
	var lockErr error
	_, newToken, generation, movedTo, lockErr = target.LockWithTimeout(key, expiry, token, owner)
	if movedTo != "" && movedTo != primaryHost {
		target = c.get(movedTo)
		if target == nil {
			err = fmt.Errorf("new primary %s is down", movedTo)
			return
		}
		// Pass the existing token so the redirected renewal can match
		// if the lock was already transferred to the new primary.
		_, newToken, generation, _, lockErr = target.LockWithTimeout(key, expiry, token, owner)
	}
	err = lockErr
	time.Sleep(10 * time.Millisecond)
	return
}

// --- Test Cases ---

func TestDLM_PrimaryCrash_BackupPromotes(t *testing.T) {
	// Scenario: Lock is acquired, primary crashes, backup should have the lock
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	key := "test-lock-primary-crash"
	renewToken, _, primaryHost, err := cluster.acquireLock(key, "owner1", 30*time.Second)
	require.NoError(t, err)
	require.NotEmpty(t, renewToken)

	// Find the backup for this key
	_, backup := cluster.get(primaryHost).LockRing.GetPrimaryAndBackup(key)
	require.NotEmpty(t, backup, "should have a backup server")

	// Verify backup has the lock
	backupDlm := cluster.get(backup)
	backupLock, found := backupDlm.GetLock(key)
	require.True(t, found, "backup should have the lock")
	assert.True(t, backupLock.IsBackup, "lock on backup should be marked as backup")
	assert.Equal(t, renewToken, backupLock.Token, "backup should have the same token")

	// Crash the primary
	cluster.removeNode(primaryHost)

	// Simulate topology change: promote backup locks
	for _, dlm := range cluster.getNodes() {
		locks := dlm.AllLocks()
		for _, lock := range locks {
			newPrimary, _ := dlm.LockRing.GetPrimaryAndBackup(lock.Key)
			if newPrimary == dlm.Host && lock.IsBackup {
				dlm.PromoteLock(lock.Key)
			}
		}
	}

	// The backup should now be the primary
	newPrimary := backupDlm.LockRing.GetPrimary(key)
	assert.Equal(t, backup, newPrimary, "backup should be the new primary")

	// The promoted lock should work: verify it's no longer a backup
	promotedLock, found := backupDlm.GetLock(key)
	require.True(t, found, "lock should still exist after promotion")
	assert.False(t, promotedLock.IsBackup, "lock should be promoted to primary")

	// Client should be able to renew with the same token on the new primary
	newToken, _, err := cluster.renewLock(key, "owner1", renewToken, 30*time.Second, backup)
	require.NoError(t, err)
	assert.NotEmpty(t, newToken, "renewal on new primary should succeed")
}

func TestDLM_BackupCrash_PrimaryContinues(t *testing.T) {
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	key := "test-lock-backup-crash"
	renewToken, _, primaryHost, err := cluster.acquireLock(key, "owner1", 30*time.Second)
	require.NoError(t, err)

	_, backup := cluster.get(primaryHost).LockRing.GetPrimaryAndBackup(key)

	// Crash the backup
	cluster.removeNode(backup)

	// Primary should still work: renew the lock
	newToken, _, err := cluster.renewLock(key, "owner1", renewToken, 30*time.Second, primaryHost)
	require.NoError(t, err)
	assert.NotEmpty(t, newToken, "primary should continue working after backup crash")

	// Verify primary is still the primary for this key
	newPrimary := cluster.get(primaryHost).LockRing.GetPrimary(key)
	assert.Equal(t, primaryHost, newPrimary)
}

func TestDLM_BothPrimaryAndBackupCrash(t *testing.T) {
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	key := "test-lock-both-crash"
	_, _, primaryHost, err := cluster.acquireLock(key, "owner1", 30*time.Second)
	require.NoError(t, err)

	_, backup := cluster.get(primaryHost).LockRing.GetPrimaryAndBackup(key)

	// Crash both
	cluster.removeNode(primaryHost)
	cluster.removeNode(backup)

	// The lock is lost; the surviving node should be able to acquire it fresh
	newToken, _, _, err := cluster.acquireLock(key, "owner2", 30*time.Second)
	require.NoError(t, err)
	assert.NotEmpty(t, newToken, "new owner should acquire lock after both crash")
}

func TestDLM_RollingRestart(t *testing.T) {
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	// Acquire multiple locks
	type lockState struct {
		key, owner, token string
		generation        int64
		primary           pb.ServerAddress
	}
	locks := make([]lockState, 5)
	for i := range locks {
		key := fmt.Sprintf("rolling-lock-%d", i)
		token, gen, primary, err := cluster.acquireLock(key, fmt.Sprintf("owner-%d", i), 30*time.Second)
		require.NoError(t, err)
		locks[i] = lockState{key: key, owner: fmt.Sprintf("owner-%d", i), token: token, generation: gen, primary: primary}
	}

	// Rolling restart: remove and re-add each node one at a time.
	// After removing a node, promote backups and re-replicate to new backups
	// to maintain the invariant that each lock has a backup copy.
	for _, host := range hosts {
		cluster.removeNode(host)

		// Simulate full OnDlmChangeSnapshot: promote backups and re-replicate
		for _, dlm := range cluster.getNodes() {
			for _, lock := range dlm.AllLocks() {
				newPrimary, _ := dlm.LockRing.GetPrimaryAndBackup(lock.Key)
				if newPrimary == dlm.Host && lock.IsBackup {
					dlm.PromoteLock(lock.Key)
				}
			}
			// Re-replicate all primary locks to their new backups
			for _, lock := range dlm.AllLocks() {
				newPrimary, _ := dlm.LockRing.GetPrimaryAndBackup(lock.Key)
				if newPrimary == dlm.Host && !lock.IsBackup {
					dlm.replicateToBackup(lock.Key, lock.ExpiredAtNs, lock.Token, lock.Owner, lock.Generation, lock.Seq, false)
				}
			}
		}

		time.Sleep(10 * time.Millisecond)

		// Re-add the node
		cluster.addNode(host)
		time.Sleep(10 * time.Millisecond)
	}

	// After rolling restart, locks should survive via backup promotion
	survivedCount := 0
	for _, ls := range locks {
		for _, dlm := range cluster.getNodes() {
			lock, found := dlm.GetLock(ls.key)
			if found && !lock.IsBackup {
				survivedCount++
				break
			}
		}
	}
	t.Logf("Locks survived rolling restart: %d / %d", survivedCount, len(locks))
	require.Greater(t, survivedCount, 0, "at least some locks should survive a rolling restart via backup promotion")
}

func TestDLM_GenerationIncrementsOnNewAcquisition(t *testing.T) {
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888"}
	cluster := newTestCluster(hosts...)

	key := "gen-test-lock"

	// Acquire lock: generation should be > 0
	token1, gen1, primary, err := cluster.acquireLock(key, "owner1", 2*time.Second)
	require.NoError(t, err)
	assert.Greater(t, gen1, int64(0))

	// Renew: generation should stay the same
	token2, gen2, err := cluster.renewLock(key, "owner1", token1, 2*time.Second, primary)
	require.NoError(t, err)
	assert.Equal(t, gen1, gen2, "generation should not change on renewal")

	// Let lock expire
	time.Sleep(3 * time.Second)

	// Re-acquire: generation should increment
	_, gen3, _, err := cluster.acquireLock(key, "owner2", 30*time.Second)
	require.NoError(t, err)
	assert.Greater(t, gen3, gen1, "generation should increment on new acquisition")
	_ = token2
}

func TestDLM_ReplicationFailure_PrimaryStillWorks(t *testing.T) {
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	// Break replication by setting a no-op ReplicateFn on all nodes
	for _, dlm := range cluster.getNodes() {
		dlm.ReplicateFn = func(server pb.ServerAddress, key string, expiredAtNs int64, token string, owner string, generation int64, seq int64, isUnlock bool) {
			// Simulate replication failure: do nothing
		}
	}

	key := "repl-fail-lock"
	renewToken, _, primaryHost, err := cluster.acquireLock(key, "owner1", 30*time.Second)
	require.NoError(t, err)

	// Primary should have the lock
	primaryDlm := cluster.get(primaryHost)
	lock, found := primaryDlm.GetLock(key)
	require.True(t, found, "primary should have the lock")
	assert.False(t, lock.IsBackup)

	// Backup should NOT have it (replication failed)
	_, backup := primaryDlm.LockRing.GetPrimaryAndBackup(key)
	backupDlm := cluster.get(backup)
	_, found = backupDlm.GetLock(key)
	assert.False(t, found, "backup should not have the lock when replication fails")

	// Primary should still be able to renew
	newToken, _, err := cluster.renewLock(key, "owner1", renewToken, 30*time.Second, primaryHost)
	require.NoError(t, err)
	assert.NotEmpty(t, newToken)
}

func TestDLM_UnlockReplicatesToBackup(t *testing.T) {
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888"}
	cluster := newTestCluster(hosts...)

	key := "unlock-repl-lock"
	renewToken, _, primaryHost, err := cluster.acquireLock(key, "owner1", 30*time.Second)
	require.NoError(t, err)

	_, backup := cluster.get(primaryHost).LockRing.GetPrimaryAndBackup(key)

	// Verify backup has the lock
	_, found := cluster.get(backup).GetLock(key)
	require.True(t, found, "backup should have the lock")

	// Unlock on primary
	primaryDlm := cluster.get(primaryHost)
	movedTo, err := primaryDlm.Unlock(key, renewToken)
	require.NoError(t, err)
	assert.Empty(t, movedTo)

	// Wait for async replication
	time.Sleep(20 * time.Millisecond)

	// Backup should also have removed the lock
	_, found = cluster.get(backup).GetLock(key)
	assert.False(t, found, "backup should remove lock after unlock replication")
}

func TestDLM_TopologyChange_LockSurvivesServerAddition(t *testing.T) {
	// Start with 2 servers, acquire lock, add a 3rd server
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888"}
	cluster := newTestCluster(hosts...)

	key := "topo-add-lock"
	renewToken, _, primaryHost, err := cluster.acquireLock(key, "owner1", 30*time.Second)
	require.NoError(t, err)

	// Add a new server
	cluster.addNode("filer3:8888")
	time.Sleep(20 * time.Millisecond)

	// The lock should still be accessible, either on the same primary or on a new one
	// Try to renew on the original primary first
	newPrimary := cluster.get(primaryHost).LockRing.GetPrimary(key)
	if newPrimary == primaryHost {
		// Still on same primary
		newToken, _, err := cluster.renewLock(key, "owner1", renewToken, 30*time.Second, primaryHost)
		require.NoError(t, err)
		assert.NotEmpty(t, newToken)
	}
	// If primary changed, the lock may need transfer; that's handled by OnDlmChangeSnapshot
	// which is tested at the server level
}

func TestDLM_ConsistentHashing_MinimalDisruption(t *testing.T) {
	// Verify that removing a server only affects locks on that server
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	// Acquire 50 locks
	type lockInfo struct {
		key, token string
		primary    pb.ServerAddress
	}
	locks := make([]lockInfo, 50)
	for i := range locks {
		key := fmt.Sprintf("min-disrupt-%d", i)
		token, _, primary, err := cluster.acquireLock(key, "owner", 30*time.Second)
		require.NoError(t, err)
		locks[i] = lockInfo{key: key, token: token, primary: primary}
	}

	// Count locks per server before removal
	countBefore := make(map[pb.ServerAddress]int)
	for _, l := range locks {
		countBefore[l.primary]++
	}
	t.Logf("Lock distribution before: %v", countBefore)

	// Remove filer2
	cluster.removeNode("filer2:8888")

	// Count how many locks changed primary
	changed := 0
	for _, l := range locks {
		// Check where the lock should be now
		for _, dlm := range cluster.getNodes() {
			newPrimary := dlm.LockRing.GetPrimary(l.key)
			if newPrimary != l.primary {
				changed++
			}
			break
		}
	}

	// Only locks from filer2 should have changed
	assert.Equal(t, countBefore["filer2:8888"], changed,
		"only locks from removed server should change primary")
}

func TestDLM_NodeDropAndJoin_OwnershipDisruption(t *testing.T) {
	// Scenario: 3 nodes, acquire locks, one drops and a NEW node joins quickly.
	// The new node steals hash ranges from surviving nodes, not just from the
	// departed node. This test measures the disruption.
	hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"}
	cluster := newTestCluster(hosts...)

	// Acquire many locks
	numLocks := 100
	type lockInfo struct {
		key, token string
		primary    pb.ServerAddress
	}
	locks := make([]lockInfo, numLocks)
	for i := range locks {
		key := fmt.Sprintf("churn-lock-%d", i)
		token, _, primary, err := cluster.acquireLock(key, "owner", 30*time.Second)
		require.NoError(t, err)
		locks[i] = lockInfo{key: key, token: token, primary: primary}
	}

	// Record primary for each lock before the change
	beforePrimary := make(map[string]pb.ServerAddress)
	for _, l := range locks {
		beforePrimary[l.key] = l.primary
	}

	// Drop filer3 and immediately add filer4
	cluster.removeNode("filer3:8888")

	// Promote backups on remaining nodes (simulates OnDlmChangeSnapshot)
	for _, dlm := range cluster.getNodes() {
		for _, lock := range dlm.AllLocks() {
			p, _ := dlm.LockRing.GetPrimaryAndBackup(lock.Key)
			if p == dlm.Host && lock.IsBackup {
				dlm.PromoteLock(lock.Key)
			}
		}
		// Re-replicate primary locks to new backups
		for _, lock := range dlm.AllLocks() {
			p, _ := dlm.LockRing.GetPrimaryAndBackup(lock.Key)
			if p == dlm.Host && !lock.IsBackup {
				dlm.replicateToBackup(lock.Key, lock.ExpiredAtNs, lock.Token, lock.Owner, lock.Generation, lock.Seq, false)
			}
		}
	}
	time.Sleep(10 * time.Millisecond)

	// Now add filer4 (new node, empty)
	cluster.addNode("filer4:8888")
	time.Sleep(10 * time.Millisecond)

	// Simulate OnDlmChangeSnapshot on all nodes after filer4 joins:
	// transfer locks that now belong to filer4
	for host, dlm := range cluster.getNodes() {
		for _, lock := range dlm.AllLocks() {
			p, _ := dlm.LockRing.GetPrimaryAndBackup(lock.Key)
			if p != host && !lock.IsBackup {
				// This lock should move to the new primary
				target := cluster.get(p)
				if target != nil {
					target.InsertLock(lock.Key, lock.ExpiredAtNs, lock.Token, lock.Owner, lock.Generation, lock.Seq)
					dlm.DemoteLock(lock.Key)
				}
			}
		}
	}
	time.Sleep(10 * time.Millisecond)

	// Count disruptions: locks whose primary changed to a node other than filer3's successor
	disruptedFromSurvivors := 0
	disruptedFromDeparted := 0
	movedToFiler4 := 0
	for _, l := range locks {
		// What's the new primary?
		var newPrimary pb.ServerAddress
		for _, dlm := range cluster.getNodes() {
			newPrimary = dlm.LockRing.GetPrimary(l.key)
			break
		}
		oldPrimary := beforePrimary[l.key]
		if newPrimary != oldPrimary {
			if oldPrimary == "filer3:8888" {
				disruptedFromDeparted++
			} else {
				disruptedFromSurvivors++
			}
		}
		if newPrimary == "filer4:8888" {
			movedToFiler4++
		}
	}

	t.Logf("Locks disrupted from departed filer3: %d / %d", disruptedFromDeparted, numLocks)
	t.Logf("Locks disrupted from surviving filer1/filer2: %d / %d", disruptedFromSurvivors, numLocks)
	t.Logf("Locks now on new filer4: %d / %d", movedToFiler4, numLocks)

	// The key concern: filer4 joining disrupts locks on surviving nodes
	// With consistent hashing, new node steals ~1/N of each surviving node's keys
	// Verify that the transfer logic above moved those locks to filer4
	for _, l := range locks {
		var newPrimary pb.ServerAddress
		for _, dlm := range cluster.getNodes() {
			newPrimary = dlm.LockRing.GetPrimary(l.key)
			break
		}
		target := cluster.get(newPrimary)
		require.NotNil(t, target, "primary %s should exist", newPrimary)

		lock, found := target.GetLock(l.key)
		if !found {
			// Lock may have only a backup copy if transfer happened but
			// the lock was on the departed node and wasn't re-replicated.
			// Check all nodes for any copy.
			anyFound := false
			for _, dlm := range cluster.getNodes() {
				if _, f := dlm.GetLock(l.key); f {
					anyFound = true
					break
				}
			}
			if !anyFound {
				t.Errorf("lock %s completely lost (primary should be %s)", l.key, newPrimary)
			}
			continue
		}
		assert.False(t, lock.IsBackup, "lock %s on primary %s should not be a backup", l.key, newPrimary)
	}
}
|
|||
func TestDLM_RenewalDuringTransferWindow(t *testing.T) { |
|||
// When a new node joins and steals a key range from a surviving node,
|
|||
// there's a window between ring update and lock transfer. During this
|
|||
// window, a client renewal should still succeed on the old primary
|
|||
// (because it still holds the lock locally).
|
|||
hosts := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"} |
|||
cluster := newTestCluster(hosts...) |
|||
|
|||
// Find a key that will move primary when filer4 is added.
|
|||
// Try candidate keys until we find one whose primary changes.
|
|||
var key, renewToken string |
|||
var primaryHost pb.ServerAddress |
|||
for i := 0; i < 1000; i++ { |
|||
candidate := fmt.Sprintf("transfer-window-lock-%d", i) |
|||
token, _, primary, err := cluster.acquireLock(candidate, "owner1", 30*time.Second) |
|||
require.NoError(t, err) |
|||
|
|||
// Check if adding filer4 would move this key's primary
|
|||
tmpRing := NewHashRing(DefaultVnodeCount) |
|||
tmpRing.SetServers([]pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888", "filer4:8888"}) |
|||
newPrimary := tmpRing.GetPrimary(candidate) |
|||
if newPrimary != primary { |
|||
key = candidate |
|||
renewToken = token |
|||
primaryHost = primary |
|||
break |
|||
} |
|||
} |
|||
require.NotEmpty(t, key, "should find a key that moves primary when filer4 joins") |
|||
|
|||
// Add filer4 — this changes the primary for our key per the ring
|
|||
cluster.addNode("filer4:8888") |
|||
time.Sleep(10 * time.Millisecond) |
|||
|
|||
newPrimary := cluster.get(primaryHost).LockRing.GetPrimary(key) |
|||
require.NotEqual(t, primaryHost, newPrimary, "key should have moved to a different primary") |
|||
|
|||
// Renewal on the OLD primary should still succeed because it holds the lock locally
|
|||
newToken, _, err := cluster.renewLock(key, "owner1", renewToken, 30*time.Second, primaryHost) |
|||
require.NoError(t, err, "renewal on old primary should succeed during transfer window") |
|||
assert.NotEmpty(t, newToken, "should get a new token from old primary") |
|||
t.Logf("Key %s: primary changed from %s to %s, but renewal on old primary succeeded", key, primaryHost, newPrimary) |
|||
} |
|||
|
|||
func TestDLM_StaleReplicationRejected(t *testing.T) { |
|||
// Verify that a stale replication (lower seq) does not overwrite a newer one
|
|||
lm := NewLockManager() |
|||
|
|||
// Insert backup with seq=3
|
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-new", "owner1", 1, 3) |
|||
lock, found := lm.GetLock("key1") |
|||
require.True(t, found) |
|||
assert.Equal(t, "token-new", lock.Token) |
|||
assert.Equal(t, int64(3), lock.Seq) |
|||
|
|||
// Try to overwrite with stale seq=2 — should be rejected
|
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-old", "owner1", 1, 2) |
|||
lock, found = lm.GetLock("key1") |
|||
require.True(t, found) |
|||
assert.Equal(t, "token-new", lock.Token, "stale replication should be rejected") |
|||
assert.Equal(t, int64(3), lock.Seq) |
|||
|
|||
// Update with higher seq=4 — should succeed
|
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-newer", "owner1", 1, 4) |
|||
lock, found = lm.GetLock("key1") |
|||
require.True(t, found) |
|||
assert.Equal(t, "token-newer", lock.Token, "newer replication should be accepted") |
|||
assert.Equal(t, int64(4), lock.Seq) |
|||
|
|||
// Stale unlock (seq=2) should not delete the lock
|
|||
removed := lm.RemoveBackupLockIfSeq("key1", 1, 2) |
|||
assert.False(t, removed, "stale unlock should be rejected") |
|||
_, found = lm.GetLock("key1") |
|||
assert.True(t, found, "lock should still exist after stale unlock") |
|||
|
|||
// Valid unlock (seq=5) should delete
|
|||
removed = lm.RemoveBackupLockIfSeq("key1", 1, 5) |
|||
assert.True(t, removed, "valid unlock should be accepted") |
|||
_, found = lm.GetLock("key1") |
|||
assert.False(t, found, "lock should be removed after valid unlock") |
|||
} |
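The stale-replication tests above suggest that a backup orders replicated updates by generation first and seq second. The following is a minimal sketch of that ordering rule, not the LockManager implementation: `version`, `backupStore`, `upsert`, and `remove` are hypothetical names, and accepting an update whose version equals the stored one is an assumption the tests do not cover.

```go
package main

import "fmt"

// version orders replicated updates: generation first, then seq.
// Hypothetical type; the real Lock carries Generation and Seq fields.
type version struct {
	generation int64
	seq        int64
}

// olderThan reports whether v is strictly older than other.
func (v version) olderThan(other version) bool {
	if v.generation != other.generation {
		return v.generation < other.generation
	}
	return v.seq < other.seq
}

// backupStore is a hypothetical stand-in for the backup side of a lock manager.
type backupStore struct {
	tokens   map[string]string
	versions map[string]version
}

func newBackupStore() *backupStore {
	return &backupStore{
		tokens:   make(map[string]string),
		versions: make(map[string]version),
	}
}

// upsert applies a replicated lock unless a strictly newer version is stored.
// Assumption: an update with an equal version is accepted.
func (s *backupStore) upsert(key, token string, v version) bool {
	if cur, ok := s.versions[key]; ok && v.olderThan(cur) {
		return false
	}
	s.tokens[key] = token
	s.versions[key] = v
	return true
}

// remove applies a replicated unlock unless it is older than the stored lock.
func (s *backupStore) remove(key string, v version) bool {
	cur, ok := s.versions[key]
	if !ok || v.olderThan(cur) {
		return false
	}
	delete(s.tokens, key)
	delete(s.versions, key)
	return true
}

func main() {
	s := newBackupStore()
	fmt.Println(s.upsert("key1", "token-new", version{1, 3}))   // first write
	fmt.Println(s.upsert("key1", "token-old", version{1, 2}))   // lower seq
	fmt.Println(s.upsert("key1", "token-newer", version{1, 4})) // higher seq
	fmt.Println(s.remove("key1", version{1, 2}))                // stale unlock
	fmt.Println(s.remove("key1", version{1, 5}))                // current unlock
}
```

Comparing the generation before the seq is what lets a backup discard a late message from an earlier acquisition even when that message carries a higher seq, which is the case exercised by TestLockManager_InsertBackupLockRejectsOlderGeneration.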
|||
@@ -0,0 +1,198 @@
|||
package lock_manager |
|||
|
|||
import ( |
|||
"hash/crc32" |
|||
"sort" |
|||
"sync" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
) |
|||
|
|||
const DefaultVnodeCount = 50 |
|||
|
|||
// HashRing implements consistent hashing with virtual nodes.
|
|||
// When a server is removed, only the keys that hashed to that server
|
|||
// are remapped (to the next server on the ring), leaving all other
|
|||
// key-to-server mappings stable.
|
|||
//
|
|||
// UPGRADE NOTE: This replaces the previous modulo-based hashing
|
|||
// (hash % len(servers)). The two schemes compute different primaries
|
|||
// for the same key, so all filer nodes in the cluster must be upgraded
|
|||
// together (or via a rolling restart that completes within the lock TTL
|
|||
// window of ~7 seconds) to avoid routing disagreements.
|
|||
type HashRing struct { |
|||
mu sync.RWMutex |
|||
vnodeCount int |
|||
sortedHashes []uint32 // sorted ring positions
|
|||
vnodeToServer map[uint32]pb.ServerAddress // ring position → server
|
|||
servers map[pb.ServerAddress]struct{} // set of all servers
|
|||
} |
|||
|
|||
func NewHashRing(vnodeCount int) *HashRing { |
|||
if vnodeCount <= 0 { |
|||
vnodeCount = DefaultVnodeCount |
|||
} |
|||
return &HashRing{ |
|||
vnodeCount: vnodeCount, |
|||
vnodeToServer: make(map[uint32]pb.ServerAddress), |
|||
servers: make(map[pb.ServerAddress]struct{}), |
|||
} |
|||
} |
|||
|
|||
// AddServer adds a server with virtual nodes to the ring.
|
|||
func (hr *HashRing) AddServer(server pb.ServerAddress) { |
|||
hr.mu.Lock() |
|||
defer hr.mu.Unlock() |
|||
|
|||
if _, exists := hr.servers[server]; exists { |
|||
return |
|||
} |
|||
hr.servers[server] = struct{}{} |
|||
hr.rebuildRing() |
|||
} |
|||
|
|||
// RemoveServer removes a server and its virtual nodes from the ring.
|
|||
func (hr *HashRing) RemoveServer(server pb.ServerAddress) { |
|||
hr.mu.Lock() |
|||
defer hr.mu.Unlock() |
|||
|
|||
if _, exists := hr.servers[server]; !exists { |
|||
return |
|||
} |
|||
delete(hr.servers, server) |
|||
hr.rebuildRing() |
|||
} |
|||
|
|||
// SetServers replaces the entire server set.
|
|||
func (hr *HashRing) SetServers(servers []pb.ServerAddress) { |
|||
hr.mu.Lock() |
|||
defer hr.mu.Unlock() |
|||
|
|||
hr.servers = make(map[pb.ServerAddress]struct{}, len(servers)) |
|||
for _, s := range servers { |
|||
hr.servers[s] = struct{}{} |
|||
} |
|||
hr.rebuildRing() |
|||
} |
|||
|
|||
// GetPrimaryAndBackup returns the primary server for a key and its backup
|
|||
// (the next distinct server clockwise on the ring).
|
|||
// If there is only one server, backup is empty.
|
|||
func (hr *HashRing) GetPrimaryAndBackup(key string) (primary, backup pb.ServerAddress) { |
|||
hr.mu.RLock() |
|||
defer hr.mu.RUnlock() |
|||
|
|||
if len(hr.sortedHashes) == 0 { |
|||
return "", "" |
|||
} |
|||
|
|||
hash := hashKey(key) |
|||
idx := hr.search(hash) |
|||
primary = hr.vnodeToServer[hr.sortedHashes[idx]] |
|||
|
|||
// Walk clockwise to find a different server for backup
|
|||
ringLen := len(hr.sortedHashes) |
|||
for i := 1; i < ringLen; i++ { |
|||
candidate := hr.vnodeToServer[hr.sortedHashes[(idx+i)%ringLen]] |
|||
if candidate != primary { |
|||
backup = candidate |
|||
return |
|||
} |
|||
} |
|||
// Only one server — no backup
|
|||
return primary, "" |
|||
} |
|||
|
|||
// GetPrimary returns just the primary server for a key.
|
|||
func (hr *HashRing) GetPrimary(key string) pb.ServerAddress { |
|||
hr.mu.RLock() |
|||
defer hr.mu.RUnlock() |
|||
|
|||
if len(hr.sortedHashes) == 0 { |
|||
return "" |
|||
} |
|||
|
|||
hash := hashKey(key) |
|||
idx := hr.search(hash) |
|||
return hr.vnodeToServer[hr.sortedHashes[idx]] |
|||
} |
|||
|
|||
// GetServers returns a sorted copy of all servers in the ring.
|
|||
func (hr *HashRing) GetServers() []pb.ServerAddress { |
|||
hr.mu.RLock() |
|||
defer hr.mu.RUnlock() |
|||
|
|||
servers := make([]pb.ServerAddress, 0, len(hr.servers)) |
|||
for s := range hr.servers { |
|||
servers = append(servers, s) |
|||
} |
|||
sort.Slice(servers, func(i, j int) bool { |
|||
return servers[i] < servers[j] |
|||
}) |
|||
return servers |
|||
} |
|||
|
|||
// ServerCount returns the number of servers in the ring.
|
|||
func (hr *HashRing) ServerCount() int { |
|||
hr.mu.RLock() |
|||
defer hr.mu.RUnlock() |
|||
return len(hr.servers) |
|||
} |
|||
|
|||
// rebuildRing rebuilds the sorted hash ring from the current server set.
|
|||
// Caller must hold hr.mu write lock.
|
|||
func (hr *HashRing) rebuildRing() { |
|||
hr.vnodeToServer = make(map[uint32]pb.ServerAddress, len(hr.servers)*hr.vnodeCount) |
|||
hr.sortedHashes = make([]uint32, 0, len(hr.servers)*hr.vnodeCount) |
|||
|
|||
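	// Visit servers in sorted order so that, if two vnode hashes collide,
	// every node resolves the collision the same way (first server wins).
	ordered := make([]pb.ServerAddress, 0, len(hr.servers))
	for server := range hr.servers {
		ordered = append(ordered, server)
	}
	sort.Slice(ordered, func(i, j int) bool { return ordered[i] < ordered[j] })
	for _, server := range ordered {
		for i := 0; i < hr.vnodeCount; i++ {
			hash := hashKey(vnodeKeyFor(server, i))
			if _, taken := hr.vnodeToServer[hash]; taken {
				continue // keep sortedHashes free of duplicate positions
			}
			hr.vnodeToServer[hash] = server
			hr.sortedHashes = append(hr.sortedHashes, hash)
		}
	}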
|||
sort.Slice(hr.sortedHashes, func(i, j int) bool { |
|||
return hr.sortedHashes[i] < hr.sortedHashes[j] |
|||
}) |
|||
} |
|||
|
|||
// search finds the first ring position >= hash.
|
|||
func (hr *HashRing) search(hash uint32) int { |
|||
idx := sort.Search(len(hr.sortedHashes), func(i int) bool { |
|||
return hr.sortedHashes[i] >= hash |
|||
}) |
|||
if idx >= len(hr.sortedHashes) { |
|||
idx = 0 // wrap around
|
|||
} |
|||
return idx |
|||
} |
|||
|
|||
func hashKey(key string) uint32 { |
|||
return crc32.ChecksumIEEE([]byte(key)) |
|||
} |
|||
|
|||
func vnodeKeyFor(server pb.ServerAddress, index int) string { |
|||
// Use a format that distributes well across the ring
|
|||
buf := make([]byte, 0, len(server)+10) |
|||
buf = append(buf, []byte(server)...) |
|||
buf = append(buf, '#') |
|||
buf = appendInt(buf, index) |
|||
return string(buf) |
|||
} |
|||
|
|||
func appendInt(buf []byte, n int) []byte { |
|||
if n == 0 { |
|||
return append(buf, '0') |
|||
} |
|||
// Simple int-to-string without importing strconv
|
|||
digits := [20]byte{} |
|||
pos := len(digits) |
|||
for n > 0 { |
|||
pos-- |
|||
digits[pos] = byte('0' + n%10) |
|||
n /= 10 |
|||
} |
|||
return append(buf, digits[pos:]...) |
|||
} |
|||
@@ -0,0 +1,173 @@
|||
package lock_manager |
|||
|
|||
import ( |
|||
"fmt" |
|||
"math" |
|||
"testing" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestHashRing_BasicOperations(t *testing.T) { |
|||
hr := NewHashRing(50) |
|||
|
|||
// Empty ring
|
|||
p, b := hr.GetPrimaryAndBackup("key1") |
|||
assert.Equal(t, pb.ServerAddress(""), p) |
|||
assert.Equal(t, pb.ServerAddress(""), b) |
|||
|
|||
// Single server — no backup
|
|||
hr.AddServer("filer1:8888") |
|||
p, b = hr.GetPrimaryAndBackup("key1") |
|||
assert.Equal(t, pb.ServerAddress("filer1:8888"), p) |
|||
assert.Equal(t, pb.ServerAddress(""), b) |
|||
|
|||
// Two servers — backup is the other server
|
|||
hr.AddServer("filer2:8888") |
|||
p, b = hr.GetPrimaryAndBackup("key1") |
|||
assert.NotEqual(t, p, b) |
|||
assert.NotEmpty(t, b) |
|||
|
|||
// Three servers
|
|||
hr.AddServer("filer3:8888") |
|||
p, b = hr.GetPrimaryAndBackup("key1") |
|||
assert.NotEqual(t, p, b) |
|||
assert.NotEmpty(t, b) |
|||
|
|||
// Remove server
|
|||
hr.RemoveServer("filer2:8888") |
|||
assert.Equal(t, 2, hr.ServerCount()) |
|||
} |
|||
|
|||
func TestHashRing_DuplicateAddRemove(t *testing.T) { |
|||
hr := NewHashRing(50) |
|||
|
|||
hr.AddServer("filer1:8888") |
|||
hr.AddServer("filer1:8888") // duplicate
|
|||
assert.Equal(t, 1, hr.ServerCount()) |
|||
|
|||
hr.RemoveServer("filer1:8888") |
|||
assert.Equal(t, 0, hr.ServerCount()) |
|||
|
|||
hr.RemoveServer("filer1:8888") // remove non-existent
|
|||
assert.Equal(t, 0, hr.ServerCount()) |
|||
} |
|||
|
|||
func TestHashRing_SetServers(t *testing.T) { |
|||
hr := NewHashRing(50) |
|||
|
|||
hr.SetServers([]pb.ServerAddress{"a:1", "b:2", "c:3"}) |
|||
assert.Equal(t, 3, hr.ServerCount()) |
|||
|
|||
servers := hr.GetServers() |
|||
assert.Equal(t, 3, len(servers)) |
|||
|
|||
// SetServers replaces
|
|||
hr.SetServers([]pb.ServerAddress{"x:1", "y:2"}) |
|||
assert.Equal(t, 2, hr.ServerCount()) |
|||
} |
|||
|
|||
func TestHashRing_ConsistencyOnRemoval(t *testing.T) { |
|||
// The key property of consistent hashing: when a server is removed,
|
|||
// only keys that mapped to the removed server change.
|
|||
hr := NewHashRing(50) |
|||
servers := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"} |
|||
hr.SetServers(servers) |
|||
|
|||
numKeys := 1000 |
|||
// Record where each key maps before removal
|
|||
before := make(map[string]pb.ServerAddress, numKeys) |
|||
for i := 0; i < numKeys; i++ { |
|||
key := fmt.Sprintf("lock-key-%d", i) |
|||
before[key] = hr.GetPrimary(key) |
|||
} |
|||
|
|||
// Remove filer2
|
|||
hr.RemoveServer("filer2:8888") |
|||
|
|||
moved := 0 |
|||
for i := 0; i < numKeys; i++ { |
|||
key := fmt.Sprintf("lock-key-%d", i) |
|||
after := hr.GetPrimary(key) |
|||
if before[key] != after { |
|||
// Only keys from filer2 should move
|
|||
assert.Equal(t, pb.ServerAddress("filer2:8888"), before[key], |
|||
"key %s moved from %s to %s, but it wasn't on the removed server", key, before[key], after) |
|||
moved++ |
|||
} |
|||
} |
|||
// Roughly 1/3 of keys should move (those that were on filer2)
|
|||
t.Logf("Keys that moved: %d / %d", moved, numKeys) |
|||
assert.Greater(t, moved, 0, "some keys should have moved") |
|||
assert.Less(t, moved, numKeys, "not all keys should move") |
|||
} |
|||
|
|||
func TestHashRing_BackupIsSuccessor(t *testing.T) { |
|||
// After removing primary, the backup should become the new primary
|
|||
hr := NewHashRing(50) |
|||
servers := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"} |
|||
hr.SetServers(servers) |
|||
|
|||
// For each key, verify that removing the primary makes the backup the new primary
|
|||
promoted := 0 |
|||
total := 500 |
|||
for i := 0; i < total; i++ { |
|||
key := fmt.Sprintf("test-lock-%d", i) |
|||
primary, backup := hr.GetPrimaryAndBackup(key) |
|||
assert.NotEqual(t, primary, backup) |
|||
|
|||
// Temporarily remove primary
|
|||
hr.RemoveServer(primary) |
|||
newPrimary := hr.GetPrimary(key) |
|||
if newPrimary == backup { |
|||
promoted++ |
|||
} |
|||
// Restore
|
|||
hr.AddServer(primary) |
|||
} |
|||
// The backup should become new primary for all keys
|
|||
assert.Equal(t, total, promoted, |
|||
"backup should become new primary for all keys when primary is removed") |
|||
} |
|||
|
|||
func TestHashRing_Distribution(t *testing.T) { |
|||
hr := NewHashRing(50) |
|||
servers := []pb.ServerAddress{"filer1:8888", "filer2:8888", "filer3:8888"} |
|||
hr.SetServers(servers) |
|||
|
|||
counts := make(map[pb.ServerAddress]int) |
|||
numKeys := 3000 |
|||
for i := 0; i < numKeys; i++ { |
|||
key := fmt.Sprintf("dist-key-%d", i) |
|||
p := hr.GetPrimary(key) |
|||
counts[p]++ |
|||
} |
|||
|
|||
expected := float64(numKeys) / float64(len(servers)) |
|||
for server, count := range counts { |
|||
deviation := math.Abs(float64(count)-expected) / expected |
|||
t.Logf("Server %s: %d keys (%.1f%% deviation)", server, count, deviation*100) |
|||
// Allow up to 40% deviation with 50 vnodes and 3 servers
|
|||
assert.Less(t, deviation, 0.40, |
|||
"server %s has too many or too few keys: %d (expected ~%d)", server, count, int(expected)) |
|||
} |
|||
} |
|||
|
|||
func TestHashRing_GetPrimary(t *testing.T) { |
|||
hr := NewHashRing(50) |
|||
|
|||
// Empty ring
|
|||
assert.Equal(t, pb.ServerAddress(""), hr.GetPrimary("key")) |
|||
|
|||
hr.SetServers([]pb.ServerAddress{"a:1", "b:2"}) |
|||
|
|||
// Deterministic: same key always maps to same server
|
|||
p1 := hr.GetPrimary("mykey") |
|||
p2 := hr.GetPrimary("mykey") |
|||
assert.Equal(t, p1, p2) |
|||
|
|||
// GetPrimary matches the primary from GetPrimaryAndBackup
|
|||
primary, _ := hr.GetPrimaryAndBackup("mykey") |
|||
assert.Equal(t, primary, hr.GetPrimary("mykey")) |
|||
} |
|||
@@ -0,0 +1,86 @@
|||
package lock_manager |
|||
|
|||
import ( |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/stretchr/testify/assert" |
|||
) |
|||
|
|||
func TestLockManager_GetLockOwnerIgnoresExpiredLock(t *testing.T) { |
|||
lm := NewLockManager() |
|||
|
|||
lm.InsertLock("key1", time.Now().Add(-time.Second).UnixNano(), "token1", "owner1", 7, 3) |
|||
|
|||
owner, err := lm.GetLockOwner("key1") |
|||
assert.Empty(t, owner) |
|||
assert.ErrorIs(t, err, LockNotFound) |
|||
} |
|||
|
|||
func TestLockManager_InsertLockRejectsStaleTransfer(t *testing.T) { |
|||
lm := NewLockManager() |
|||
|
|||
lm.InsertLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-new", "owner1", 8, 4) |
|||
lm.InsertLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-old", "owner1", 7, 3) |
|||
|
|||
lock, found := lm.GetLock("key1") |
|||
assert.True(t, found) |
|||
assert.Equal(t, "token-new", lock.Token) |
|||
assert.Equal(t, int64(8), lock.Generation) |
|||
assert.Equal(t, int64(4), lock.Seq) |
|||
} |
|||
|
|||
func TestLockManager_InsertLockAdvancesGenerationCounter(t *testing.T) { |
|||
lm := NewLockManager() |
|||
|
|||
lm.InsertLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token1", "owner1", 12, 1) |
|||
|
|||
_, renewToken, generation, _, err := lm.Lock("key2", time.Now().Add(30*time.Second).UnixNano(), "", "owner2") |
|||
assert.NoError(t, err) |
|||
assert.NotEmpty(t, renewToken) |
|||
assert.Greater(t, generation, int64(12)) |
|||
} |
|||
|
|||
func TestLockManager_InsertBackupLockRejectsOlderGeneration(t *testing.T) { |
|||
lm := NewLockManager() |
|||
|
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-new", "owner1", 8, 1) |
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-old", "owner1", 7, 9) |
|||
|
|||
lock, found := lm.GetLock("key1") |
|||
assert.True(t, found) |
|||
assert.Equal(t, "token-new", lock.Token) |
|||
assert.Equal(t, int64(8), lock.Generation) |
|||
assert.Equal(t, int64(1), lock.Seq) |
|||
} |
|||
|
|||
func TestLockManager_InsertBackupLockKeepsPrimaryRole(t *testing.T) { |
|||
lm := NewLockManager() |
|||
|
|||
ok := lm.InsertLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-old", "owner1", 8, 1) |
|||
assert.True(t, ok) |
|||
|
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-new", "owner1", 8, 2) |
|||
|
|||
lock, found := lm.GetLock("key1") |
|||
assert.True(t, found) |
|||
assert.False(t, lock.IsBackup) |
|||
assert.Equal(t, "token-new", lock.Token) |
|||
assert.Equal(t, int64(8), lock.Generation) |
|||
assert.Equal(t, int64(2), lock.Seq) |
|||
} |
|||
|
|||
func TestLockManager_RemoveBackupLockRejectsOlderGeneration(t *testing.T) { |
|||
lm := NewLockManager() |
|||
|
|||
lm.InsertBackupLock("key1", time.Now().Add(30*time.Second).UnixNano(), "token-new", "owner1", 8, 1) |
|||
|
|||
removed := lm.RemoveBackupLockIfSeq("key1", 7, 9) |
|||
assert.False(t, removed) |
|||
|
|||
lock, found := lm.GetLock("key1") |
|||
assert.True(t, found) |
|||
assert.Equal(t, "token-new", lock.Token) |
|||
assert.Equal(t, int64(8), lock.Generation) |
|||
assert.Equal(t, int64(1), lock.Seq) |
|||
} |
|||
@@ -0,0 +1,164 @@
|||
package cluster |
|||
|
|||
import ( |
|||
"sync" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" |
|||
) |
|||
|
|||
const LockRingStabilizationInterval = 1 * time.Second |
|||
|
|||
// LockRingManager tracks filer membership for the distributed lock ring.
|
|||
// It batches rapid topology changes (e.g., node drop + join) with a
|
|||
// stabilization timer, then broadcasts the complete member list atomically
|
|||
// so filers receive a single consistent ring update instead of multiple
|
|||
// intermediate states.
|
|||
type LockRingManager struct { |
|||
mu sync.Mutex |
|||
members map[FilerGroupName]map[pb.ServerAddress]struct{} |
|||
version map[FilerGroupName]int64 |
|||
lastBroadcast map[FilerGroupName]*master_pb.LockRingUpdate |
|||
pendingTimer map[FilerGroupName]*time.Timer |
|||
broadcastFn func(resp *master_pb.KeepConnectedResponse) |
|||
stabilizeDelay time.Duration |
|||
} |
|||
|
|||
func NewLockRingManager(broadcastFn func(resp *master_pb.KeepConnectedResponse)) *LockRingManager { |
|||
return &LockRingManager{ |
|||
members: make(map[FilerGroupName]map[pb.ServerAddress]struct{}), |
|||
version: make(map[FilerGroupName]int64), |
|||
lastBroadcast: make(map[FilerGroupName]*master_pb.LockRingUpdate), |
|||
pendingTimer: make(map[FilerGroupName]*time.Timer), |
|||
broadcastFn: broadcastFn, |
|||
stabilizeDelay: LockRingStabilizationInterval, |
|||
} |
|||
} |
|||
|
|||
// AddServer records a filer joining and schedules a batched broadcast.
|
|||
func (lrm *LockRingManager) AddServer(filerGroup FilerGroupName, address pb.ServerAddress) { |
|||
lrm.mu.Lock() |
|||
defer lrm.mu.Unlock() |
|||
|
|||
if _, ok := lrm.members[filerGroup]; !ok { |
|||
lrm.members[filerGroup] = make(map[pb.ServerAddress]struct{}) |
|||
} |
|||
lrm.members[filerGroup][address] = struct{}{} |
|||
lrm.scheduleBroadcast(filerGroup) |
|||
} |
|||
|
|||
// RemoveServer records a filer leaving and schedules a batched broadcast.
|
|||
func (lrm *LockRingManager) RemoveServer(filerGroup FilerGroupName, address pb.ServerAddress) { |
|||
lrm.mu.Lock() |
|||
defer lrm.mu.Unlock() |
|||
|
|||
if members, ok := lrm.members[filerGroup]; ok { |
|||
delete(members, address) |
|||
} |
|||
lrm.scheduleBroadcast(filerGroup) |
|||
} |
|||
|
|||
// GetServers returns the current member list for a filer group.
|
|||
func (lrm *LockRingManager) GetServers(filerGroup FilerGroupName) []string { |
|||
lrm.mu.Lock() |
|||
defer lrm.mu.Unlock() |
|||
|
|||
members, ok := lrm.members[filerGroup] |
|||
if !ok { |
|||
return nil |
|||
} |
|||
servers := make([]string, 0, len(members)) |
|||
for addr := range members { |
|||
servers = append(servers, string(addr)) |
|||
} |
|||
return servers |
|||
} |
|||
|
|||
// GetVersion returns the current version for a filer group.
|
|||
func (lrm *LockRingManager) GetVersion(filerGroup FilerGroupName) int64 { |
|||
lrm.mu.Lock() |
|||
defer lrm.mu.Unlock() |
|||
return lrm.version[filerGroup] |
|||
} |
|||
|
|||
// GetLastUpdate returns a copy of the most recently broadcast lock-ring snapshot
|
|||
// for the filer group. It intentionally does not expose pending, unstabilized changes.
|
|||
func (lrm *LockRingManager) GetLastUpdate(filerGroup FilerGroupName) *master_pb.LockRingUpdate { |
|||
lrm.mu.Lock() |
|||
defer lrm.mu.Unlock() |
|||
|
|||
update, ok := lrm.lastBroadcast[filerGroup] |
|||
if !ok || update == nil { |
|||
return nil |
|||
} |
|||
cp := *update |
|||
cp.Servers = append([]string(nil), update.Servers...) |
|||
return &cp |
|||
} |
|||
|
|||
// scheduleBroadcast resets the stabilization timer. If another change arrives
|
|||
// before the timer fires, the timer resets, batching the changes.
|
|||
// Caller must hold lrm.mu.
|
|||
func (lrm *LockRingManager) scheduleBroadcast(filerGroup FilerGroupName) { |
|||
	if timer, ok := lrm.pendingTimer[filerGroup]; ok {
		// Stop returns false when the timer has already fired and its
		// callback is running or queued. That callback reads the latest
		// state from lrm.members, so the result is ignored and a fresh
		// timer is scheduled below for any further changes.
		timer.Stop()
	}
|||
lrm.pendingTimer[filerGroup] = time.AfterFunc(lrm.stabilizeDelay, func() { |
|||
lrm.doBroadcast(filerGroup) |
|||
}) |
|||
} |
|||
|
|||
func (lrm *LockRingManager) doBroadcast(filerGroup FilerGroupName) { |
|||
lrm.mu.Lock() |
|||
// Use wall-clock nanoseconds so the version survives master restarts
|
|||
// without persistence — a restarted master produces a version greater
|
|||
// than any pre-restart value (assuming clocks don't jump backward).
|
|||
version := time.Now().UnixNano() |
|||
lrm.version[filerGroup] = version |
|||
servers := make([]string, 0) |
|||
if members, ok := lrm.members[filerGroup]; ok { |
|||
for addr := range members { |
|||
servers = append(servers, string(addr)) |
|||
} |
|||
} |
|||
update := &master_pb.LockRingUpdate{ |
|||
FilerGroup: string(filerGroup), |
|||
Servers: append([]string(nil), servers...), |
|||
Version: version, |
|||
} |
|||
lrm.lastBroadcast[filerGroup] = update |
|||
delete(lrm.pendingTimer, filerGroup) |
|||
lrm.mu.Unlock() |
|||
|
|||
glog.V(0).Infof("LockRing: broadcasting ring update for group %q version %d: %v", filerGroup, version, servers) |
|||
|
|||
if lrm.broadcastFn != nil { |
|||
lrm.broadcastFn(&master_pb.KeepConnectedResponse{ |
|||
LockRingUpdate: update, |
|||
}) |
|||
} |
|||
} |
|||
|
|||
// FlushPending fires any pending timer immediately (for testing or shutdown).
|
|||
func (lrm *LockRingManager) FlushPending(filerGroup FilerGroupName) { |
|||
lrm.mu.Lock() |
|||
if timer, ok := lrm.pendingTimer[filerGroup]; ok { |
|||
if timer.Stop() { |
|||
// Timer was pending — we stopped it, so we broadcast now
|
|||
delete(lrm.pendingTimer, filerGroup) |
|||
lrm.mu.Unlock() |
|||
lrm.doBroadcast(filerGroup) |
|||
} else { |
|||
// Timer already fired, callback is running — let it finish
|
|||
lrm.mu.Unlock() |
|||
} |
|||
} else { |
|||
lrm.mu.Unlock() |
|||
} |
|||
} |
|||
@@ -0,0 +1,229 @@
|||
package cluster |
|||
|
|||
import ( |
|||
"sync" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" |
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/stretchr/testify/require" |
|||
) |
|||
|
|||
func TestLockRingManager_BatchesRapidChanges(t *testing.T) { |
|||
var mu sync.Mutex |
|||
var broadcasts []*master_pb.LockRingUpdate |
|||
|
|||
lrm := NewLockRingManager(func(resp *master_pb.KeepConnectedResponse) { |
|||
mu.Lock() |
|||
if resp.LockRingUpdate != nil { |
|||
broadcasts = append(broadcasts, resp.LockRingUpdate) |
|||
} |
|||
mu.Unlock() |
|||
}) |
|||
lrm.stabilizeDelay = 100 * time.Millisecond |
|||
|
|||
group := FilerGroupName("default") |
|||
|
|||
// Add 3 servers in rapid succession
|
|||
lrm.AddServer(group, "filer1:8888") |
|||
lrm.AddServer(group, "filer2:8888") |
|||
lrm.AddServer(group, "filer3:8888") |
|||
|
|||
// No broadcast should have happened yet (timer hasn't fired)
|
|||
mu.Lock() |
|||
assert.Equal(t, 0, len(broadcasts), "should not broadcast before stabilization delay") |
|||
mu.Unlock() |
|||
|
|||
// Wait for stabilization
|
|||
time.Sleep(200 * time.Millisecond) |
|||
|
|||
mu.Lock() |
|||
require.Equal(t, 1, len(broadcasts), "should batch into a single broadcast") |
|||
assert.Equal(t, 3, len(broadcasts[0].Servers), "should include all 3 servers") |
|||
assert.Greater(t, broadcasts[0].Version, int64(0)) |
|||
mu.Unlock() |
|||
} |
|||
|
|||
func TestLockRingManager_DropAndJoinBatched(t *testing.T) { |
|||
var mu sync.Mutex |
|||
var broadcasts []*master_pb.LockRingUpdate |
|||
|
|||
lrm := NewLockRingManager(func(resp *master_pb.KeepConnectedResponse) { |
|||
mu.Lock() |
|||
if resp.LockRingUpdate != nil { |
|||
broadcasts = append(broadcasts, resp.LockRingUpdate) |
|||
} |
|||
mu.Unlock() |
|||
}) |
|||
lrm.stabilizeDelay = 100 * time.Millisecond |
|||
|
|||
group := FilerGroupName("default") |
|||
|
|||
// Set up initial state
|
|||
lrm.AddServer(group, "filer1:8888") |
|||
lrm.AddServer(group, "filer2:8888") |
|||
lrm.AddServer(group, "filer3:8888") |
|||
lrm.FlushPending(group) |
|||
|
|||
mu.Lock() |
|||
broadcasts = nil // reset
|
|||
mu.Unlock() |
|||
|
|||
// Simulate drop + join in rapid succession
|
|||
lrm.RemoveServer(group, "filer3:8888") |
|||
lrm.AddServer(group, "filer4:8888") |
|||
|
|||
// Should not have broadcast yet
|
|||
mu.Lock() |
|||
assert.Equal(t, 0, len(broadcasts)) |
|||
mu.Unlock() |
|||
|
|||
// Wait for stabilization
|
|||
time.Sleep(200 * time.Millisecond) |
|||
|
|||
mu.Lock() |
|||
require.Equal(t, 1, len(broadcasts), "drop+join should be batched into single broadcast") |
|||
servers := broadcasts[0].Servers |
|||
assert.Equal(t, 3, len(servers), "should have filer1, filer2, filer4") |
|||
// filer3 should be gone, filer4 should be present
|
|||
serverSet := make(map[string]bool) |
|||
for _, s := range servers { |
|||
serverSet[s] = true |
|||
} |
|||
assert.False(t, serverSet["filer3:8888"], "filer3 should be removed") |
|||
assert.True(t, serverSet["filer4:8888"], "filer4 should be added") |
|||
mu.Unlock() |
|||
} |
|||
|
|||
func TestLockRingManager_VersionIncrements(t *testing.T) { |
|||
var mu sync.Mutex |
|||
var broadcasts []*master_pb.LockRingUpdate |
|||
|
|||
lrm := NewLockRingManager(func(resp *master_pb.KeepConnectedResponse) { |
|||
mu.Lock() |
|||
if resp.LockRingUpdate != nil { |
|||
broadcasts = append(broadcasts, resp.LockRingUpdate) |
|||
} |
|||
mu.Unlock() |
|||
}) |
|||
lrm.stabilizeDelay = 50 * time.Millisecond |
|||
|
|||
group := FilerGroupName("default") |
|||
|
|||
lrm.AddServer(group, "filer1:8888") |
|||
time.Sleep(100 * time.Millisecond) |
|||
|
|||
lrm.AddServer(group, "filer2:8888") |
|||
time.Sleep(100 * time.Millisecond) |
|||
|
|||
mu.Lock() |
|||
require.Equal(t, 2, len(broadcasts)) |
|||
assert.Greater(t, broadcasts[0].Version, int64(0), "version should be positive") |
|||
assert.Greater(t, broadcasts[1].Version, broadcasts[0].Version, "versions should be monotonically increasing") |
|||
mu.Unlock() |
|||
} |
|||
|
|||
func TestLockRingManager_FlushPending(t *testing.T) { |
|||
var mu sync.Mutex |
|||
var broadcasts []*master_pb.LockRingUpdate |
|||
|
|||
lrm := NewLockRingManager(func(resp *master_pb.KeepConnectedResponse) { |
|||
mu.Lock() |
|||
if resp.LockRingUpdate != nil { |
|||
broadcasts = append(broadcasts, resp.LockRingUpdate) |
|||
} |
|||
mu.Unlock() |
|||
}) |
|||
lrm.stabilizeDelay = 10 * time.Second // long delay
|
|||
|
|||
group := FilerGroupName("default") |
|||
|
|||
lrm.AddServer(group, "filer1:8888") |
|||
lrm.AddServer(group, "filer2:8888") |
|||
|
|||
// Flush immediately
|
|||
lrm.FlushPending(group) |
|||
|
|||
mu.Lock() |
|||
require.Equal(t, 1, len(broadcasts)) |
|||
assert.Equal(t, 2, len(broadcasts[0].Servers)) |
|||
mu.Unlock() |
|||
} |
|||
|
|||
func TestLockRingManager_MultipleGroups(t *testing.T) {
	var mu sync.Mutex
	broadcastsByGroup := make(map[string][]*master_pb.LockRingUpdate)

	lrm := NewLockRingManager(func(resp *master_pb.KeepConnectedResponse) {
		mu.Lock()
		if resp.LockRingUpdate != nil {
			broadcastsByGroup[resp.LockRingUpdate.FilerGroup] = append(
				broadcastsByGroup[resp.LockRingUpdate.FilerGroup], resp.LockRingUpdate)
		}
		mu.Unlock()
	})
	lrm.stabilizeDelay = 50 * time.Millisecond

	lrm.AddServer("group1", "filer1:8888")
	lrm.AddServer("group2", "filer2:8888")

	time.Sleep(100 * time.Millisecond)

	mu.Lock()
	assert.Equal(t, 1, len(broadcastsByGroup["group1"]))
	assert.Equal(t, 1, len(broadcastsByGroup["group2"]))
	assert.Equal(t, []string{"filer1:8888"}, broadcastsByGroup["group1"][0].Servers)
	assert.Equal(t, []string{"filer2:8888"}, broadcastsByGroup["group2"][0].Servers)
	mu.Unlock()
}

func TestLockRingManager_GetServers(t *testing.T) {
	lrm := NewLockRingManager(nil)

	group := FilerGroupName("default")
	lrm.AddServer(group, "filer1:8888")
	lrm.AddServer(group, "filer2:8888")

	servers := lrm.GetServers(group)
	assert.Equal(t, 2, len(servers))

	// Contains both
	serverSet := make(map[string]bool)
	for _, s := range servers {
		serverSet[s] = true
	}
	assert.True(t, serverSet["filer1:8888"])
	assert.True(t, serverSet["filer2:8888"])

	// Remove one
	lrm.RemoveServer(group, "filer1:8888")
	servers = lrm.GetServers(group)
	assert.Equal(t, 1, len(servers))
	assert.Equal(t, "filer2:8888", servers[0])
}

func TestLockRingManager_NoBroadcastWithoutFn(t *testing.T) {
	// No panic when broadcastFn is nil
	lrm := NewLockRingManager(nil)
	lrm.stabilizeDelay = 10 * time.Millisecond

	lrm.AddServer("default", pb.ServerAddress("filer1:8888"))
	time.Sleep(50 * time.Millisecond) // should not panic
}

func TestLockRingManager_GetLastUpdateReturnsBroadcastState(t *testing.T) {
	lrm := NewLockRingManager(nil)

	group := FilerGroupName("default")
	lrm.AddServer(group, "filer1:8888")
	lrm.AddServer(group, "filer2:8888")
	lrm.FlushPending(group)

	update := lrm.GetLastUpdate(group)
	require.NotNil(t, update)
	assert.Equal(t, "default", update.FilerGroup)
	assert.ElementsMatch(t, []string{"filer1:8888", "filer2:8888"}, update.Servers)
	assert.Greater(t, update.Version, int64(0))
}
694
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large
File diff suppressed because it is too large
@@ -0,0 +1,36 @@
package weed_server

import (
	"context"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func TestFindLockOwnerExpiredLockReturnsNotFound(t *testing.T) {
	fs := &FilerServer{
		option: &FilerOption{Host: "filer1:8888"},
		filer: &filer.Filer{
			Dlm: lock_manager.NewDistributedLockManager("filer1:8888"),
		},
	}
	fs.filer.Dlm.LockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}, 0)
	fs.filer.Dlm.InsertLock("expired-lock", time.Now().Add(-time.Second).UnixNano(), "token1", "owner1", 5, 2)

	resp, err := fs.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
		Name: "expired-lock",
	})
	require.Nil(t, resp)
	require.Error(t, err)

	st, ok := status.FromError(err)
	require.True(t, ok)
	require.Equal(t, codes.NotFound, st.Code())
}
@@ -0,0 +1,37 @@
package weed_server

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestInitialLockRingUpdateReturnsLastBroadcastForFilers(t *testing.T) {
	ms := &MasterServer{
		LockRingManager: cluster.NewLockRingManager(nil),
	}

	ms.LockRingManager.AddServer("group-a", "filer1:8888")
	ms.LockRingManager.AddServer("group-a", "filer2:8888")
	ms.LockRingManager.FlushPending("group-a")

	resp := ms.initialLockRingUpdate(cluster.FilerType, "group-a")
	require.NotNil(t, resp)
	require.NotNil(t, resp.LockRingUpdate)
	assert.Equal(t, "group-a", resp.LockRingUpdate.FilerGroup)
	assert.ElementsMatch(t, []string{"filer1:8888", "filer2:8888"}, resp.LockRingUpdate.Servers)
	assert.Greater(t, resp.LockRingUpdate.Version, int64(0))
}

func TestInitialLockRingUpdateSkipsNonFilers(t *testing.T) {
	ms := &MasterServer{
		LockRingManager: cluster.NewLockRingManager(nil),
	}

	ms.LockRingManager.AddServer("group-a", "filer1:8888")
	ms.LockRingManager.FlushPending("group-a")

	assert.Nil(t, ms.initialLockRingUpdate(cluster.BrokerType, "group-a"))
}