You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
5.4 KiB

3 years ago
3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/seaweedfs/raft"
  9. "github.com/seaweedfs/seaweedfs/weed/cluster"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. )
  16. /*
  17. How does the exclusive lock work?
  18. -----------
  19. Shell
  20. ------
  21. When the shell locks, it
  22. * leases an admin token (lockTime, token)
  23. * starts a goroutine to renew the admin token periodically
  24. When the shell unlocks, it
  25. * stops the renewal goroutine
  26. * sends a release lock request
  27. Master
  28. ------
  29. Master maintains:
  30. * randomNumber
  31. * lastLockTime
  32. When the master receives a lease/renew request from the shell
  33. If lastLockTime is still fresh {
  34. if it is a renew and the token is valid {
  35. // for renew
  36. generate the randomNumber => token
  37. return
  38. }
  39. refuse
  40. return
  41. } else {
  42. // for fresh lease request
  43. generate the randomNumber => token
  44. return
  45. }
  46. When master receives the release lock request from shell
  47. set the lastLockTime to zero
  48. The volume server does not need to verify.
  49. This makes the lock/unlock optional, similar to what golang code usually does.
  50. */
  51. const (
  52. LockDuration = 10 * time.Second
  53. )
  54. type AdminLock struct {
  55. accessSecret int64
  56. accessLockTime time.Time
  57. lastClient string
  58. lastMessage string
  59. }
  60. type AdminLocks struct {
  61. locks map[string]*AdminLock
  62. sync.RWMutex
  63. }
  64. func NewAdminLocks() *AdminLocks {
  65. return &AdminLocks{
  66. locks: make(map[string]*AdminLock),
  67. }
  68. }
  69. func (locks *AdminLocks) isLocked(lockName string) (clientName string, message string, isLocked bool) {
  70. locks.RLock()
  71. defer locks.RUnlock()
  72. adminLock, found := locks.locks[lockName]
  73. if !found {
  74. return "", "", false
  75. }
  76. glog.V(4).Infof("isLocked %v: %v", adminLock.lastClient, adminLock.lastMessage)
  77. return adminLock.lastClient, adminLock.lastMessage, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
  78. }
  79. func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
  80. locks.RLock()
  81. defer locks.RUnlock()
  82. adminLock, found := locks.locks[lockName]
  83. if !found {
  84. return false
  85. }
  86. return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
  87. }
  88. func (locks *AdminLocks) generateToken(lockName string, clientName string) (ts time.Time, token int64) {
  89. locks.Lock()
  90. defer locks.Unlock()
  91. lock := &AdminLock{
  92. accessSecret: rand.Int63(),
  93. accessLockTime: time.Now(),
  94. lastClient: clientName,
  95. }
  96. locks.locks[lockName] = lock
  97. return lock.accessLockTime, lock.accessSecret
  98. }
  99. func (locks *AdminLocks) deleteLock(lockName string) {
  100. locks.Lock()
  101. defer locks.Unlock()
  102. delete(locks.locks, lockName)
  103. }
  104. func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
  105. resp := &master_pb.LeaseAdminTokenResponse{}
  106. if !ms.Topo.IsLeader() {
  107. return resp, raft.NotLeaderError
  108. }
  109. if lastClient, lastMessage, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
  110. glog.V(4).Infof("LeaseAdminToken %v", lastClient)
  111. if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  112. // for renew
  113. ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
  114. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  115. return resp, nil
  116. }
  117. // refuse since still locked
  118. return resp, fmt.Errorf("already locked by %v: %v", lastClient, lastMessage)
  119. }
  120. // for fresh lease request
  121. ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
  122. resp.Token, resp.LockTsNs = token, ts.UnixNano()
  123. return resp, nil
  124. }
  125. func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
  126. resp := &master_pb.ReleaseAdminTokenResponse{}
  127. if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
  128. ms.adminLocks.deleteLock(req.LockName)
  129. }
  130. return resp, nil
  131. }
  132. func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
  133. resp = &master_pb.PingResponse{
  134. StartTimeNs: time.Now().UnixNano(),
  135. }
  136. if req.TargetType == cluster.FilerType {
  137. pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  138. pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
  139. if pingResp != nil {
  140. resp.RemoteTimeNs = pingResp.StartTimeNs
  141. }
  142. return err
  143. })
  144. }
  145. if req.TargetType == cluster.VolumeServerType {
  146. pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  147. pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
  148. if pingResp != nil {
  149. resp.RemoteTimeNs = pingResp.StartTimeNs
  150. }
  151. return err
  152. })
  153. }
  154. if req.TargetType == cluster.MasterType {
  155. pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  156. pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
  157. if pingResp != nil {
  158. resp.RemoteTimeNs = pingResp.StartTimeNs
  159. }
  160. return err
  161. })
  162. }
  163. if pingErr != nil {
  164. pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
  165. }
  166. resp.StopTimeNs = time.Now().UnixNano()
  167. return
  168. }