Browse Source

Make lock_manager.RenewInterval configurable in LiveLock

Co-authored-by: chrislusf <1543151+chrislusf@users.noreply.github.com>
copilot/make-renew-interval-configurable
copilot-swe-agent[bot] 3 weeks ago
parent
commit
22cb364770
  1. 12
      weed/cluster/lock_client.go
  2. 3
      weed/mq/broker/broker_server.go
  3. 2
      weed/mq/kafka/gateway/coordinator_registry.go
  4. 3
      weed/s3api/bucket_size_metrics.go

12
weed/cluster/lock_client.go

@ -42,6 +42,7 @@ type LiveLock struct {
self string
lc *LockClient
owner string
renewInterval time.Duration
}
// NewShortLivedLock creates a lock with a 5-second duration
@ -60,7 +61,11 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc
}
// StartLongLivedLock starts a goroutine to lock the key and returns immediately.
func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
func (lc *LockClient) StartLongLivedLock(key string, owner string, renewInterval time.Duration, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
// Default to lock_manager.RenewInterval if zero duration is passed
if renewInterval == 0 {
renewInterval = lock_manager.RenewInterval
}
lock = &LiveLock{
key: key,
hostFiler: lc.seedFiler,
@ -69,6 +74,7 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
grpcDialOption: lc.grpcDialOption,
self: owner,
lc: lc,
renewInterval: renewInterval,
}
go func() {
isLocked := false
@ -104,9 +110,9 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
return
default:
if isLocked {
time.Sleep(5 * lock_manager.RenewInterval)
time.Sleep(5 * lock.renewInterval)
} else {
time.Sleep(lock_manager.RenewInterval)
time.Sleep(lock.renewInterval)
}
}
}

3
weed/mq/broker/broker_server.go

@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@ -139,7 +140,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
newBrokerBalancerCh := make(chan string, 1)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), lock_manager.RenewInterval, func(newLockOwner string) {
glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
newBrokerBalancerCh <- newLockOwner
})

2
weed/mq/kafka/gateway/coordinator_registry.go

@ -12,6 +12,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -179,6 +180,7 @@ func (cr *CoordinatorRegistry) startLeaderElection() {
cr.leaderLock = cr.lockClient.StartLongLivedLock(
GatewayLeaderLockKey,
cr.gatewayAddress,
lock_manager.RenewInterval,
cr.onLeadershipChange,
)

3
weed/s3api/bucket_size_metrics.go

@ -8,6 +8,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -59,7 +60,7 @@ func (s3a *S3ApiServer) startBucketSizeMetricsLoop(ctx context.Context) {
owner := string(filer) + "-s3-metrics"
// Start long-lived lock - this S3 instance will only collect metrics when it holds the lock
lock := lockClient.StartLongLivedLock(s3MetricsLockName, owner, func(newLockOwner string) {
lock := lockClient.StartLongLivedLock(s3MetricsLockName, owner, lock_manager.RenewInterval, func(newLockOwner string) {
glog.V(1).Infof("S3 bucket size metrics lock owner changed to: %s", newLockOwner)
})
defer lock.Stop()

Loading…
Cancel
Save