From f67ba35f4a6b0e30953483333edf12f1e491bd76 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 20 Dec 2025 15:25:47 -0800
Subject: [PATCH] Make lock_manager.RenewInterval configurable in LiveLock
 (#7830)

* Make lock_manager.RenewInterval configurable in LiveLock

- Add renewInterval field to LiveLock struct
- Modify StartLongLivedLock to accept renewInterval parameter
- Update all call sites to pass lock_manager.RenewInterval
- Default to lock_manager.RenewInterval if zero is passed

* S3 metrics: reduce collection interval to half of bucketSizeMetricsInterval

Since S3 metrics collection is not critical, check more frequently but
only collect when holding the distributed lock. This allows faster
detection of any issues while avoiding overhead on non-leader instances.

* Remove unused lock_manager import from bucket_size_metrics.go

* Refactor: Make lockTTL the primary parameter, derive renewInterval from it

Instead of a configurable renew interval, lockTTL is now the input
parameter. The renewal interval is automatically derived as lockTTL / 2,
ensuring that locks are renewed well before expiration.

Changes:
- Replace renewInterval parameter with lockTTL
- Rename LiveLock.renewInterval field to lockTTL
- Calculate renewInterval as lockTTL / 2 inside the goroutine
- Update all call sites to pass lockTTL values
- Simplify sleep logic to use consistent renewInterval for both states

This approach is more intuitive and guarantees safe renewal windows.

* When locked, renew more aggressively to actively keep the lock

When holding the lock, sleep for renewInterval/2 to renew more
frequently. When seeking the lock, sleep for renewInterval to retry
with normal frequency. This ensures we actively maintain lock ownership
while being less aggressive when competing for the lock.

* Simplify: use consistent renewInterval for all lock states

Since renewInterval is already lockTTL / 2, there's no need to
differentiate between locked and unlocked states. Both use the same
interval for consistency.

* Adjust sleep intervals for different lock states

- Locked instances sleep for renewInterval (lockTTL/2) to renew the lock
- Unlocked instances sleep for 5*renewInterval (2.5*lockTTL) to retry
  acquisition less frequently
---
 weed/cluster/lock_client.go                   | 20 ++++++++++++++------
 weed/mq/broker/broker_server.go               |  3 ++-
 weed/mq/kafka/gateway/coordinator_registry.go |  2 ++
 weed/s3api/bucket_size_metrics.go             |  2 +-
 4 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index ae3e4a8cd..b08e8f458 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -42,6 +42,7 @@ type LiveLock struct {
 	self           string
 	lc             *LockClient
 	owner          string
+	lockTTL        time.Duration
 }
 
 // NewShortLivedLock creates a lock with a 5-second duration
@@ -60,17 +61,24 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc
 }
 
 // StartLongLivedLock starts a goroutine to lock the key and returns immediately.
-func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
+// lockTTL specifies how long the lock should be held. The renewal interval is
+// automatically derived as lockTTL / 2 to ensure timely renewals.
+func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string), lockTTL time.Duration) (lock *LiveLock) {
 	lock = &LiveLock{
 		key:            key,
 		hostFiler:      lc.seedFiler,
 		cancelCh:       make(chan struct{}),
-		expireAtNs:     time.Now().Add(lock_manager.LiveLockTTL).UnixNano(),
+		expireAtNs:     time.Now().Add(lockTTL).UnixNano(),
 		grpcDialOption: lc.grpcDialOption,
 		self:           owner,
 		lc:             lc,
+		lockTTL:        lockTTL,
+	}
+	if lock.lockTTL == 0 {
+		lock.lockTTL = lock_manager.LiveLockTTL
 	}
 	go func() {
+		renewInterval := lock.lockTTL / 2
 		isLocked := false
 		lockOwner := ""
 		for {
@@ -83,13 +91,13 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
 			}
 
 			if isLocked {
-				if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
+				if err := lock.AttemptToLock(lock.lockTTL); err != nil {
 					glog.V(0).Infof("Lost lock %s: %v", key, err)
 					isLocked = false
 					atomic.StoreInt32(&lock.isLocked, 0)
 				}
 			} else {
-				if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil {
+				if err := lock.AttemptToLock(lock.lockTTL); err == nil {
 					isLocked = true
 					// Note: AttemptToLock already sets lock.isLocked atomically on success
 				}
@@ -104,9 +112,9 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
 				return
 			default:
 				if isLocked {
-					time.Sleep(5 * lock_manager.RenewInterval)
+					time.Sleep(renewInterval)
 				} else {
-					time.Sleep(lock_manager.RenewInterval)
+					time.Sleep(5 * renewInterval)
 				}
 			}
 		}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 38e022a7c..5116ff5a5 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -12,6 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 
 	"github.com/seaweedfs/seaweedfs/weed/cluster"
+	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 	"google.golang.org/grpc"
@@ -142,7 +143,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
 		mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
 			glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
 			newBrokerBalancerCh <- newLockOwner
-		})
+		}, lock_manager.LiveLockTTL)
 
 		mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh)
 	}()
diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go
index eea1b1907..f0af6647a 100644
--- a/weed/mq/kafka/gateway/coordinator_registry.go
+++ b/weed/mq/kafka/gateway/coordinator_registry.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/cluster"
+	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/filer_client"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -180,6 +181,7 @@ func (cr *CoordinatorRegistry) startLeaderElection() {
 		GatewayLeaderLockKey,
 		cr.gatewayAddress,
 		cr.onLeadershipChange,
+		lock_manager.LiveLockTTL,
 	)
 
 	// Wait for shutdown
diff --git a/weed/s3api/bucket_size_metrics.go b/weed/s3api/bucket_size_metrics.go
index d5acc6995..12a6afdd5 100644
--- a/weed/s3api/bucket_size_metrics.go
+++ b/weed/s3api/bucket_size_metrics.go
@@ -61,7 +61,7 @@ func (s3a *S3ApiServer) startBucketSizeMetricsLoop(ctx context.Context) {
 	// Start long-lived lock - this S3 instance will only collect metrics when it holds the lock
 	lock := lockClient.StartLongLivedLock(s3MetricsLockName, owner,
 		func(newLockOwner string) {
 			glog.V(1).Infof("S3 bucket size metrics lock owner changed to: %s", newLockOwner)
-		})
+		}, bucketSizeMetricsInterval)
 	defer lock.Stop()
 	ticker := time.NewTicker(bucketSizeMetricsInterval)
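
Usage sketch for reviewers: a minimal, hypothetical caller of the new
signature, not code from this patch. It assumes
cluster.NewLockClient(grpcDialOption, seedFiler) as the lock client
constructor (the diff shows the lc.seedFiler and lc.grpcDialOption fields it
would populate); the dial option, filer address, lock key, and owner string
below are illustrative placeholders.

	package main

	import (
		"time"

		"github.com/seaweedfs/seaweedfs/weed/cluster"
		"github.com/seaweedfs/seaweedfs/weed/glog"
		"github.com/seaweedfs/seaweedfs/weed/pb"
		"google.golang.org/grpc"
	)

	func main() {
		// Placeholder dial option; real callers load their security settings.
		grpcDialOption := grpc.WithInsecure()

		// Hypothetical constructor call and seed filer address.
		lockClient := cluster.NewLockClient(grpcDialOption, pb.ServerAddress("localhost:8888"))

		// With a 30s TTL, the background goroutine renews every lockTTL/2 = 15s
		// while holding the lock, and retries every 5*renewInterval = 75s while
		// another instance holds it. Passing 0 falls back to lock_manager.LiveLockTTL.
		lock := lockClient.StartLongLivedLock("/locks/example-leader", "host-a:8111",
			func(newLockOwner string) {
				glog.V(0).Infof("lock owner is now %s", newLockOwner)
			}, 30*time.Second)
		defer lock.Stop()

		select {} // block; a real caller would do work while holding the lock
	}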