From e7fe274e17d2bb10924992432f423234c1d8a3c5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 20 Dec 2025 14:42:39 -0800 Subject: [PATCH] Make lock_manager.RenewInterval configurable in LiveLock - Add renewInterval field to LiveLock struct - Modify StartLongLivedLock to accept renewInterval parameter - Update all call sites to pass lock_manager.RenewInterval - Default to lock_manager.RenewInterval if zero is passed --- weed/cluster/lock_client.go | 11 ++++++++--- weed/mq/broker/broker_server.go | 3 ++- weed/mq/kafka/gateway/coordinator_registry.go | 2 ++ weed/s3api/bucket_size_metrics.go | 3 ++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index ae3e4a8cd..214c3e2f7 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -42,6 +42,7 @@ type LiveLock struct { self string lc *LockClient owner string + renewInterval time.Duration } // NewShortLivedLock creates a lock with a 5-second duration @@ -60,7 +61,7 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc } // StartLongLivedLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) { +func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string), renewInterval time.Duration) (lock *LiveLock) { lock = &LiveLock{ key: key, hostFiler: lc.seedFiler, @@ -69,6 +70,10 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh grpcDialOption: lc.grpcDialOption, self: owner, lc: lc, + renewInterval: renewInterval, + } + if lock.renewInterval == 0 { + lock.renewInterval = lock_manager.RenewInterval } go func() { isLocked := false @@ -104,9 +109,9 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh return default: if isLocked { - time.Sleep(5 * lock_manager.RenewInterval) + time.Sleep(5 * lock.renewInterval) } else { - time.Sleep(lock_manager.RenewInterval) + time.Sleep(lock.renewInterval) } } } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 38e022a7c..02be25396 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" "google.golang.org/grpc" @@ -142,7 +143,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner) newBrokerBalancerCh <- newLockOwner - }) + }, lock_manager.RenewInterval) mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh) }() diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go index eea1b1907..758fd8215 100644 --- a/weed/mq/kafka/gateway/coordinator_registry.go +++ b/weed/mq/kafka/gateway/coordinator_registry.go @@ -12,6 +12,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer_client" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -180,6 +181,7 @@ func (cr *CoordinatorRegistry) startLeaderElection() { GatewayLeaderLockKey, cr.gatewayAddress, cr.onLeadershipChange, + lock_manager.RenewInterval, ) // Wait for shutdown diff --git a/weed/s3api/bucket_size_metrics.go b/weed/s3api/bucket_size_metrics.go index d5acc6995..d8194d0f9 100644 --- a/weed/s3api/bucket_size_metrics.go +++ b/weed/s3api/bucket_size_metrics.go @@ -8,6 +8,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -61,7 +62,7 @@ func (s3a *S3ApiServer) startBucketSizeMetricsLoop(ctx context.Context) { // Start long-lived lock - this S3 instance will only collect metrics when it holds the lock lock := lockClient.StartLongLivedLock(s3MetricsLockName, owner, func(newLockOwner string) { glog.V(1).Infof("S3 bucket size metrics lock owner changed to: %s", newLockOwner) - }) + }, lock_manager.RenewInterval) defer lock.Stop() ticker := time.NewTicker(bucketSizeMetricsInterval)