You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

95 lines
2.4 KiB

  1. package shell
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/wdclient"
  9. )
  // Timing parameters of the exclusive admin lock.
  const (
  	// RenewInteval is how long Lock sleeps between lease attempts and between
  	// successive lease renewals.
  	RenewInteval time.Duration = 4 * time.Second

  	// SafeRenewInteval is the maximum age of the last lock timestamp for
  	// which GetToken returns without waiting.
  	SafeRenewInteval time.Duration = 3 * time.Second
  )
  14. type ExclusiveLocker struct {
  15. masterClient *wdclient.MasterClient
  16. token int64
  17. lockTsNs int64
  18. isLocking bool
  19. }
  20. func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
  21. return &ExclusiveLocker{
  22. masterClient: masterClient,
  23. }
  24. }
  25. func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
  26. for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
  27. // wait until now is within the safe lock period, no immediate renewal to change the token
  28. time.Sleep(100 * time.Millisecond)
  29. }
  30. return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
  31. }
  32. func (l *ExclusiveLocker) Lock() {
  33. // retry to get the lease
  34. for {
  35. if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
  36. resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
  37. PreviousToken: atomic.LoadInt64(&l.token),
  38. PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
  39. })
  40. if err == nil {
  41. atomic.StoreInt64(&l.token, resp.Token)
  42. atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
  43. }
  44. return err
  45. }); err != nil {
  46. time.Sleep(RenewInteval)
  47. } else {
  48. break
  49. }
  50. }
  51. l.isLocking = true
  52. // start a goroutine to renew the lease
  53. go func() {
  54. for l.isLocking {
  55. if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
  56. resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
  57. PreviousToken: atomic.LoadInt64(&l.token),
  58. PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
  59. })
  60. if err == nil {
  61. atomic.StoreInt64(&l.token, resp.Token)
  62. atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
  63. }
  64. return err
  65. }); err != nil {
  66. glog.Error("failed to renew lock: %v", err)
  67. return
  68. } else {
  69. time.Sleep(RenewInteval)
  70. }
  71. }
  72. }()
  73. }
  74. func (l *ExclusiveLocker) Unlock() {
  75. l.isLocking = false
  76. l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
  77. client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
  78. PreviousToken: atomic.LoadInt64(&l.token),
  79. PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
  80. })
  81. return nil
  82. })
  83. }