You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

111 lines
3.1 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "time"
  10. )
  11. // Lock is a grpc handler to handle FilerServer's LockRequest
  12. func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) {
  13. resp = &filer_pb.LockResponse{}
  14. snapshot := fs.filer.LockRing.GetSnapshot()
  15. if snapshot == nil {
  16. resp.Error = "no lock server found"
  17. return
  18. } else {
  19. server := lock_manager.HashKeyToServer(req.Name, snapshot)
  20. if server != fs.option.Host {
  21. resp.Error = fmt.Sprintf("not the lock server for %s", req.Name)
  22. resp.MovedTo = string(server)
  23. return
  24. }
  25. }
  26. renewToken, err := fs.dlm.Lock(req.Name, time.Duration(req.SecondsToLock)*time.Second, req.PreviousLockToken)
  27. if err != nil {
  28. resp.Error = fmt.Sprintf("%v", err)
  29. } else {
  30. resp.RenewToken = renewToken
  31. }
  32. return resp, nil
  33. }
  34. // Unlock is a grpc handler to handle FilerServer's UnlockRequest
  35. func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) {
  36. resp = &filer_pb.UnlockResponse{}
  37. snapshot := fs.filer.LockRing.GetSnapshot()
  38. if snapshot == nil {
  39. resp.Error = "no lock server found"
  40. return
  41. } else {
  42. server := lock_manager.HashKeyToServer(req.Name, snapshot)
  43. if server != fs.option.Host {
  44. resp.Error = fmt.Sprintf("not the lock server for %s", req.Name)
  45. resp.MovedTo = string(server)
  46. return
  47. }
  48. }
  49. _, err = fs.dlm.Unlock(req.Name, req.LockToken)
  50. if err != nil {
  51. resp.Error = fmt.Sprintf("%v", err)
  52. }
  53. return resp, nil
  54. }
  55. // TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest
  56. func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) {
  57. now := time.Now()
  58. for _, lock := range req.Locks {
  59. duration := time.Duration(lock.ExpirationNs - now.UnixNano())
  60. if _, err := fs.dlm.Lock(lock.Name, duration, lock.RenewToken); err != nil {
  61. glog.Errorf("receive transferred lock %v to %v: %v", lock.Name, fs.option.Host, err)
  62. return nil, err
  63. }
  64. }
  65. return &filer_pb.TransferLocksResponse{}, nil
  66. }
  67. func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
  68. locks := fs.dlm.TakeOutLocksByKey(func(key string) bool {
  69. server := lock_manager.HashKeyToServer(key, snapshot)
  70. return server != fs.option.Host
  71. })
  72. if len(locks) == 0 {
  73. return
  74. }
  75. for _, lock := range locks {
  76. server := lock_manager.HashKeyToServer(lock.Key, snapshot)
  77. if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  78. _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{
  79. Locks: []*filer_pb.Lock{
  80. {
  81. Name: lock.Key,
  82. RenewToken: lock.Token,
  83. ExpirationNs: lock.ExpirationNs,
  84. },
  85. },
  86. })
  87. return err
  88. }); err != nil {
  89. // it may not be worth retrying, since the lock may have expired
  90. glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err)
  91. }
  92. }
  93. }