You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

143 lines
3.6 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "math/rand"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. )
  11. /*
  12. How does the exclusive lock work?
  13. -----------
  14. Shell
  15. ------
  16. When the shell locks:
  17. * lease an admin token (lockTime, token)
  18. * start a goroutine to renew the admin token periodically
  19. When the shell unlocks:
  20. * stop the renewal goroutine
  21. * send a release lock request
  22. Master
  23. ------
  24. Master maintains:
  25. * randomNumber
  26. * lastLockTime
  27. When master receives the lease/renew request from shell
  28. If lastLockTime still fresh {
  29. if it is a renew and the token is valid {
  30. // for renew
  31. generate the randomNumber => token
  32. return
  33. }
  34. refuse
  35. return
  36. } else {
  37. // for fresh lease request
  38. generate the randomNumber => token
  39. return
  40. }
  41. When master receives the release lock request from shell
  42. set the lastLockTime to zero
  43. The volume server does not need to verify the token.
  44. This makes lock/unlock optional, similar to how locking usually works in Go code.
  45. */
  46. const (
  47. LockDuration = 10 * time.Second
  48. )
  49. type AdminLock struct {
  50. accessSecret int64
  51. accessLockTime time.Time
  52. lastClient string
  53. }
  54. type AdminLocks struct {
  55. locks map[string]*AdminLock
  56. sync.RWMutex
  57. }
  58. func NewAdminLocks() *AdminLocks {
  59. return &AdminLocks{
  60. locks: make(map[string]*AdminLock),
  61. }
  62. }
  63. func (locks *AdminLocks) isLocked(lockName string) (clientName string, isLocked bool) {
  64. locks.RLock()
  65. defer locks.RUnlock()
  66. adminLock, found := locks.locks[lockName]
  67. if !found {
  68. return "", false
  69. }
  70. glog.V(4).Infof("isLocked %v", adminLock.lastClient)
  71. return adminLock.lastClient, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
  72. }
  73. func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
  74. locks.RLock()
  75. defer locks.RUnlock()
  76. adminLock, found := locks.locks[lockName]
  77. if !found {
  78. return false
  79. }
  80. return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
  81. }
  82. func (locks *AdminLocks) generateToken(lockName string, clientName string) (ts time.Time, token int64) {
  83. locks.Lock()
  84. defer locks.Unlock()
  85. lock := &AdminLock{
  86. accessSecret: rand.Int63(),
  87. accessLockTime: time.Now(),
  88. lastClient: clientName,
  89. }
  90. locks.locks[lockName] = lock
  91. return lock.accessLockTime, lock.accessSecret
  92. }
  93. func (locks *AdminLocks) deleteLock(lockName string) {
  94. locks.Lock()
  95. defer locks.Unlock()
  96. delete(locks.locks, lockName)
  97. }
  98. func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
  99. resp := &master_pb.LeaseAdminTokenResponse{}
  100. if lastClient, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
  101. glog.V(4).Infof("LeaseAdminToken %v", lastClient)
  102. if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  103. // for renew
  104. ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
  105. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  106. return resp, nil
  107. }
  108. // refuse since still locked
  109. return resp, fmt.Errorf("already locked by " + lastClient)
  110. }
  111. // for fresh lease request
  112. ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
  113. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  114. return resp, nil
  115. }
  116. func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
  117. resp := &master_pb.ReleaseAdminTokenResponse{}
  118. if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  119. ms.adminLocks.deleteLock(req.LockName)
  120. }
  121. return resp, nil
  122. }