You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

201 lines
5.1 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type LockClient struct {
  14. grpcDialOption grpc.DialOption
  15. maxLockDuration time.Duration
  16. sleepDuration time.Duration
  17. seedFiler pb.ServerAddress
  18. }
  19. func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient {
  20. return &LockClient{
  21. grpcDialOption: grpcDialOption,
  22. maxLockDuration: 5 * time.Second,
  23. sleepDuration: 2473 * time.Millisecond,
  24. seedFiler: seedFiler,
  25. }
  26. }
  27. type LiveLock struct {
  28. key string
  29. renewToken string
  30. expireAtNs int64
  31. filer pb.ServerAddress
  32. cancelCh chan struct{}
  33. grpcDialOption grpc.DialOption
  34. isLocked bool
  35. owner string
  36. lc *LockClient
  37. }
  38. // NewLock creates a lock with a very long duration
  39. func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) {
  40. return lc.doNewLock(key, lock_manager.MaxDuration, owner)
  41. }
  42. // StartLock starts a goroutine to lock the key and returns immediately.
  43. func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
  44. lock = &LiveLock{
  45. key: key,
  46. filer: lc.seedFiler,
  47. cancelCh: make(chan struct{}),
  48. expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
  49. grpcDialOption: lc.grpcDialOption,
  50. owner: owner,
  51. lc: lc,
  52. }
  53. go func() {
  54. lock.CreateLock(lock_manager.MaxDuration)
  55. lc.keepLock(lock)
  56. }()
  57. return
  58. }
  59. func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) {
  60. lock = &LiveLock{
  61. key: key,
  62. filer: lc.seedFiler,
  63. cancelCh: make(chan struct{}),
  64. expireAtNs: time.Now().Add(lockDuration).UnixNano(),
  65. grpcDialOption: lc.grpcDialOption,
  66. owner: owner,
  67. lc: lc,
  68. }
  69. var needRenewal bool
  70. if lockDuration > lc.maxLockDuration {
  71. lockDuration = lc.maxLockDuration
  72. needRenewal = true
  73. }
  74. lock.CreateLock(lockDuration)
  75. if needRenewal {
  76. go lc.keepLock(lock)
  77. }
  78. return
  79. }
  80. func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
  81. util.RetryUntil("create lock:"+lock.key, func() error {
  82. return lock.DoLock(lockDuration)
  83. }, func(err error) (shouldContinue bool) {
  84. if err != nil {
  85. glog.Warningf("create lock %s: %s", lock.key, err)
  86. }
  87. return lock.renewToken == ""
  88. })
  89. }
  90. func (lock *LiveLock) DoLock(lockDuration time.Duration) error {
  91. errorMessage, err := lock.doLock(lockDuration)
  92. if err != nil {
  93. time.Sleep(time.Second)
  94. return err
  95. }
  96. if errorMessage != "" {
  97. time.Sleep(time.Second)
  98. return fmt.Errorf("%v", errorMessage)
  99. }
  100. lock.isLocked = true
  101. return nil
  102. }
  103. func (lock *LiveLock) IsLocked() bool {
  104. return lock!=nil && lock.isLocked
  105. }
  106. func (lock *LiveLock) StopLock() error {
  107. close(lock.cancelCh)
  108. if !lock.isLocked {
  109. return nil
  110. }
  111. return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  112. _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  113. Name: lock.key,
  114. RenewToken: lock.renewToken,
  115. })
  116. return err
  117. })
  118. }
  119. func (lc *LockClient) keepLock(lock *LiveLock) {
  120. ticker := time.Tick(lc.sleepDuration)
  121. for {
  122. select {
  123. case <-ticker:
  124. // renew the lock if lock.expireAtNs is still greater than now
  125. util.RetryUntil("keep lock:"+lock.key, func() error {
  126. lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
  127. if lockDuration > lc.maxLockDuration {
  128. lockDuration = lc.maxLockDuration
  129. }
  130. if lockDuration <= 0 {
  131. return nil
  132. }
  133. errorMessage, err := lock.doLock(lockDuration)
  134. if err != nil {
  135. lock.isLocked = false
  136. time.Sleep(time.Second)
  137. glog.V(0).Infof("keep lock %s: %v", lock.key, err)
  138. return err
  139. }
  140. if errorMessage != "" {
  141. lock.isLocked = false
  142. time.Sleep(time.Second)
  143. glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage)
  144. return fmt.Errorf("keep lock error: %v", errorMessage)
  145. }
  146. return nil
  147. }, func(err error) (shouldContinue bool) {
  148. if err == nil {
  149. return false
  150. }
  151. glog.Warningf("keep lock %s: %v", lock.key, err)
  152. return true
  153. })
  154. if !lock.isLocked {
  155. return
  156. }
  157. case <-lock.cancelCh:
  158. return
  159. }
  160. }
  161. }
  162. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  163. err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  164. resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  165. Name: lock.key,
  166. SecondsToLock: int64(lockDuration.Seconds()),
  167. RenewToken: lock.renewToken,
  168. IsMoved: false,
  169. Owner: lock.owner,
  170. })
  171. if err == nil {
  172. lock.renewToken = resp.RenewToken
  173. }
  174. if resp != nil {
  175. errorMessage = resp.Error
  176. if resp.MovedTo != "" {
  177. lock.filer = pb.ServerAddress(resp.MovedTo)
  178. lock.lc.seedFiler = lock.filer
  179. }
  180. }
  181. return err
  182. })
  183. return
  184. }