You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

174 lines
5.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/cluster"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  10. "math/rand"
  11. "sync"
  12. "time"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. )
  15. /*
  16. How exclusive lock works?
  17. -----------
  18. Shell
  19. ------
  20. When shell lock,
  21. * lease an admin token (lockTime, token)
  22. * start a goroutine to renew the admin token periodically
  23. When shell unlock
  24. * stop the renewal goroutine
  25. * sends a release lock request
  26. Master
  27. ------
  28. Master maintains:
  29. * randomNumber
  30. * lastLockTime
  31. When master receives the lease/renew request from shell
  32. If lastLockTime still fresh {
  33. if is a renew and token is valid {
  34. // for renew
  35. generate the randomNumber => token
  36. return
  37. }
  38. refuse
  39. return
  40. } else {
  41. // for fresh lease request
  42. generate the randomNumber => token
  43. return
  44. }
  45. When master receives the release lock request from shell
  46. set the lastLockTime to zero
  47. The volume server does not need to verify.
  48. This makes the lock/unlock optional, similar to what golang code usually does.
  49. */
  50. const (
  51. LockDuration = 10 * time.Second
  52. )
  53. type AdminLock struct {
  54. accessSecret int64
  55. accessLockTime time.Time
  56. lastClient string
  57. lastMessage string
  58. }
  59. type AdminLocks struct {
  60. locks map[string]*AdminLock
  61. sync.RWMutex
  62. }
  63. func NewAdminLocks() *AdminLocks {
  64. return &AdminLocks{
  65. locks: make(map[string]*AdminLock),
  66. }
  67. }
  68. func (locks *AdminLocks) isLocked(lockName string) (clientName string, message string, isLocked bool) {
  69. locks.RLock()
  70. defer locks.RUnlock()
  71. adminLock, found := locks.locks[lockName]
  72. if !found {
  73. return "", "", false
  74. }
  75. glog.V(4).Infof("isLocked %v: %v", adminLock.lastClient, adminLock.lastMessage)
  76. return adminLock.lastClient, adminLock.lastMessage, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
  77. }
  78. func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
  79. locks.RLock()
  80. defer locks.RUnlock()
  81. adminLock, found := locks.locks[lockName]
  82. if !found {
  83. return false
  84. }
  85. return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
  86. }
  87. func (locks *AdminLocks) generateToken(lockName string, clientName string) (ts time.Time, token int64) {
  88. locks.Lock()
  89. defer locks.Unlock()
  90. lock := &AdminLock{
  91. accessSecret: rand.Int63(),
  92. accessLockTime: time.Now(),
  93. lastClient: clientName,
  94. }
  95. locks.locks[lockName] = lock
  96. return lock.accessLockTime, lock.accessSecret
  97. }
  98. func (locks *AdminLocks) deleteLock(lockName string) {
  99. locks.Lock()
  100. defer locks.Unlock()
  101. delete(locks.locks, lockName)
  102. }
  103. func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
  104. resp := &master_pb.LeaseAdminTokenResponse{}
  105. if lastClient, lastMessage, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
  106. glog.V(4).Infof("LeaseAdminToken %v", lastClient)
  107. if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  108. // for renew
  109. ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
  110. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  111. return resp, nil
  112. }
  113. // refuse since still locked
  114. return resp, fmt.Errorf("already locked by %v: %v", lastClient, lastMessage)
  115. }
  116. // for fresh lease request
  117. ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
  118. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  119. return resp, nil
  120. }
  121. func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
  122. resp := &master_pb.ReleaseAdminTokenResponse{}
  123. if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  124. ms.adminLocks.deleteLock(req.LockName)
  125. }
  126. return resp, nil
  127. }
  128. func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
  129. resp = &master_pb.PingResponse{}
  130. if req.TargetType == cluster.FilerType {
  131. pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  132. _, err := client.Ping(ctx, &filer_pb.PingRequest{})
  133. return err
  134. })
  135. }
  136. if req.TargetType == cluster.VolumeServerType {
  137. pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  138. _, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
  139. return err
  140. })
  141. }
  142. if req.TargetType == cluster.MasterType {
  143. pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
  144. _, err := client.Ping(ctx, &master_pb.PingRequest{})
  145. return err
  146. })
  147. }
  148. if pingErr != nil {
  149. pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
  150. }
  151. return
  152. }