|
|
@ -6,21 +6,29 @@ import ( |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
const MaxDuration = 1<<63 - 1 |
|
|
|
|
|
|
|
var NoLockServerError = fmt.Errorf("no lock server found") |
|
|
|
|
|
|
|
type DistributedLockManager struct { |
|
|
|
lockManager *LockManager |
|
|
|
LockRing *LockRing |
|
|
|
Host pb.ServerAddress |
|
|
|
} |
|
|
|
|
|
|
|
func NewDistributedLockManager() *DistributedLockManager { |
|
|
|
func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager { |
|
|
|
return &DistributedLockManager{ |
|
|
|
lockManager: NewLockManager(), |
|
|
|
LockRing: NewLockRing(time.Second * 5), |
|
|
|
Host: host, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (dlm *DistributedLockManager) Lock(key string, token string) (renewToken string, movedTo pb.ServerAddress, err error) { |
|
|
|
return dlm.LockWithTimeout(key, MaxDuration, token) |
|
|
|
} |
|
|
|
|
|
|
|
func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { |
|
|
|
func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { |
|
|
|
servers := dlm.LockRing.GetSnapshot() |
|
|
|
if servers == nil { |
|
|
|
err = NoLockServerError |
|
|
@ -28,7 +36,7 @@ func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expir |
|
|
|
} |
|
|
|
|
|
|
|
server := hashKeyToServer(key, servers) |
|
|
|
if server != host { |
|
|
|
if server != dlm.Host { |
|
|
|
movedTo = server |
|
|
|
return |
|
|
|
} |
|
|
@ -36,7 +44,7 @@ func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expir |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, token string) (movedTo pb.ServerAddress, err error) { |
|
|
|
func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb.ServerAddress, err error) { |
|
|
|
servers := dlm.LockRing.GetSnapshot() |
|
|
|
if servers == nil { |
|
|
|
err = NoLockServerError |
|
|
@ -44,7 +52,7 @@ func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, tok |
|
|
|
} |
|
|
|
|
|
|
|
server := hashKeyToServer(key, servers) |
|
|
|
if server != host { |
|
|
|
if server != dlm.Host { |
|
|
|
movedTo = server |
|
|
|
return |
|
|
|
} |
|
|
@ -57,12 +65,20 @@ func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, tok |
|
|
|
func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string) { |
|
|
|
dlm.lockManager.InsertLock(key, expiredAtNs, token) |
|
|
|
} |
|
|
|
func (dlm *DistributedLockManager) SelectNotOwnedLocks(host pb.ServerAddress, servers []pb.ServerAddress) (locks []*Lock) { |
|
|
|
func (dlm *DistributedLockManager) SelectNotOwnedLocks(servers []pb.ServerAddress) (locks []*Lock) { |
|
|
|
return dlm.lockManager.SelectLocks(func(key string) bool { |
|
|
|
server := hashKeyToServer(key, servers) |
|
|
|
return server != host |
|
|
|
return server != dlm.Host |
|
|
|
}) |
|
|
|
} |
|
|
|
func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress { |
|
|
|
return hashKeyToServer(key, servers) |
|
|
|
} |
|
|
|
|
|
|
|
func (dlm *DistributedLockManager) IsLocal(key string) bool { |
|
|
|
servers := dlm.LockRing.GetSnapshot() |
|
|
|
if len(servers) <= 1 { |
|
|
|
return true |
|
|
|
} |
|
|
|
return hashKeyToServer(key, servers) == dlm.Host |
|
|
|
} |