Browse Source

ExclusiveLocker only create one renew goroutine (#6269)

Co-authored-by: liguowei <liguowei@xinye.com>
pull/6288/head
Numblgw 1 month ago
committed by GitHub
parent
commit
e56327e3b0
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 79
      weed/wdclient/exclusive_locks/exclusive_locker.go

79
weed/wdclient/exclusive_locks/exclusive_locker.go

@ -19,10 +19,13 @@ const (
type ExclusiveLocker struct { type ExclusiveLocker struct {
token int64 token int64
lockTsNs int64 lockTsNs int64
isLocked bool
isLocked atomic.Bool
masterClient *wdclient.MasterClient masterClient *wdclient.MasterClient
lockName string lockName string
message string message string
clientName string
// Each lock has and only has one goroutine
renewGoroutineRunning atomic.Bool
} }
func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker { func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *ExclusiveLocker {
@ -33,7 +36,7 @@ func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *E
} }
func (l *ExclusiveLocker) IsLocked() bool { func (l *ExclusiveLocker) IsLocked() bool {
return l.isLocked
return l.isLocked.Load()
} }
func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
@ -45,7 +48,7 @@ func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
} }
func (l *ExclusiveLocker) RequestLock(clientName string) { func (l *ExclusiveLocker) RequestLock(clientName string) {
if l.isLocked {
if l.isLocked.Load() {
return return
} }
@ -74,43 +77,51 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
} }
} }
l.isLocked = true
// start a goroutine to renew the lease
go func() {
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
for l.isLocked {
if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: l.lockName,
ClientName: clientName,
Message: l.message,
})
if err == nil {
atomic.StoreInt64(&l.token, resp.Token)
atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
// println("ts", l.lockTsNs, "token", l.token)
l.isLocked.Store(true)
l.clientName = clientName
// Each lock has and only has one goroutine
if l.renewGoroutineRunning.CompareAndSwap(false, true) {
// start a goroutine to renew the lease
go func() {
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
for {
if l.isLocked.Load() {
if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: l.lockName,
ClientName: l.clientName,
Message: l.message,
})
if err == nil {
atomic.StoreInt64(&l.token, resp.Token)
atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
// println("ts", l.lockTsNs, "token", l.token)
}
return err
}); err != nil {
glog.Errorf("failed to renew lock: %v", err)
l.isLocked.Store(false)
return
} else {
time.Sleep(RenewInterval)
}
} else {
time.Sleep(RenewInterval)
} }
return err
}); err != nil {
glog.Errorf("failed to renew lock: %v", err)
l.isLocked = false
return
} else {
time.Sleep(RenewInterval)
} }
}
}()
}()
}
} }
func (l *ExclusiveLocker) ReleaseLock() { func (l *ExclusiveLocker) ReleaseLock() {
l.isLocked = false
l.isLocked.Store(false)
l.clientName = ""
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()

Loading…
Cancel
Save