diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go new file mode 100644 index 000000000..6a6f7a450 --- /dev/null +++ b/weed/cluster/lock_client.go @@ -0,0 +1,151 @@ +package cluster + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" + "time" +) + +type LockClient struct { + grpcDialOption grpc.DialOption + maxLockDuration time.Duration + sleepDuration time.Duration +} + +func NewLockClient(grpcDialOption grpc.DialOption) *LockClient { + return &LockClient{ + grpcDialOption: grpcDialOption, + maxLockDuration: 5 * time.Second, + sleepDuration: 4 * time.Millisecond, + } +} + +type LiveLock struct { + key string + renewToken string + expireAtNs int64 + filer pb.ServerAddress + cancelCh chan struct{} + grpcDialOption grpc.DialOption + isLocked bool +} + +func (lc *LockClient) NewLock(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) { + lock = &LiveLock{ + key: key, + filer: filer, + cancelCh: make(chan struct{}), + expireAtNs: time.Now().Add(lockDuration).UnixNano(), + grpcDialOption: lc.grpcDialOption, + } + var needRenewal bool + if lockDuration > lc.maxLockDuration { + lockDuration = lc.maxLockDuration + needRenewal = true + } + util.RetryForever("create lock:"+key, func() error { + errorMessage, err := lock.doLock(lockDuration) + if err != nil { + return err + } + if errorMessage != "" { + return fmt.Errorf("%v", errorMessage) + } + return nil + }, func(err error) (shouldContinue bool) { + if err != nil { + glog.Warningf("create lock %s: %s", key, err) + } + return lock.renewToken == "" + }) + + lock.isLocked = true + + if needRenewal { + go lc.keepLock(lock) + } + + return +} + +func (lock *LiveLock) IsLocked() bool { + return lock.isLocked +} + +func (lock *LiveLock) Unlock() error { + close(lock.cancelCh) + return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.Unlock(context.Background(), &filer_pb.UnlockRequest{ + Name: lock.key, + RenewToken: lock.renewToken, + }) + return err + }) +} + +func (lc *LockClient) keepLock(lock *LiveLock) { + for { + select { + case <-time.After(lc.sleepDuration): + // renew the lock if lock.expireAtNs is still greater than now + util.RetryForever("keep lock:"+lock.key, func() error { + lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond + if lockDuration > lc.maxLockDuration { + lockDuration = lc.maxLockDuration + } + if lockDuration <= 0 { + return nil + } + + errorMessage, err := lock.doLock(lockDuration) + if err != nil { + lock.isLocked = false + return err + } + if errorMessage != "" { + lock.isLocked = false + return fmt.Errorf("%v", errorMessage) + } + return nil + }, func(err error) (shouldContinue bool) { + if err == nil { + return false + } + glog.Warningf("keep lock %s: %v", lock.key, err) + return true + }) + if !lock.isLocked { + return + } + case <-lock.cancelCh: + return + } + } +} + +func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { + err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.Lock(context.Background(), &filer_pb.LockRequest{ + Name: lock.key, + SecondsToLock: int64(lockDuration.Seconds()), + RenewToken: lock.renewToken, + IsMoved: false, + }) + if err == nil { + lock.renewToken = resp.RenewToken + } + if resp != nil { + errorMessage = resp.Error + if resp.MovedTo != "" { + lock.filer = pb.ServerAddress(resp.MovedTo) + } + } + return err + }) + return +}