diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index f212f9ea0..82d6785a1 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -35,10 +35,10 @@ type LiveLock struct { filer pb.ServerAddress cancelCh chan struct{} grpcDialOption grpc.DialOption - isLocked bool - self string - lc *LockClient - owner string + isLocked bool + self string + lc *LockClient + owner string } // NewShortLivedLock creates a lock with a 5-second duration @@ -47,12 +47,12 @@ func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLoc key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(5*time.Second).UnixNano(), + expireAtNs: time.Now().Add(5 * time.Second).UnixNano(), grpcDialOption: lc.grpcDialOption, self: owner, lc: lc, } - lock.retryUntilLocked(5*time.Second) + lock.retryUntilLocked(5 * time.Second) return } @@ -62,7 +62,7 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh key: key, filer: lc.seedFiler, cancelCh: make(chan struct{}), - expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(), + expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(), grpcDialOption: lc.grpcDialOption, self: owner, lc: lc, @@ -72,12 +72,12 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh lockOwner := "" for { if isLocked { - if err := lock.AttemptToLock(lock_manager.MaxDuration); err != nil { + if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil { glog.V(0).Infof("Lost lock %s: %v", key, err) isLocked = false } } else { - if err := lock.AttemptToLock(lock_manager.MaxDuration); err == nil { + if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil { isLocked = true } } @@ -90,7 +90,7 @@ func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerCh case <-lock.cancelCh: return default: - time.Sleep(5*time.Second) + time.Sleep(lock_manager.RenewInterval) } } }() @@ -111,10 +111,12 @@ func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) { func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { errorMessage, err := lock.doLock(lockDuration) if err != nil { + glog.Warningf("lock1 %s: %v", lock.key, err) time.Sleep(time.Second) return err } if errorMessage != "" { + glog.Warningf("lock2 %s: %v", lock.key, errorMessage) time.Sleep(time.Second) return fmt.Errorf("%v", errorMessage) } @@ -123,7 +125,7 @@ func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { } func (lock *LiveLock) IsLocked() bool { - return lock!=nil && lock.isLocked + return lock != nil && lock.isLocked } func (lock *LiveLock) StopShortLivedLock() error { @@ -154,8 +156,8 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e if err == nil && resp != nil { lock.renewToken = resp.RenewToken } else { - // this can be retried. Need to remember the last valid renewToken - // lock.renewToken = "" + //this can be retried. Need to remember the last valid renewToken + lock.renewToken = "" } if resp != nil { errorMessage = resp.Error diff --git a/weed/cluster/lock_manager/distributed_lock_manager.go b/weed/cluster/lock_manager/distributed_lock_manager.go index 8d7a20dbb..fe2fb5213 100644 --- a/weed/cluster/lock_manager/distributed_lock_manager.go +++ b/weed/cluster/lock_manager/distributed_lock_manager.go @@ -7,7 +7,8 @@ import ( "time" ) -const MaxDuration = time.Hour * 24 * 365 * 100 +const RenewInterval = time.Second * 3 +const LiveLockTTL = time.Second * 7 var NoLockServerError = fmt.Errorf("no lock server found") diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go index a619ccbbe..ebc9dfeaa 100644 --- a/weed/cluster/lock_manager/lock_manager.go +++ b/weed/cluster/lock_manager/lock_manager.go @@ -3,8 +3,8 @@ package lock_manager import ( "fmt" "github.com/google/uuid" - "github.com/puzpuzpuz/xsync/v2" "github.com/seaweedfs/seaweedfs/weed/glog" + "sync" "time" ) @@ -16,7 +16,8 @@ var LockNotFound = fmt.Errorf("lock not found") // LockManager local lock manager, used by distributed lock manager type LockManager struct { - locks *xsync.MapOf[string, *Lock] + locks map[string]*Lock + accessLock sync.RWMutex } type Lock struct { Token string @@ -27,125 +28,155 @@ type Lock struct { func NewLockManager() *LockManager { t := &LockManager{ - locks: xsync.NewMapOf[*Lock](), + locks: make(map[string]*Lock), } go t.CleanUp() return t } func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (lockOwner, renewToken string, err error) { - lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { - if oldValue != nil { - if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() { - // lock is expired, set to a new lock - if token != "" { - err = LockErrorNonEmptyTokenOnExpiredLock - return nil, false - } else { - // new lock - renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false - } - } - // not expired - lockOwner = oldValue.Owner - if oldValue.Token == token { - // token matches, renew the lock - renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false + lm.accessLock.Lock() + defer lm.accessLock.Unlock() + + glog.V(4).Infof("lock %s %v %v %v", path, time.Unix(0, expiredAtNs), token, owner) + + if oldValue, found := lm.locks[path]; found { + if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() { + // lock is expired, set to a new lock + if token != "" { + glog.V(4).Infof("lock expired key %s non-empty token %v owner %v ts %s", path, token, owner, time.Unix(0, oldValue.ExpiredAtNs)) + err = LockErrorNonEmptyTokenOnExpiredLock + return } else { - err = LockErrorTokenMismatch - return oldValue, false + // new lock + renewToken = uuid.New().String() + glog.V(4).Infof("key %s new token %v owner %v", path, renewToken, owner) + lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner} + return } + } + // not expired + lockOwner = oldValue.Owner + if oldValue.Token == token { + // token matches, renew the lock + renewToken = uuid.New().String() + glog.V(4).Infof("key %s old token %v owner %v => %v owner %v", path, oldValue.Token, oldValue.Owner, renewToken, owner) + lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner} + return } else { if token == "" { // new lock - renewToken = uuid.New().String() - return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false - } else { - err = LockErrorNonEmptyTokenOnNewLock - return nil, false + glog.V(4).Infof("key %s locked by %v", path, oldValue.Owner) + err = fmt.Errorf("lock already owned by %v", oldValue.Owner) + return } + glog.V(4).Infof("key %s expected token %v owner %v received %v from %v", path, oldValue.Token, oldValue.Owner, token, owner) + err = fmt.Errorf("lock: token mismatch") + return } - }) - return + } else { + glog.V(4).Infof("key %s no lock owner %v", path, owner) + if token == "" { + // new lock + glog.V(4).Infof("key %s new token %v owner %v", path, token, owner) + renewToken = uuid.New().String() + lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner} + return + } else { + glog.V(4).Infof("key %s non-empty token %v owner %v", path, token, owner) + err = LockErrorNonEmptyTokenOnNewLock + return + } + } } func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) { - lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { - if oldValue != nil { - now := time.Now() - if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() { - // lock is expired, delete it - isUnlocked = true - return nil, true - } - if oldValue.Token == token { - if oldValue.ExpiredAtNs <= now.UnixNano() { - isUnlocked = true - return nil, true - } - return oldValue, false - } else { - isUnlocked = false - err = UnlockErrorTokenMismatch - return oldValue, false - } - } else { + lm.accessLock.Lock() + defer lm.accessLock.Unlock() + + if oldValue, found := lm.locks[path]; found { + now := time.Now() + if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() { + // lock is expired, delete it + isUnlocked = true + glog.V(4).Infof("key %s expired at %v", path, time.Unix(0, oldValue.ExpiredAtNs)) + delete(lm.locks, path) + return + } + if oldValue.Token == token { isUnlocked = true - return nil, true + glog.V(4).Infof("key %s unlocked with %v", path, token) + delete(lm.locks, path) + return + } else { + isUnlocked = false + err = UnlockErrorTokenMismatch + return } - }) + } + err = LockNotFound return } func (lm *LockManager) CleanUp() { + for { time.Sleep(1 * time.Minute) now := time.Now().UnixNano() - lm.locks.Range(func(key string, value *Lock) bool { + + lm.accessLock.Lock() + for key, value := range lm.locks { if value == nil { - return true + continue } if now > value.ExpiredAtNs { - lm.locks.Delete(key) - return true + glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, value.ExpiredAtNs)) + delete(lm.locks, key) } - return true - }) + } + lm.accessLock.Unlock() } } // SelectLocks takes out locks by key // if keyFn return true, the lock will be taken out func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) { + lm.accessLock.RLock() + defer lm.accessLock.RUnlock() + now := time.Now().UnixNano() - lm.locks.Range(func(key string, lock *Lock) bool { + + for key, lock := range lm.locks { if now > lock.ExpiredAtNs { - lm.locks.Delete(key) - return true + glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, lock.ExpiredAtNs)) + delete(lm.locks, key) + continue } if selectFn(key) { - lm.locks.Delete(key) + glog.V(4).Infof("key %s selected and deleted", key) + delete(lm.locks, key) lock.Key = key locks = append(locks, lock) } - return true - }) + } return } // InsertLock inserts a lock unconditionally func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, owner string) { - lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner}) + lm.accessLock.Lock() + defer lm.accessLock.Unlock() + + lm.locks[path] = &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner} } func (lm *LockManager) GetLockOwner(key string) (owner string, err error) { - lock, _ := lm.locks.Load(key) - if lock != nil { + lm.accessLock.RLock() + defer lm.accessLock.RUnlock() + + if lock, found := lm.locks[key]; found { return lock.Owner, nil } - glog.V(0).Infof("get lock %s %+v", key, lock) err = LockNotFound return } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 3a18c3971..6a9e475d1 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -95,7 +95,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial for { time.Sleep(time.Second) - if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil { + if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.RenewInterval); err != nil { glog.V(0).Infof("AttemptToLock: %v", err) } }