You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
4.1 KiB

  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type LockClient struct {
  14. grpcDialOption grpc.DialOption
  15. maxLockDuration time.Duration
  16. sleepDuration time.Duration
  17. }
  18. func NewLockClient(grpcDialOption grpc.DialOption) *LockClient {
  19. return &LockClient{
  20. grpcDialOption: grpcDialOption,
  21. maxLockDuration: 5 * time.Second,
  22. sleepDuration: 4 * time.Millisecond,
  23. }
  24. }
  25. type LiveLock struct {
  26. key string
  27. renewToken string
  28. expireAtNs int64
  29. filer pb.ServerAddress
  30. cancelCh chan struct{}
  31. grpcDialOption grpc.DialOption
  32. isLocked bool
  33. }
  34. // NewLockWithTimeout locks the key with the given duration
  35. func (lc *LockClient) NewLockWithTimeout(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) {
  36. return lc.doNewLock(filer, key, lockDuration)
  37. }
  38. // NewLock creates a lock with a very long duration
  39. func (lc *LockClient) NewLock(filer pb.ServerAddress, key string) (lock *LiveLock) {
  40. return lc.doNewLock(filer, key, lock_manager.MaxDuration)
  41. }
  42. func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) {
  43. lock = &LiveLock{
  44. key: key,
  45. filer: filer,
  46. cancelCh: make(chan struct{}),
  47. expireAtNs: time.Now().Add(lockDuration).UnixNano(),
  48. grpcDialOption: lc.grpcDialOption,
  49. }
  50. var needRenewal bool
  51. if lockDuration > lc.maxLockDuration {
  52. lockDuration = lc.maxLockDuration
  53. needRenewal = true
  54. }
  55. util.RetryForever("create lock:"+key, func() error {
  56. errorMessage, err := lock.doLock(lockDuration)
  57. if err != nil {
  58. return err
  59. }
  60. if errorMessage != "" {
  61. return fmt.Errorf("%v", errorMessage)
  62. }
  63. return nil
  64. }, func(err error) (shouldContinue bool) {
  65. if err != nil {
  66. glog.Warningf("create lock %s: %s", key, err)
  67. }
  68. return lock.renewToken == ""
  69. })
  70. lock.isLocked = true
  71. if needRenewal {
  72. go lc.keepLock(lock)
  73. }
  74. return
  75. }
  76. func (lock *LiveLock) IsLocked() bool {
  77. return lock.isLocked
  78. }
  79. func (lock *LiveLock) Unlock() error {
  80. close(lock.cancelCh)
  81. return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  82. _, err := client.Unlock(context.Background(), &filer_pb.UnlockRequest{
  83. Name: lock.key,
  84. RenewToken: lock.renewToken,
  85. })
  86. return err
  87. })
  88. }
  89. func (lc *LockClient) keepLock(lock *LiveLock) {
  90. for {
  91. select {
  92. case <-time.After(lc.sleepDuration):
  93. // renew the lock if lock.expireAtNs is still greater than now
  94. util.RetryForever("keep lock:"+lock.key, func() error {
  95. lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
  96. if lockDuration > lc.maxLockDuration {
  97. lockDuration = lc.maxLockDuration
  98. }
  99. if lockDuration <= 0 {
  100. return nil
  101. }
  102. errorMessage, err := lock.doLock(lockDuration)
  103. if err != nil {
  104. lock.isLocked = false
  105. return err
  106. }
  107. if errorMessage != "" {
  108. lock.isLocked = false
  109. return fmt.Errorf("%v", errorMessage)
  110. }
  111. return nil
  112. }, func(err error) (shouldContinue bool) {
  113. if err == nil {
  114. return false
  115. }
  116. glog.Warningf("keep lock %s: %v", lock.key, err)
  117. return true
  118. })
  119. if !lock.isLocked {
  120. return
  121. }
  122. case <-lock.cancelCh:
  123. return
  124. }
  125. }
  126. }
  127. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  128. err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  129. resp, err := client.Lock(context.Background(), &filer_pb.LockRequest{
  130. Name: lock.key,
  131. SecondsToLock: int64(lockDuration.Seconds()),
  132. RenewToken: lock.renewToken,
  133. IsMoved: false,
  134. })
  135. if err == nil {
  136. lock.renewToken = resp.RenewToken
  137. }
  138. if resp != nil {
  139. errorMessage = resp.Error
  140. if resp.MovedTo != "" {
  141. lock.filer = pb.ServerAddress(resp.MovedTo)
  142. }
  143. }
  144. return err
  145. })
  146. return
  147. }