|
|
@ -31,9 +31,9 @@ func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) * |
|
|
|
type LiveLock struct { |
|
|
|
key string |
|
|
|
renewToken string |
|
|
|
expireAtNs int64 |
|
|
|
filer pb.ServerAddress |
|
|
|
cancelCh chan struct{} |
|
|
|
expireAtNs int64 |
|
|
|
hostFiler pb.ServerAddress |
|
|
|
cancelCh chan struct{} |
|
|
|
grpcDialOption grpc.DialOption |
|
|
|
isLocked bool |
|
|
|
self string |
|
|
@ -45,7 +45,7 @@ type LiveLock struct { |
|
|
|
func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) { |
|
|
|
lock = &LiveLock{ |
|
|
|
key: key, |
|
|
|
filer: lc.seedFiler, |
|
|
|
hostFiler: lc.seedFiler, |
|
|
|
cancelCh: make(chan struct{}), |
|
|
|
expireAtNs: time.Now().Add(5 * time.Second).UnixNano(), |
|
|
|
grpcDialOption: lc.grpcDialOption, |
|
|
@ -60,7 +60,7 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc |
|
|
|
func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) { |
|
|
|
lock = &LiveLock{ |
|
|
|
key: key, |
|
|
|
filer: lc.seedFiler, |
|
|
|
hostFiler: lc.seedFiler, |
|
|
|
cancelCh: make(chan struct{}), |
|
|
|
expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(), |
|
|
|
grpcDialOption: lc.grpcDialOption, |
|
|
@ -129,7 +129,7 @@ func (lock *LiveLock) StopShortLivedLock() error { |
|
|
|
defer func() { |
|
|
|
lock.isLocked = false |
|
|
|
}() |
|
|
|
return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
return pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
_, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ |
|
|
|
Name: lock.key, |
|
|
|
RenewToken: lock.renewToken, |
|
|
@ -139,7 +139,7 @@ func (lock *LiveLock) StopShortLivedLock() error { |
|
|
|
} |
|
|
|
|
|
|
|
func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { |
|
|
|
err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
err = pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ |
|
|
|
Name: lock.key, |
|
|
|
SecondsToLock: int64(lockDuration.Seconds()), |
|
|
@ -156,8 +156,8 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e |
|
|
|
if resp != nil { |
|
|
|
errorMessage = resp.Error |
|
|
|
if resp.LockHostMovedTo != "" { |
|
|
|
lock.filer = pb.ServerAddress(resp.LockHostMovedTo) |
|
|
|
lock.lc.seedFiler = lock.filer |
|
|
|
lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) |
|
|
|
lock.lc.seedFiler = lock.hostFiler |
|
|
|
} |
|
|
|
if resp.LockOwner != "" { |
|
|
|
lock.owner = resp.LockOwner |
|
|
|