You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

209 lines
5.3 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename 
* SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type LockClient struct {
  14. grpcDialOption grpc.DialOption
  15. maxLockDuration time.Duration
  16. sleepDuration time.Duration
  17. seedFiler pb.ServerAddress
  18. }
  19. func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient {
  20. return &LockClient{
  21. grpcDialOption: grpcDialOption,
  22. maxLockDuration: 5 * time.Second,
  23. sleepDuration: 2473 * time.Millisecond,
  24. seedFiler: seedFiler,
  25. }
  26. }
  27. type LiveLock struct {
  28. key string
  29. renewToken string
  30. expireAtNs int64
  31. filer pb.ServerAddress
  32. cancelCh chan struct{}
  33. grpcDialOption grpc.DialOption
  34. isLocked bool
  35. owner string
  36. lc *LockClient
  37. }
  38. // NewLock creates a lock with a very long duration
  39. func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) {
  40. return lc.doNewLock(key, lock_manager.MaxDuration, owner)
  41. }
  42. // StartLock starts a goroutine to lock the key and returns immediately.
  43. func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
  44. lock = &LiveLock{
  45. key: key,
  46. filer: lc.seedFiler,
  47. cancelCh: make(chan struct{}),
  48. expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
  49. grpcDialOption: lc.grpcDialOption,
  50. owner: owner,
  51. lc: lc,
  52. }
  53. go func() {
  54. util.RetryUntil("create lock:"+key, func() error {
  55. errorMessage, err := lock.doLock(lock_manager.MaxDuration)
  56. if err != nil {
  57. glog.V(0).Infof("create lock %s: %s", key, err)
  58. time.Sleep(time.Second)
  59. return err
  60. }
  61. if errorMessage != "" {
  62. glog.V(4).Infof("create lock %s: %s", key, errorMessage)
  63. time.Sleep(time.Second)
  64. return fmt.Errorf("%v", errorMessage)
  65. }
  66. lock.isLocked = true
  67. return nil
  68. }, func(err error) (shouldContinue bool) {
  69. if err != nil {
  70. time.Sleep(time.Second)
  71. }
  72. return lock.renewToken == ""
  73. })
  74. lc.keepLock(lock)
  75. }()
  76. return
  77. }
  78. func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) {
  79. lock = &LiveLock{
  80. key: key,
  81. filer: lc.seedFiler,
  82. cancelCh: make(chan struct{}),
  83. expireAtNs: time.Now().Add(lockDuration).UnixNano(),
  84. grpcDialOption: lc.grpcDialOption,
  85. owner: owner,
  86. lc: lc,
  87. }
  88. var needRenewal bool
  89. if lockDuration > lc.maxLockDuration {
  90. lockDuration = lc.maxLockDuration
  91. needRenewal = true
  92. }
  93. util.RetryUntil("create lock:"+key, func() error {
  94. errorMessage, err := lock.doLock(lockDuration)
  95. if err != nil {
  96. time.Sleep(time.Second)
  97. return err
  98. }
  99. if errorMessage != "" {
  100. time.Sleep(time.Second)
  101. return fmt.Errorf("%v", errorMessage)
  102. }
  103. lock.isLocked = true
  104. return nil
  105. }, func(err error) (shouldContinue bool) {
  106. if err != nil {
  107. glog.Warningf("create lock %s: %s", key, err)
  108. }
  109. return lock.renewToken == ""
  110. })
  111. if needRenewal {
  112. go lc.keepLock(lock)
  113. }
  114. return
  115. }
  116. func (lock *LiveLock) IsLocked() bool {
  117. return lock.isLocked
  118. }
  119. func (lock *LiveLock) StopLock() error {
  120. close(lock.cancelCh)
  121. if !lock.isLocked {
  122. return nil
  123. }
  124. return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  125. _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  126. Name: lock.key,
  127. RenewToken: lock.renewToken,
  128. })
  129. return err
  130. })
  131. }
  132. func (lc *LockClient) keepLock(lock *LiveLock) {
  133. ticker := time.Tick(lc.sleepDuration)
  134. for {
  135. select {
  136. case <-ticker:
  137. // renew the lock if lock.expireAtNs is still greater than now
  138. util.RetryUntil("keep lock:"+lock.key, func() error {
  139. lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
  140. if lockDuration > lc.maxLockDuration {
  141. lockDuration = lc.maxLockDuration
  142. }
  143. if lockDuration <= 0 {
  144. return nil
  145. }
  146. errorMessage, err := lock.doLock(lockDuration)
  147. if err != nil {
  148. lock.isLocked = false
  149. time.Sleep(time.Second)
  150. return err
  151. }
  152. if errorMessage != "" {
  153. lock.isLocked = false
  154. time.Sleep(time.Second)
  155. return fmt.Errorf("%v", errorMessage)
  156. }
  157. return nil
  158. }, func(err error) (shouldContinue bool) {
  159. if err == nil {
  160. return false
  161. }
  162. glog.Warningf("keep lock %s: %v", lock.key, err)
  163. return true
  164. })
  165. if !lock.isLocked {
  166. return
  167. }
  168. case <-lock.cancelCh:
  169. return
  170. }
  171. }
  172. }
  173. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  174. err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  175. resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  176. Name: lock.key,
  177. SecondsToLock: int64(lockDuration.Seconds()),
  178. RenewToken: lock.renewToken,
  179. IsMoved: false,
  180. Owner: lock.owner,
  181. })
  182. if err == nil {
  183. lock.renewToken = resp.RenewToken
  184. }
  185. if resp != nil {
  186. errorMessage = resp.Error
  187. if resp.MovedTo != "" {
  188. lock.filer = pb.ServerAddress(resp.MovedTo)
  189. lock.lc.seedFiler = lock.filer
  190. }
  191. }
  192. return err
  193. })
  194. return
  195. }