diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index ae3e4a8cd..b08e8f458 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -42,6 +42,7 @@ type LiveLock struct { self string lc *LockClient owner string + lockTTL time.Duration } // NewShortLivedLock creates a lock with a 5-second duration @@ -60,17 +61,24 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc } // StartLongLivedLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) { +// lockTTL specifies how long the lock should be held. The renewal interval is +// automatically derived as lockTTL / 2 to ensure timely renewals. A non-positive lockTTL falls back to lock_manager.LiveLockTTL. +func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string), lockTTL time.Duration) (lock *LiveLock) { lock = &LiveLock{ key: key, hostFiler: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(), + expireAtNs: time.Now().Add(lockTTL).UnixNano(), grpcDialOption: lc.grpcDialOption, self: owner, lc: lc, + lockTTL: lockTTL, + } + if lock.lockTTL <= 0 { + lock.lockTTL = lock_manager.LiveLockTTL; lock.expireAtNs = time.Now().Add(lock.lockTTL).UnixNano() } go func() { + renewInterval := lock.lockTTL / 2 isLocked := false lockOwner := "" for { @@ -83,13 +91,13 @@ } if isLocked { - if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil { + if err := lock.AttemptToLock(lock.lockTTL); err != nil { glog.V(0).Infof("Lost lock %s: %v", key, err) isLocked = false atomic.StoreInt32(&lock.isLocked, 0) } } else { - if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil { + if err := lock.AttemptToLock(lock.lockTTL); err == nil { isLocked = true // Note: AttemptToLock already sets lock.isLocked atomically on success } @@ -104,9 +112,9 @@ 
func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh return default: if isLocked { - time.Sleep(5 * lock_manager.RenewInterval) + time.Sleep(renewInterval) } else { - time.Sleep(lock_manager.RenewInterval) + time.Sleep(renewInterval) } } } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 38e022a7c..5116ff5a5 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" "google.golang.org/grpc" @@ -142,7 +143,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner) newBrokerBalancerCh <- newLockOwner - }) + }, lock_manager.LiveLockTTL) mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh) }() diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go index eea1b1907..f0af6647a 100644 --- a/weed/mq/kafka/gateway/coordinator_registry.go +++ b/weed/mq/kafka/gateway/coordinator_registry.go @@ -12,6 +12,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer_client" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -180,6 +181,7 @@ func (cr *CoordinatorRegistry) startLeaderElection() { GatewayLeaderLockKey, cr.gatewayAddress, cr.onLeadershipChange, + lock_manager.LiveLockTTL, ) // Wait for shutdown diff --git a/weed/s3api/bucket_size_metrics.go b/weed/s3api/bucket_size_metrics.go index 
d5acc6995..12a6afdd5 100644 --- a/weed/s3api/bucket_size_metrics.go +++ b/weed/s3api/bucket_size_metrics.go @@ -61,7 +61,7 @@ func (s3a *S3ApiServer) startBucketSizeMetricsLoop(ctx context.Context) { // Start long-lived lock - this S3 instance will only collect metrics when it holds the lock lock := lockClient.StartLongLivedLock(s3MetricsLockName, owner, func(newLockOwner string) { glog.V(1).Infof("S3 bucket size metrics lock owner changed to: %s", newLockOwner) - }) + }, bucketSizeMetricsInterval) defer lock.Stop() ticker := time.NewTicker(bucketSizeMetricsInterval)