diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 03112cb08..175718cd2 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -19,10 +19,13 @@ const ( type ExclusiveLocker struct { token int64 lockTsNs int64 - isLocked bool + isLocked atomic.Bool masterClient *wdclient.MasterClient lockName string message string + clientName string + // Each lock has and only has one goroutine + renewGoroutineRunning atomic.Bool } func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker { @@ -33,7 +36,7 @@ func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *E } func (l *ExclusiveLocker) IsLocked() bool { - return l.isLocked + return l.isLocked.Load() } func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { @@ -45,7 +48,7 @@ func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { } func (l *ExclusiveLocker) RequestLock(clientName string) { - if l.isLocked { + if l.isLocked.Load() { return } @@ -74,43 +77,51 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { } } - l.isLocked = true - - // start a goroutine to renew the lease - go func() { - ctx2, cancel2 := context.WithCancel(context.Background()) - defer cancel2() - - for l.isLocked { - if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ - PreviousToken: atomic.LoadInt64(&l.token), - PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), - LockName: l.lockName, - ClientName: clientName, - Message: l.message, - }) - if err == nil { - atomic.StoreInt64(&l.token, resp.Token) - atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) - // println("ts", l.lockTsNs, "token", l.token) + l.isLocked.Store(true) + l.clientName = clientName + + // Each lock has and only has one goroutine + if l.renewGoroutineRunning.CompareAndSwap(false, true) { + // start a goroutine to renew the lease + go func() { + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + + for { + if l.isLocked.Load() { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ + PreviousToken: atomic.LoadInt64(&l.token), + PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), + LockName: l.lockName, + ClientName: l.clientName, + Message: l.message, + }) + if err == nil { + atomic.StoreInt64(&l.token, resp.Token) + atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) + // println("ts", l.lockTsNs, "token", l.token) + } + return err + }); err != nil { + glog.Errorf("failed to renew lock: %v", err) + l.isLocked.Store(false) + return + } else { + time.Sleep(RenewInterval) + } + } else { + time.Sleep(RenewInterval) } - return err - }); err != nil { - glog.Errorf("failed to renew lock: %v", err) - l.isLocked = false - return - } else { - time.Sleep(RenewInterval) } - - } - }() + }() + } } func (l *ExclusiveLocker) ReleaseLock() { - l.isLocked = false + l.isLocked.Store(false) + l.clientName = "" ctx, cancel := context.WithCancel(context.Background()) defer cancel()