You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

66 lines
1.9 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lock_manager
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "time"
  6. )
  7. type DistributedLockManager struct {
  8. lockManager *LockManager
  9. LockRing *LockRing
  10. }
  11. func NewDistributedLockManager() *DistributedLockManager {
  12. return &DistributedLockManager{
  13. lockManager: NewLockManager(),
  14. LockRing: NewLockRing(time.Second * 5),
  15. }
  16. }
  17. func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) {
  18. servers := dlm.LockRing.GetSnapshot()
  19. if servers == nil {
  20. err = fmt.Errorf("no lock server found")
  21. return
  22. }
  23. server := hashKeyToServer(key, servers)
  24. if server != host {
  25. movedTo = server
  26. return
  27. }
  28. renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token)
  29. return
  30. }
  31. func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, token string) (movedTo pb.ServerAddress, err error) {
  32. servers := dlm.LockRing.GetSnapshot()
  33. if servers == nil {
  34. err = fmt.Errorf("no lock server found")
  35. return
  36. }
  37. server := hashKeyToServer(key, servers)
  38. if server != host {
  39. movedTo = server
  40. return
  41. }
  42. _, err = dlm.lockManager.Unlock(key, token)
  43. return
  44. }
  45. // InsertLock is used to insert a lock to a server unconditionally
  46. // It is used when a server is down and the lock is moved to another server
  47. func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string) {
  48. dlm.lockManager.InsertLock(key, expiredAtNs, token)
  49. }
  50. func (dlm *DistributedLockManager) SelectNotOwnedLocks(host pb.ServerAddress, servers []pb.ServerAddress) (locks []*Lock) {
  51. return dlm.lockManager.SelectLocks(func(key string) bool {
  52. server := hashKeyToServer(key, servers)
  53. return server != host
  54. })
  55. }
  56. func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
  57. return hashKeyToServer(key, servers)
  58. }