Browse Source

Make lock_manager.RenewInterval configurable in LiveLock

- Add renewInterval field to LiveLock struct
- Modify StartLongLivedLock to accept renewInterval parameter
- Update all call sites to pass lock_manager.RenewInterval
- Default to lock_manager.RenewInterval if zero is passed
copilot/make-renew-interval-configurable-again
Chris Lu 3 weeks ago
parent
commit
e7fe274e17
  1. 11
      weed/cluster/lock_client.go
  2. 3
      weed/mq/broker/broker_server.go
  3. 2
      weed/mq/kafka/gateway/coordinator_registry.go
  4. 3
      weed/s3api/bucket_size_metrics.go

11
weed/cluster/lock_client.go

@ -42,6 +42,7 @@ type LiveLock struct {
self string
lc *LockClient
owner string
renewInterval time.Duration
}
// NewShortLivedLock creates a lock with a 5-second duration
@ -60,7 +61,7 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc
}
// StartLongLivedLock starts a goroutine to lock the key and returns immediately.
func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string), renewInterval time.Duration) (lock *LiveLock) {
lock = &LiveLock{
key: key,
hostFiler: lc.seedFiler,
@ -69,6 +70,10 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
grpcDialOption: lc.grpcDialOption,
self: owner,
lc: lc,
renewInterval: renewInterval,
}
if lock.renewInterval == 0 {
lock.renewInterval = lock_manager.RenewInterval
}
go func() {
isLocked := false
@ -104,9 +109,9 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh
return
default:
if isLocked {
time.Sleep(5 * lock_manager.RenewInterval)
time.Sleep(5 * lock.renewInterval)
} else {
time.Sleep(lock_manager.RenewInterval)
time.Sleep(lock.renewInterval)
}
}
}

3
weed/mq/broker/broker_server.go

@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@ -142,7 +143,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
newBrokerBalancerCh <- newLockOwner
})
}, lock_manager.RenewInterval)
mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh)
}()

2
weed/mq/kafka/gateway/coordinator_registry.go

@ -12,6 +12,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -180,6 +181,7 @@ func (cr *CoordinatorRegistry) startLeaderElection() {
GatewayLeaderLockKey,
cr.gatewayAddress,
cr.onLeadershipChange,
lock_manager.RenewInterval,
)
// Wait for shutdown

3
weed/s3api/bucket_size_metrics.go

@ -8,6 +8,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -61,7 +62,7 @@ func (s3a *S3ApiServer) startBucketSizeMetricsLoop(ctx context.Context) {
// Start long-lived lock - this S3 instance will only collect metrics when it holds the lock
lock := lockClient.StartLongLivedLock(s3MetricsLockName, owner, func(newLockOwner string) {
glog.V(1).Infof("S3 bucket size metrics lock owner changed to: %s", newLockOwner)
})
}, lock_manager.RenewInterval)
defer lock.Stop()
ticker := time.NewTicker(bucketSizeMetricsInterval)

Loading…
Cancel
Save