diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index e222807e6..3d8dae8cb 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -57,27 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) { lc: lc, } go func() { - util.RetryUntil("create lock:"+key, func() error { - errorMessage, err := lock.doLock(lock_manager.MaxDuration) - if err != nil { - glog.V(0).Infof("create lock %s: %s", key, err) - time.Sleep(time.Second) - return err - } - if errorMessage != "" { - glog.V(4).Infof("create lock %s: %s", key, errorMessage) - time.Sleep(time.Second) - return fmt.Errorf("%v", errorMessage) - } - lock.isLocked = true - return nil - }, func(err error) (shouldContinue bool) { - if err != nil { - glog.Warningf("create lock %s: %s", key, err) - time.Sleep(time.Second) - } - return lock.renewToken == "" - }) + lock.CreateLock(lock_manager.MaxDuration) lc.keepLock(lock) }() return @@ -98,30 +78,39 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st lockDuration = lc.maxLockDuration needRenewal = true } - util.RetryUntil("create lock:"+key, func() error { - errorMessage, err := lock.doLock(lockDuration) - if err != nil { - time.Sleep(time.Second) - return err - } - if errorMessage != "" { - time.Sleep(time.Second) - return fmt.Errorf("%v", errorMessage) - } - lock.isLocked = true - return nil + + lock.CreateLock(lockDuration) + + if needRenewal { + go lc.keepLock(lock) + } + + return +} + +func (lock *LiveLock) CreateLock(lockDuration time.Duration) { + util.RetryUntil("create lock:"+lock.key, func() error { + return lock.DoLock(lockDuration) }, func(err error) (shouldContinue bool) { if err != nil { - glog.Warningf("create lock %s: %s", key, err) + glog.Warningf("create lock %s: %s", lock.key, err) } return lock.renewToken == "" }) +} - if needRenewal { - go lc.keepLock(lock) +func (lock *LiveLock) DoLock(lockDuration time.Duration) error { + errorMessage, err := lock.doLock(lockDuration) + if err != nil { + time.Sleep(time.Second) + return err } - - return + if errorMessage != "" { + time.Sleep(time.Second) + return fmt.Errorf("%v", errorMessage) + } + lock.isLocked = true + return nil } func (lock *LiveLock) IsLocked() bool { @@ -161,12 +150,14 @@ func (lc *LockClient) keepLock(lock *LiveLock) { if err != nil { lock.isLocked = false time.Sleep(time.Second) + glog.V(0).Infof("keep lock %s: %v", lock.key, err) return err } if errorMessage != "" { lock.isLocked = false time.Sleep(time.Second) - return fmt.Errorf("%v", errorMessage) + glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage) + return fmt.Errorf("keep lock error: %v", errorMessage) } return nil }, func(err error) (shouldContinue bool) { diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index 49b951dd9..acf5b93da 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/google/uuid" "github.com/puzpuzpuz/xsync/v2" + "github.com/seaweedfs/seaweedfs/weed/glog" "time" ) @@ -11,6 +12,7 @@ var LockErrorNonEmptyTokenOnNewLock = fmt.Errorf("lock: non-empty token on a new var LockErrorNonEmptyTokenOnExpiredLock = fmt.Errorf("lock: non-empty token on an expired lock") var LockErrorTokenMismatch = fmt.Errorf("lock: token mismatch") var UnlockErrorTokenMismatch = fmt.Errorf("unlock: token mismatch") +var LockNotFound = fmt.Errorf("lock not found") // LockManager local lock manager, used by distributed lock manager type LockManager struct { @@ -138,13 +140,11 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, } func (lm *LockManager) GetLockOwner(key string) (owner string, err error) { - lm.locks.Range(func(k string, lock *Lock) bool { - if k == key && lock != nil { - owner = lock.Owner - return false - } - return true - }) + lock, _ := lm.locks.Load(key) + if lock != nil { + return lock.Owner, nil + } + glog.V(0).Infof("get lock %s %+v", key, lock) + err = LockNotFound return - } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index f41ec87ca..a009af693 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,7 +1,7 @@ package broker import ( - "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" @@ -88,11 +88,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self)) for { - err := mqBroker.BrokerConnectToBalancer(string(self)) - if err != nil { - fmt.Printf("BrokerConnectToBalancer: %v\n", err) + if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil { + glog.V(0).Infof("BrokerConnectToBalancer: %v", err) } time.Sleep(time.Second) + if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil { + glog.V(0).Infof("DoLock: %v", err) + } } }() diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 0d7c801f5..a7cef4032 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -81,13 +82,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) { owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name) - if owner == "" { - glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err) - } - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - if !req.IsMoved && movedTo != "" { + if !req.IsMoved && movedTo != "" && err == lock_manager.LockNotFound { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ Name: req.Name, @@ -103,6 +98,15 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock return nil, err } } + + if owner == "" { + glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err) + return nil, status.Error(codes.NotFound, err.Error()) + } + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &filer_pb.FindLockOwnerResponse{ Owner: owner, }, nil