diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index de37476a9..c76b3c9bd 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -42,7 +43,7 @@ func (lc *LockClient) NewLockWithTimeout(filer pb.ServerAddress, key string, loc // NewLock creates a lock with a very long duration func (lc *LockClient) NewLock(filer pb.ServerAddress, key string) (lock *LiveLock) { - return lc.doNewLock(filer, key, time.Duration(1<<63-1)) + return lc.doNewLock(filer, key, lock_manager.MaxDuration) } func (lc *LockClient) doNewLock(filer pb.ServerAddress, key string, lockDuration time.Duration) (lock *LiveLock) { diff --git a/weed/cluster/lock_manager/distributed_lock_manager.go b/weed/cluster/lock_manager/distributed_lock_manager.go index 7577fb830..7061dfd1a 100644 --- a/weed/cluster/lock_manager/distributed_lock_manager.go +++ b/weed/cluster/lock_manager/distributed_lock_manager.go @@ -6,21 +6,29 @@ import ( "time" ) +const MaxDuration = 1<<63 - 1 + var NoLockServerError = fmt.Errorf("no lock server found") type DistributedLockManager struct { lockManager *LockManager LockRing *LockRing + Host pb.ServerAddress } -func NewDistributedLockManager() *DistributedLockManager { +func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager { return &DistributedLockManager{ lockManager: NewLockManager(), LockRing: NewLockRing(time.Second * 5), + Host: host, } } -func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { +func (dlm *DistributedLockManager) Lock(key string, token string) (renewToken string, movedTo pb.ServerAddress, err error) { + return dlm.LockWithTimeout(key, MaxDuration, token) +} + +func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string) (renewToken string, movedTo pb.ServerAddress, err error) { servers := dlm.LockRing.GetSnapshot() if servers == nil { err = NoLockServerError @@ -28,7 +36,7 @@ func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expir } server := hashKeyToServer(key, servers) - if server != host { + if server != dlm.Host { movedTo = server return } @@ -36,7 +44,7 @@ func (dlm *DistributedLockManager) Lock(host pb.ServerAddress, key string, expir return } -func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, token string) (movedTo pb.ServerAddress, err error) { +func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb.ServerAddress, err error) { servers := dlm.LockRing.GetSnapshot() if servers == nil { err = NoLockServerError @@ -44,7 +52,7 @@ func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, tok } server := hashKeyToServer(key, servers) - if server != host { + if server != dlm.Host { movedTo = server return } @@ -57,12 +65,20 @@ func (dlm *DistributedLockManager) Unlock(host pb.ServerAddress, key string, tok func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string) { dlm.lockManager.InsertLock(key, expiredAtNs, token) } -func (dlm *DistributedLockManager) SelectNotOwnedLocks(host pb.ServerAddress, servers []pb.ServerAddress) (locks []*Lock) { +func (dlm *DistributedLockManager) SelectNotOwnedLocks(servers []pb.ServerAddress) (locks []*Lock) { return dlm.lockManager.SelectLocks(func(key string) bool { server := hashKeyToServer(key, servers) - return server != host + return server != dlm.Host }) } func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress { return hashKeyToServer(key, servers) } + +func (dlm *DistributedLockManager) IsLocal(key string) bool { + servers := dlm.LockRing.GetSnapshot() + if len(servers) <= 1 { + return true + } + return hashKeyToServer(key, servers) == dlm.Host +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 7625d4bde..82b822971 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -61,7 +61,7 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), UniqueFilerId: util.RandomInt32(), - Dlm: lock_manager.NewDistributedLockManager(), + Dlm: lock_manager.NewDistributedLockManager(filerHost), } if f.UniqueFilerId < 0 { f.UniqueFilerId = -f.UniqueFilerId diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 6cdd69e42..5da8d9718 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -16,7 +16,7 @@ func (fs *FilerServer) Lock(ctx context.Context, req *filer_pb.LockRequest) (res var movedTo pb.ServerAddress expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() - resp.RenewToken, movedTo, err = fs.filer.Dlm.Lock(fs.option.Host, req.Name, expiredAtNs, req.RenewToken) + resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken) if !req.IsMoved && movedTo != "" { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { secondResp, err := client.Lock(context.Background(), &filer_pb.LockRequest{ @@ -50,7 +50,7 @@ func (fs *FilerServer) Unlock(ctx context.Context, req *filer_pb.UnlockRequest) resp = &filer_pb.UnlockResponse{} var movedTo pb.ServerAddress - movedTo, err = fs.filer.Dlm.Unlock(fs.option.Host, req.Name, req.RenewToken) + movedTo, err = fs.filer.Dlm.Unlock(req.Name, req.RenewToken) if !req.IsMoved && movedTo != "" { err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -87,7 +87,7 @@ func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.Transfer } func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) { - locks := fs.filer.Dlm.SelectNotOwnedLocks(fs.option.Host, snapshot) + locks := fs.filer.Dlm.SelectNotOwnedLocks(snapshot) if len(locks) == 0 { return }