138 lines
3.3 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. )
  10. /*
  11. How exclusive lock works?
  12. -----------
  13. Shell
  14. ------
  15. When shell lock,
  16. * lease an admin token (lockTime, token)
  17. * start a goroutine to renew the admin token periodically
  18. When shell unlock
  19. * stop the renewal goroutine
  20. * sends a release lock request
  21. Master
  22. ------
  23. Master maintains:
  24. * randomNumber
  25. * lastLockTime
  26. When master receives the lease/renew request from shell
  27. If lastLockTime still fresh {
  28. if is a renew and token is valid {
  29. // for renew
  30. generate the randomNumber => token
  31. return
  32. }
  33. refuse
  34. return
  35. } else {
  36. // for fresh lease request
  37. generate the randomNumber => token
  38. return
  39. }
  40. When master receives the release lock request from shell
  41. set the lastLockTime to zero
  42. The volume server does not need to verify.
  43. This makes the lock/unlock optional, similar to what golang code usually does.
  44. */
  45. const (
  46. LockDuration = 10 * time.Second
  47. )
  48. type AdminLock struct {
  49. accessSecret int64
  50. accessLockTime time.Time
  51. }
  52. type AdminLocks struct {
  53. locks map[string]*AdminLock
  54. sync.RWMutex
  55. }
  56. func NewAdminLocks() *AdminLocks {
  57. return &AdminLocks{
  58. locks: make(map[string]*AdminLock),
  59. }
  60. }
  61. func (locks *AdminLocks) isLocked(lockName string) bool {
  62. locks.RLock()
  63. defer locks.RUnlock()
  64. adminLock, found := locks.locks[lockName]
  65. if !found {
  66. return false
  67. }
  68. return adminLock.accessLockTime.Add(LockDuration).After(time.Now())
  69. }
  70. func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
  71. locks.RLock()
  72. defer locks.RUnlock()
  73. adminLock, found := locks.locks[lockName]
  74. if !found {
  75. return false
  76. }
  77. return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
  78. }
  79. func (locks *AdminLocks) generateToken(lockName string) (ts time.Time, token int64) {
  80. locks.Lock()
  81. defer locks.Unlock()
  82. lock := &AdminLock{
  83. accessSecret: rand.Int63(),
  84. accessLockTime: time.Now(),
  85. }
  86. locks.locks[lockName] = lock
  87. return lock.accessLockTime, lock.accessSecret
  88. }
  89. func (locks *AdminLocks) deleteLock(lockName string) {
  90. locks.Lock()
  91. defer locks.Unlock()
  92. delete(locks.locks, lockName)
  93. }
  94. func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
  95. resp := &master_pb.LeaseAdminTokenResponse{}
  96. if ms.adminLocks.isLocked(req.LockName) {
  97. if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  98. // for renew
  99. ts, token := ms.adminLocks.generateToken(req.LockName)
  100. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  101. return resp, nil
  102. }
  103. // refuse since still locked
  104. return resp, fmt.Errorf("already locked")
  105. }
  106. // for fresh lease request
  107. ts, token := ms.adminLocks.generateToken(req.LockName)
  108. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  109. return resp, nil
  110. }
  111. func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
  112. resp := &master_pb.ReleaseAdminTokenResponse{}
  113. if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  114. ms.adminLocks.deleteLock(req.LockName)
  115. }
  116. return resp, nil
  117. }