You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

151 lines
3.6 KiB

  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "google.golang.org/grpc"
  10. "time"
  11. )
  12. type LockClient struct {
  13. grpcDialOption grpc.DialOption
  14. maxLockDuration time.Duration
  15. sleepDuration time.Duration
  16. }
  17. func NewLockClient(grpcDialOption grpc.DialOption) *LockClient {
  18. return &LockClient{
  19. grpcDialOption: grpcDialOption,
  20. maxLockDuration: 5 * time.Second,
  21. sleepDuration: 4 * time.Millisecond,
  22. }
  23. }
  24. type LiveLock struct {
  25. key string
  26. renewToken string
  27. expireAtNs int64
  28. filer pb.ServerAddress
  29. cancelCh chan struct{}
  30. grpcDialOption grpc.DialOption
  31. isLocked bool
  32. }
  33. func (lc *LockClient) NewLock(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) {
  34. lock = &LiveLock{
  35. key: key,
  36. filer: filer,
  37. cancelCh: make(chan struct{}),
  38. expireAtNs: time.Now().Add(lockDuration).UnixNano(),
  39. grpcDialOption: lc.grpcDialOption,
  40. }
  41. var needRenewal bool
  42. if lockDuration > lc.maxLockDuration {
  43. lockDuration = lc.maxLockDuration
  44. needRenewal = true
  45. }
  46. util.RetryForever("create lock:"+key, func() error {
  47. errorMessage, err := lock.doLock(lockDuration)
  48. if err != nil {
  49. return err
  50. }
  51. if errorMessage != "" {
  52. return fmt.Errorf("%v", errorMessage)
  53. }
  54. return nil
  55. }, func(err error) (shouldContinue bool) {
  56. if err != nil {
  57. glog.Warningf("create lock %s: %s", key, err)
  58. }
  59. return lock.renewToken == ""
  60. })
  61. lock.isLocked = true
  62. if needRenewal {
  63. go lc.keepLock(lock)
  64. }
  65. return
  66. }
  67. func (lock *LiveLock) IsLocked() bool {
  68. return lock.isLocked
  69. }
  70. func (lock *LiveLock) Unlock() error {
  71. close(lock.cancelCh)
  72. return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  73. _, err := client.Unlock(context.Background(), &filer_pb.UnlockRequest{
  74. Name: lock.key,
  75. RenewToken: lock.renewToken,
  76. })
  77. return err
  78. })
  79. }
  80. func (lc *LockClient) keepLock(lock *LiveLock) {
  81. for {
  82. select {
  83. case <-time.After(lc.sleepDuration):
  84. // renew the lock if lock.expireAtNs is still greater than now
  85. util.RetryForever("keep lock:"+lock.key, func() error {
  86. lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
  87. if lockDuration > lc.maxLockDuration {
  88. lockDuration = lc.maxLockDuration
  89. }
  90. if lockDuration <= 0 {
  91. return nil
  92. }
  93. errorMessage, err := lock.doLock(lockDuration)
  94. if err != nil {
  95. lock.isLocked = false
  96. return err
  97. }
  98. if errorMessage != "" {
  99. lock.isLocked = false
  100. return fmt.Errorf("%v", errorMessage)
  101. }
  102. return nil
  103. }, func(err error) (shouldContinue bool) {
  104. if err == nil {
  105. return false
  106. }
  107. glog.Warningf("keep lock %s: %v", lock.key, err)
  108. return true
  109. })
  110. if !lock.isLocked {
  111. return
  112. }
  113. case <-lock.cancelCh:
  114. return
  115. }
  116. }
  117. }
  118. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  119. err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  120. resp, err := client.Lock(context.Background(), &filer_pb.LockRequest{
  121. Name: lock.key,
  122. SecondsToLock: int64(lockDuration.Seconds()),
  123. RenewToken: lock.renewToken,
  124. IsMoved: false,
  125. })
  126. if err == nil {
  127. lock.renewToken = resp.RenewToken
  128. }
  129. if resp != nil {
  130. errorMessage = resp.Error
  131. if resp.MovedTo != "" {
  132. lock.filer = pb.ServerAddress(resp.MovedTo)
  133. }
  134. }
  135. return err
  136. })
  137. return
  138. }