You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

100 lines
2.6 KiB

5 years ago
  1. package shell
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/wdclient"
  9. )
  // Timing parameters for the exclusive admin lock. The "Inteval" spelling is
  // kept as-is because these identifiers are exported.

  // RenewInteval is how long the background renewer started by RequestLock
  // sleeps after each successful lease renewal.
  const RenewInteval time.Duration = 4 * time.Second

  // SafeRenewInteval is the maximum age of the stored lock timestamp at which
  // GetToken still hands out the token; while the timestamp is older than this,
  // GetToken keeps waiting.
  const SafeRenewInteval time.Duration = 3 * time.Second

  // InitLockInteval is the pause between failed attempts to obtain the initial
  // lease in RequestLock.
  const InitLockInteval time.Duration = 1 * time.Second
  15. type ExclusiveLocker struct {
  16. masterClient *wdclient.MasterClient
  17. token int64
  18. lockTsNs int64
  19. isLocking bool
  20. }
  21. func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
  22. return &ExclusiveLocker{
  23. masterClient: masterClient,
  24. }
  25. }
  26. func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
  27. for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
  28. // wait until now is within the safe lock period, no immediate renewal to change the token
  29. time.Sleep(100 * time.Millisecond)
  30. }
  31. return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
  32. }
  33. func (l *ExclusiveLocker) RequestLock() {
  34. // retry to get the lease
  35. for {
  36. if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
  37. resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
  38. PreviousToken: atomic.LoadInt64(&l.token),
  39. PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
  40. })
  41. if err == nil {
  42. atomic.StoreInt64(&l.token, resp.Token)
  43. atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
  44. }
  45. return err
  46. }); err != nil {
  47. // println("leasing problem", err.Error())
  48. time.Sleep(InitLockInteval)
  49. } else {
  50. break
  51. }
  52. }
  53. l.isLocking = true
  54. // start a goroutine to renew the lease
  55. go func() {
  56. for l.isLocking {
  57. if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
  58. resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
  59. PreviousToken: atomic.LoadInt64(&l.token),
  60. PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
  61. })
  62. if err == nil {
  63. atomic.StoreInt64(&l.token, resp.Token)
  64. atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
  65. // println("ts", l.lockTsNs, "token", l.token)
  66. }
  67. return err
  68. }); err != nil {
  69. glog.Errorf("failed to renew lock: %v", err)
  70. return
  71. } else {
  72. time.Sleep(RenewInteval)
  73. }
  74. }
  75. }()
  76. }
  77. func (l *ExclusiveLocker) ReleaseLock() {
  78. l.isLocking = false
  79. l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
  80. client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
  81. PreviousToken: atomic.LoadInt64(&l.token),
  82. PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
  83. })
  84. return nil
  85. })
  86. atomic.StoreInt64(&l.token, 0)
  87. atomic.StoreInt64(&l.lockTsNs, 0)
  88. }