From 95261a712ee272e1924939d827d18df048f2c72e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 24 Jun 2025 23:04:39 -0700 Subject: [PATCH] Improve lock ring (#6921) * fix flaky lock ring test * add more tests --- weed/cluster/lock_manager/lock_ring.go | 7 +++ weed/cluster/lock_manager/lock_ring_test.go | 67 +++++++++++++++++---- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go index a7934e1eb..a36c8e222 100644 --- a/weed/cluster/lock_manager/lock_ring.go +++ b/weed/cluster/lock_manager/lock_ring.go @@ -184,6 +184,13 @@ func (r *LockRing) WaitForCleanup() { r.cleanupWg.Wait() } +// GetSnapshotCount safely returns the number of snapshots for testing +func (r *LockRing) GetSnapshotCount() int { + r.RLock() + defer r.RUnlock() + return len(r.snapshots) +} + func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { if len(servers) == 0 { return "" diff --git a/weed/cluster/lock_manager/lock_ring_test.go b/weed/cluster/lock_manager/lock_ring_test.go index cb5c5c8d9..f82a5ffe4 100644 --- a/weed/cluster/lock_manager/lock_ring_test.go +++ b/weed/cluster/lock_manager/lock_ring_test.go @@ -10,37 +10,82 @@ import ( func TestAddServer(t *testing.T) { r := NewLockRing(100 * time.Millisecond) + + // Add servers r.AddServer("localhost:8080") - assert.Equal(t, 1, len(r.snapshots)) r.AddServer("localhost:8081") r.AddServer("localhost:8082") r.AddServer("localhost:8083") r.AddServer("localhost:8084") + + // Verify all servers are present + servers := r.GetSnapshot() + assert.Equal(t, 5, len(servers)) + assert.Contains(t, servers, pb.ServerAddress("localhost:8080")) + assert.Contains(t, servers, pb.ServerAddress("localhost:8081")) + assert.Contains(t, servers, pb.ServerAddress("localhost:8082")) + assert.Contains(t, servers, pb.ServerAddress("localhost:8083")) + assert.Contains(t, servers, pb.ServerAddress("localhost:8084")) + + // Remove servers r.RemoveServer("localhost:8084") r.RemoveServer("localhost:8082") r.RemoveServer("localhost:8080") - assert.Equal(t, 8, len(r.snapshots)) + // Wait for all cleanup operations to complete + r.WaitForCleanup() - // Wait for all cleanup operations to complete instead of using time.Sleep - time.Sleep(110 * time.Millisecond) // Still need to wait for the cleanup interval - r.WaitForCleanup() // Ensure all cleanup goroutines have finished + // Verify only 2 servers remain (localhost:8081 and localhost:8083) + servers = r.GetSnapshot() + assert.Equal(t, 2, len(servers)) + assert.Contains(t, servers, pb.ServerAddress("localhost:8081")) + assert.Contains(t, servers, pb.ServerAddress("localhost:8083")) - assert.Equal(t, 2, len(r.snapshots)) + // Verify cleanup has happened - wait for snapshot interval and check snapshots are compacted + time.Sleep(110 * time.Millisecond) + r.WaitForCleanup() + // Verify snapshot history is cleaned up properly (should have at most 2 snapshots after compaction) + snapshotCount := r.GetSnapshotCount() + assert.LessOrEqual(t, snapshotCount, 2, "Snapshot history should be compacted") } func TestLockRing(t *testing.T) { r := NewLockRing(100 * time.Millisecond) + + // Test initial snapshot r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081"}) - assert.Equal(t, 1, len(r.snapshots)) + assert.Equal(t, 1, r.GetSnapshotCount()) + servers := r.GetSnapshot() + assert.Equal(t, 2, len(servers)) + assert.Contains(t, servers, pb.ServerAddress("localhost:8080")) + assert.Contains(t, servers, pb.ServerAddress("localhost:8081")) + + // Add another server r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082"}) - assert.Equal(t, 2, len(r.snapshots)) + assert.Equal(t, 2, r.GetSnapshotCount()) + servers = r.GetSnapshot() + assert.Equal(t, 3, len(servers)) + assert.Contains(t, servers, pb.ServerAddress("localhost:8082")) + + // Wait for cleanup interval and add another server time.Sleep(110 * time.Millisecond) + r.WaitForCleanup() r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082", "localhost:8083"}) - assert.Equal(t, 3, len(r.snapshots)) + assert.LessOrEqual(t, r.GetSnapshotCount(), 3) + servers = r.GetSnapshot() + assert.Equal(t, 4, len(servers)) + assert.Contains(t, servers, pb.ServerAddress("localhost:8083")) + + // Wait for cleanup and verify compaction time.Sleep(110 * time.Millisecond) - assert.Equal(t, 2, len(r.snapshots)) + r.WaitForCleanup() + assert.LessOrEqual(t, r.GetSnapshotCount(), 2, "Snapshots should be compacted") + + // Add final server r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082", "localhost:8083", "localhost:8084"}) - assert.Equal(t, 3, len(r.snapshots)) + servers = r.GetSnapshot() + assert.Equal(t, 5, len(servers)) + assert.Contains(t, servers, pb.ServerAddress("localhost:8084")) + assert.LessOrEqual(t, r.GetSnapshotCount(), 3) }