You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
4.6 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
11 months ago
1 year ago
1 year ago
2 years ago
1 year ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "time"
  12. )
  13. // DistributedLock is a grpc handler to handle FilerServer's LockRequest
  14. func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) {
  15. resp = &filer_pb.LockResponse{}
  16. var movedTo pb.ServerAddress
  17. expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano()
  18. resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
  19. glog.V(3).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo)
  20. if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved {
  21. err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  22. secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  23. Name: req.Name,
  24. SecondsToLock: req.SecondsToLock,
  25. RenewToken: req.RenewToken,
  26. IsMoved: true,
  27. Owner: req.Owner,
  28. })
  29. if err == nil {
  30. resp.RenewToken = secondResp.RenewToken
  31. resp.LockOwner = secondResp.LockOwner
  32. } else {
  33. resp.Error = secondResp.Error
  34. }
  35. return err
  36. })
  37. }
  38. if err != nil {
  39. resp.Error = fmt.Sprintf("%v", err)
  40. }
  41. if movedTo != "" {
  42. resp.LockHostMovedTo = string(movedTo)
  43. }
  44. return resp, nil
  45. }
  46. // Unlock is a grpc handler to handle FilerServer's UnlockRequest
  47. func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) {
  48. resp = &filer_pb.UnlockResponse{}
  49. var movedTo pb.ServerAddress
  50. movedTo, err = fs.filer.Dlm.Unlock(req.Name, req.RenewToken)
  51. if !req.IsMoved && movedTo != "" {
  52. err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  53. secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  54. Name: req.Name,
  55. RenewToken: req.RenewToken,
  56. IsMoved: true,
  57. })
  58. resp.Error = secondResp.Error
  59. return err
  60. })
  61. }
  62. if err != nil {
  63. resp.Error = fmt.Sprintf("%v", err)
  64. }
  65. if movedTo != "" {
  66. resp.MovedTo = string(movedTo)
  67. }
  68. return resp, nil
  69. }
  70. func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
  71. owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
  72. if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound {
  73. err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  74. secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
  75. Name: req.Name,
  76. IsMoved: true,
  77. })
  78. if err != nil {
  79. return err
  80. }
  81. owner = secondResp.Owner
  82. return nil
  83. })
  84. if err != nil {
  85. return nil, err
  86. }
  87. }
  88. if owner == "" {
  89. glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
  90. return nil, status.Error(codes.NotFound, fmt.Sprintf("lock %s not found", req.Name))
  91. }
  92. if err != nil {
  93. return nil, status.Error(codes.Internal, err.Error())
  94. }
  95. return &filer_pb.FindLockOwnerResponse{
  96. Owner: owner,
  97. }, nil
  98. }
  99. // TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest
  100. func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) {
  101. for _, lock := range req.Locks {
  102. fs.filer.Dlm.InsertLock(lock.Name, lock.ExpiredAtNs, lock.RenewToken, lock.Owner)
  103. }
  104. return &filer_pb.TransferLocksResponse{}, nil
  105. }
  106. func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
  107. locks := fs.filer.Dlm.SelectNotOwnedLocks(snapshot)
  108. if len(locks) == 0 {
  109. return
  110. }
  111. for _, lock := range locks {
  112. server := fs.filer.Dlm.CalculateTargetServer(lock.Key, snapshot)
  113. if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  114. _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{
  115. Locks: []*filer_pb.Lock{
  116. {
  117. Name: lock.Key,
  118. RenewToken: lock.Token,
  119. ExpiredAtNs: lock.ExpiredAtNs,
  120. Owner: lock.Owner,
  121. },
  122. },
  123. })
  124. return err
  125. }); err != nil {
  126. // it may not be worth retrying, since the lock may have expired
  127. glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err)
  128. }
  129. }
  130. }