You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

210 lines
5.3 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
Merge accumulated changes related to message queue (#5098) * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * balance partitions on brokers * prepare topic partition first and then publish, move partition * purge unused APIs * clean up * adjust logs * add BalanceTopics() grpc API * configure topic * configure topic command * refactor * repair missing partitions * sequence of operations to ensure ordering * proto to close publishers and consumers * rename file * topic partition versioned by unixTimeNs * create local topic partition * close publishers * randomize the client name * wait until no publishers * logs * close stop publisher channel * send last ack * comments * comment * comments * support list of brokers * add cli options * Update .gitignore * logs * return io.eof directly * refactor * optionally create topic * refactoring * detect consumer disconnection * sub client wait for more messages * subscribe by time stamp * rename * rename to sub_balancer * rename * adjust comments * rename * fix compilation * rename * rename * SubscriberToSubCoordinator * sticky rebalance * go fmt * add tests * tracking topic=>broker * merge * comment
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type LockClient struct {
  14. grpcDialOption grpc.DialOption
  15. maxLockDuration time.Duration
  16. sleepDuration time.Duration
  17. seedFiler pb.ServerAddress
  18. }
  19. func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient {
  20. return &LockClient{
  21. grpcDialOption: grpcDialOption,
  22. maxLockDuration: 5 * time.Second,
  23. sleepDuration: 2473 * time.Millisecond,
  24. seedFiler: seedFiler,
  25. }
  26. }
  27. type LiveLock struct {
  28. key string
  29. renewToken string
  30. expireAtNs int64
  31. filer pb.ServerAddress
  32. cancelCh chan struct{}
  33. grpcDialOption grpc.DialOption
  34. isLocked bool
  35. owner string
  36. lc *LockClient
  37. }
  38. // NewLock creates a lock with a very long duration
  39. func (lc *LockClient) NewLock(key string, owner string) (lock *LiveLock) {
  40. return lc.doNewLock(key, lock_manager.MaxDuration, owner)
  41. }
  42. // StartLock starts a goroutine to lock the key and returns immediately.
  43. func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
  44. lock = &LiveLock{
  45. key: key,
  46. filer: lc.seedFiler,
  47. cancelCh: make(chan struct{}),
  48. expireAtNs: time.Now().Add(lock_manager.MaxDuration).UnixNano(),
  49. grpcDialOption: lc.grpcDialOption,
  50. owner: owner,
  51. lc: lc,
  52. }
  53. go func() {
  54. util.RetryUntil("create lock:"+key, func() error {
  55. errorMessage, err := lock.doLock(lock_manager.MaxDuration)
  56. if err != nil {
  57. glog.V(0).Infof("create lock %s: %s", key, err)
  58. time.Sleep(time.Second)
  59. return err
  60. }
  61. if errorMessage != "" {
  62. glog.V(4).Infof("create lock %s: %s", key, errorMessage)
  63. time.Sleep(time.Second)
  64. return fmt.Errorf("%v", errorMessage)
  65. }
  66. lock.isLocked = true
  67. return nil
  68. }, func(err error) (shouldContinue bool) {
  69. if err != nil {
  70. glog.Warningf("create lock %s: %s", key, err)
  71. time.Sleep(time.Second)
  72. }
  73. return lock.renewToken == ""
  74. })
  75. lc.keepLock(lock)
  76. }()
  77. return
  78. }
  79. func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner string) (lock *LiveLock) {
  80. lock = &LiveLock{
  81. key: key,
  82. filer: lc.seedFiler,
  83. cancelCh: make(chan struct{}),
  84. expireAtNs: time.Now().Add(lockDuration).UnixNano(),
  85. grpcDialOption: lc.grpcDialOption,
  86. owner: owner,
  87. lc: lc,
  88. }
  89. var needRenewal bool
  90. if lockDuration > lc.maxLockDuration {
  91. lockDuration = lc.maxLockDuration
  92. needRenewal = true
  93. }
  94. util.RetryUntil("create lock:"+key, func() error {
  95. errorMessage, err := lock.doLock(lockDuration)
  96. if err != nil {
  97. time.Sleep(time.Second)
  98. return err
  99. }
  100. if errorMessage != "" {
  101. time.Sleep(time.Second)
  102. return fmt.Errorf("%v", errorMessage)
  103. }
  104. lock.isLocked = true
  105. return nil
  106. }, func(err error) (shouldContinue bool) {
  107. if err != nil {
  108. glog.Warningf("create lock %s: %s", key, err)
  109. }
  110. return lock.renewToken == ""
  111. })
  112. if needRenewal {
  113. go lc.keepLock(lock)
  114. }
  115. return
  116. }
  117. func (lock *LiveLock) IsLocked() bool {
  118. return lock!=nil && lock.isLocked
  119. }
  120. func (lock *LiveLock) StopLock() error {
  121. close(lock.cancelCh)
  122. if !lock.isLocked {
  123. return nil
  124. }
  125. return pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  126. _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  127. Name: lock.key,
  128. RenewToken: lock.renewToken,
  129. })
  130. return err
  131. })
  132. }
  133. func (lc *LockClient) keepLock(lock *LiveLock) {
  134. ticker := time.Tick(lc.sleepDuration)
  135. for {
  136. select {
  137. case <-ticker:
  138. // renew the lock if lock.expireAtNs is still greater than now
  139. util.RetryUntil("keep lock:"+lock.key, func() error {
  140. lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
  141. if lockDuration > lc.maxLockDuration {
  142. lockDuration = lc.maxLockDuration
  143. }
  144. if lockDuration <= 0 {
  145. return nil
  146. }
  147. errorMessage, err := lock.doLock(lockDuration)
  148. if err != nil {
  149. lock.isLocked = false
  150. time.Sleep(time.Second)
  151. return err
  152. }
  153. if errorMessage != "" {
  154. lock.isLocked = false
  155. time.Sleep(time.Second)
  156. return fmt.Errorf("%v", errorMessage)
  157. }
  158. return nil
  159. }, func(err error) (shouldContinue bool) {
  160. if err == nil {
  161. return false
  162. }
  163. glog.Warningf("keep lock %s: %v", lock.key, err)
  164. return true
  165. })
  166. if !lock.isLocked {
  167. return
  168. }
  169. case <-lock.cancelCh:
  170. return
  171. }
  172. }
  173. }
  174. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  175. err = pb.WithFilerClient(false, 0, lock.filer, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  176. resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  177. Name: lock.key,
  178. SecondsToLock: int64(lockDuration.Seconds()),
  179. RenewToken: lock.renewToken,
  180. IsMoved: false,
  181. Owner: lock.owner,
  182. })
  183. if err == nil {
  184. lock.renewToken = resp.RenewToken
  185. }
  186. if resp != nil {
  187. errorMessage = resp.Error
  188. if resp.MovedTo != "" {
  189. lock.filer = pb.ServerAddress(resp.MovedTo)
  190. lock.lc.seedFiler = lock.filer
  191. }
  192. }
  193. return err
  194. })
  195. return
  196. }