Browse Source

refactoring

pull/3495/head
chrislu 2 years ago
parent
commit
57e7582c36
  1. 6
      weed/shell/commands.go
  2. 15
      weed/wdclient/exclusive_locks/exclusive_locker.go

6
weed/shell/commands.go

@ -73,7 +73,7 @@ func (ce *CommandEnv) isDirectory(path string) bool {
func (ce *CommandEnv) confirmIsLocked(args []string) error { func (ce *CommandEnv) confirmIsLocked(args []string) error {
if ce.locker.IsLocking() {
if ce.locker.IsLocked() {
return nil return nil
} }
ce.locker.SetMessage(fmt.Sprintf("%v", args)) ce.locker.SetMessage(fmt.Sprintf("%v", args))
@ -82,6 +82,10 @@ func (ce *CommandEnv) confirmIsLocked(args []string) error {
} }
func (ce *CommandEnv) isLocked() bool {
return ce.locker.IsLocked()
}
func (ce *CommandEnv) checkDirectory(path string) error { func (ce *CommandEnv) checkDirectory(path string) error {
dir, name := util.FullPath(path).DirAndName() dir, name := util.FullPath(path).DirAndName()

15
weed/wdclient/exclusive_locks/exclusive_locker.go

@ -19,7 +19,7 @@ const (
type ExclusiveLocker struct { type ExclusiveLocker struct {
token int64 token int64
lockTsNs int64 lockTsNs int64
isLocking bool
isLocked bool
masterClient *wdclient.MasterClient masterClient *wdclient.MasterClient
lockName string lockName string
message string message string
@ -32,8 +32,8 @@ func NewExclusiveLocker(masterClient *wdclient.MasterClient, lockName string) *E
} }
} }
func (l *ExclusiveLocker) IsLocking() bool {
return l.isLocking
func (l *ExclusiveLocker) IsLocked() bool {
return l.isLocked
} }
func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
@ -45,7 +45,7 @@ func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
} }
func (l *ExclusiveLocker) RequestLock(clientName string) { func (l *ExclusiveLocker) RequestLock(clientName string) {
if l.isLocking {
if l.isLocked {
return return
} }
@ -74,14 +74,14 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
} }
} }
l.isLocking = true
l.isLocked = true
// start a goroutine to renew the lease // start a goroutine to renew the lease
go func() { go func() {
ctx2, cancel2 := context.WithCancel(context.Background()) ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2() defer cancel2()
for l.isLocking {
for l.isLocked {
if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token), PreviousToken: atomic.LoadInt64(&l.token),
@ -98,6 +98,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
return err return err
}); err != nil { }); err != nil {
glog.Errorf("failed to renew lock: %v", err) glog.Errorf("failed to renew lock: %v", err)
l.isLocked = false
return return
} else { } else {
time.Sleep(RenewInteval) time.Sleep(RenewInteval)
@ -109,7 +110,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
} }
func (l *ExclusiveLocker) ReleaseLock() { func (l *ExclusiveLocker) ReleaseLock() {
l.isLocking = false
l.isLocked = false
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()

Loading…
Cancel
Save