diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 3d8dae8cb..5c17e1918 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -40,57 +40,42 @@ type LiveLock struct { lc *LockClient } -// NewLock creates a lock with a very long duration -func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) { - return lc.doNewLock(key, lock_manager.MaxDuration, owner) -} - -// StartLock starts a goroutine to lock the key and returns immediately. -func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { +// NewShortLivedLock creates a lock with a 5-second duration +func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), + expireAtNs: time.Now().Add(5*time.Second).UnixNano(), grpcDialOption: lc.grpcDialOption, owner: owner, lc: lc, } - go func() { - lock.CreateLock(lock_manager.MaxDuration) - lc.keepLock(lock) - }() + lock.retryUntilLocked(5*time.Second) return } -func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) { +// StartLock starts a goroutine to lock the key and returns immediately. +func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { lock = &LiveLock{ key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lockDuration).UnixNano(), + expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), grpcDialOption: lc.grpcDialOption, owner: owner, lc: lc, } - var needRenewal bool - if lockDuration > lc.maxLockDuration { - lockDuration = lc.maxLockDuration - needRenewal = true - } - - lock.CreateLock(lockDuration) - - if needRenewal { - go lc.keepLock(lock) - } - + go func() { + lock.retryUntilLocked(lock_manager.MaxDuration) + lc.keepLock(lock) + }() return } -func (lock *LiveLock) CreateLock(lockDuration time.Duration) { +func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) { util.RetryUntil("create lock:"+lock.key, func() error { - return lock.DoLock(lockDuration) + return lock.AttemptToLock(lockDuration) }, func(err error) (shouldContinue bool) { if err != nil { glog.Warningf("create lock %s: %s", lock.key, err) @@ -99,7 +84,7 @@ func (lock *LiveLock) CreateLock(lockDuration time.Duration) { }) } -func (lock *LiveLock) DoLock(lockDuration time.Duration) error { +func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { errorMessage, err := lock.doLock(lockDuration) if err != nil { time.Sleep(time.Second) @@ -117,11 +102,13 @@ func (lock *LiveLock) IsLocked() bool { return lock!=nil && lock.isLocked } -func (lock *LiveLock) StopLock() error { - close(lock.cancelCh) +func (lock *LiveLock) StopShortLivedLock() error { if !lock.isLocked { return nil } + defer func() { + lock.isLocked = false + }() return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{ Name: lock.key, diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index a009af693..3a25a9691 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -92,8 +92,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial glog.V(0).Infof("BrokerConnectToBalancer: %v", err) } time.Sleep(time.Second) - if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil { - glog.V(0).Infof("DoLock: %v", err) + if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil { + glog.V(0).Infof("AttemptToLock: %v", err) } } }() diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index f32273f26..eeb031cd1 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo fullpath := util.NewFullPath(req.Directory, req.EntryName) lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host) - lock := lockClient.NewLock(string(fullpath), string(fs.option.Host)) - defer lock.StopLock() + lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host)) + defer lock.StopShortLivedLock() var offset int64 = 0 entry, err := fs.filer.FindEntry(ctx, fullpath)