You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
4.8 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type LockClient struct {
  14. grpcDialOption grpc.DialOption
  15. maxLockDuration time.Duration
  16. sleepDuration time.Duration
  17. seedFiler pb.ServerAddress
  18. }
  19. func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient {
  20. return &LockClient{
  21. grpcDialOption: grpcDialOption,
  22. maxLockDuration: 5 * time.Second,
  23. sleepDuration: 2473 * time.Millisecond,
  24. seedFiler: seedFiler,
  25. }
  26. }
  27. type LiveLock struct {
  28. key string
  29. renewToken string
  30. expireAtNs int64
  31. filer pb.ServerAddress
  32. cancelCh chan struct{}
  33. grpcDialOption grpc.DialOption
  34. isLocked bool
  35. self string
  36. lc *LockClient
  37. owner string
  38. }
  39. // NewShortLivedLock creates a lock with a 5-second duration
  40. func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) {
  41. lock = &LiveLock{
  42. key: key,
  43. filer: lc.seedFiler,
  44. cancelCh: make(chan struct{}),
  45. expireAtNs: time.Now().Add(5 * time.Second).UnixNano(),
  46. grpcDialOption: lc.grpcDialOption,
  47. self: owner,
  48. lc: lc,
  49. }
  50. lock.retryUntilLocked(5 * time.Second)
  51. return
  52. }
  53. // StartLongLivedLock starts a goroutine to lock the key and returns immediately.
  54. func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
  55. lock = &LiveLock{
  56. key: key,
  57. filer: lc.seedFiler,
  58. cancelCh: make(chan struct{}),
  59. expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(),
  60. grpcDialOption: lc.grpcDialOption,
  61. self: owner,
  62. lc: lc,
  63. }
  64. go func() {
  65. isLocked := false
  66. lockOwner := ""
  67. for {
  68. if isLocked {
  69. if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
  70. glog.V(0).Infof("Lost lock %s: %v", key, err)
  71. isLocked = false
  72. }
  73. } else {
  74. if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil {
  75. isLocked = true
  76. }
  77. }
  78. if lockOwner != lock.LockOwner() && lock.LockOwner() != "" {
  79. glog.V(0).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner())
  80. onLockOwnerChange(lock.LockOwner())
  81. lockOwner = lock.LockOwner()
  82. }
  83. select {
  84. case <-lock.cancelCh:
  85. return
  86. default:
  87. time.Sleep(lock_manager.RenewInterval)
  88. }
  89. }
  90. }()
  91. return
  92. }
  93. func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
  94. util.RetryUntil("create lock:"+lock.key, func() error {
  95. return lock.AttemptToLock(lockDuration)
  96. }, func(err error) (shouldContinue bool) {
  97. if err != nil {
  98. glog.Warningf("create lock %s: %s", lock.key, err)
  99. }
  100. return lock.renewToken == ""
  101. })
  102. }
  103. func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
  104. errorMessage, err := lock.doLock(lockDuration)
  105. if err != nil {
  106. glog.Warningf("lock1 %s: %v", lock.key, err)
  107. time.Sleep(time.Second)
  108. return err
  109. }
  110. if errorMessage != "" {
  111. glog.Warningf("lock2 %s: %v", lock.key, errorMessage)
  112. time.Sleep(time.Second)
  113. return fmt.Errorf("%v", errorMessage)
  114. }
  115. lock.isLocked = true
  116. return nil
  117. }
  118. func (lock *LiveLock) IsLocked() bool {
  119. return lock != nil && lock.isLocked
  120. }
  121. func (lock *LiveLock) StopShortLivedLock() error {
  122. if !lock.isLocked {
  123. return nil
  124. }
  125. defer func() {
  126. lock.isLocked = false
  127. }()
  128. return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  129. _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  130. Name: lock.key,
  131. RenewToken: lock.renewToken,
  132. })
  133. return err
  134. })
  135. }
  136. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  137. err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  138. resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  139. Name: lock.key,
  140. SecondsToLock: int64(lockDuration.Seconds()),
  141. RenewToken: lock.renewToken,
  142. IsMoved: false,
  143. Owner: lock.self,
  144. })
  145. if err == nil && resp != nil {
  146. lock.renewToken = resp.RenewToken
  147. } else {
  148. //this can be retried. Need to remember the last valid renewToken
  149. lock.renewToken = ""
  150. }
  151. if resp != nil {
  152. errorMessage = resp.Error
  153. if resp.LockHostMovedTo != "" {
  154. lock.filer = pb.ServerAddress(resp.LockHostMovedTo)
  155. lock.lc.seedFiler = lock.filer
  156. }
  157. if resp.LockOwner != "" {
  158. lock.owner = resp.LockOwner
  159. } else {
  160. lock.owner = ""
  161. }
  162. }
  163. return err
  164. })
  165. return
  166. }
  167. func (lock *LiveLock) LockOwner() string {
  168. return lock.owner
  169. }