You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							153 lines
						
					
					
						
							4.6 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							153 lines
						
					
					
						
							4.6 KiB
						
					
					
				
								package weed_server
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"google.golang.org/grpc/codes"
							 | 
						|
									"google.golang.org/grpc/status"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// DistributedLock is a grpc handler to handle FilerServer's LockRequest
							 | 
						|
								func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) {
							 | 
						|
								
							 | 
						|
									resp = &filer_pb.LockResponse{}
							 | 
						|
								
							 | 
						|
									var movedTo pb.ServerAddress
							 | 
						|
									expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano()
							 | 
						|
									resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
							 | 
						|
									glog.V(4).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo)
							 | 
						|
									if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved {
							 | 
						|
										err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
							 | 
						|
											secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
							 | 
						|
												Name:          req.Name,
							 | 
						|
												SecondsToLock: req.SecondsToLock,
							 | 
						|
												RenewToken:    req.RenewToken,
							 | 
						|
												IsMoved:       true,
							 | 
						|
												Owner:         req.Owner,
							 | 
						|
											})
							 | 
						|
											if err == nil {
							 | 
						|
												resp.RenewToken = secondResp.RenewToken
							 | 
						|
												resp.LockOwner = secondResp.LockOwner
							 | 
						|
												resp.Error = secondResp.Error
							 | 
						|
											}
							 | 
						|
											return err
							 | 
						|
										})
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if err != nil {
							 | 
						|
										resp.Error = fmt.Sprintf("%v", err)
							 | 
						|
									}
							 | 
						|
									if movedTo != "" {
							 | 
						|
										resp.LockHostMovedTo = string(movedTo)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return resp, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Unlock is a grpc handler to handle FilerServer's UnlockRequest
							 | 
						|
								func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) {
							 | 
						|
								
							 | 
						|
									resp = &filer_pb.UnlockResponse{}
							 | 
						|
								
							 | 
						|
									var movedTo pb.ServerAddress
							 | 
						|
									movedTo, err = fs.filer.Dlm.Unlock(req.Name, req.RenewToken)
							 | 
						|
								
							 | 
						|
									if !req.IsMoved && movedTo != "" {
							 | 
						|
										err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
							 | 
						|
											secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
							 | 
						|
												Name:       req.Name,
							 | 
						|
												RenewToken: req.RenewToken,
							 | 
						|
												IsMoved:    true,
							 | 
						|
											})
							 | 
						|
											resp.Error = secondResp.Error
							 | 
						|
											return err
							 | 
						|
										})
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if err != nil {
							 | 
						|
										resp.Error = fmt.Sprintf("%v", err)
							 | 
						|
									}
							 | 
						|
									if movedTo != "" {
							 | 
						|
										resp.MovedTo = string(movedTo)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return resp, nil
							 | 
						|
								
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
							 | 
						|
									owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
							 | 
						|
									if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound {
							 | 
						|
										err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
							 | 
						|
											secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
							 | 
						|
												Name:    req.Name,
							 | 
						|
												IsMoved: true,
							 | 
						|
											})
							 | 
						|
											if err != nil {
							 | 
						|
												return err
							 | 
						|
											}
							 | 
						|
											owner = secondResp.Owner
							 | 
						|
											return nil
							 | 
						|
										})
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, err
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if owner == "" {
							 | 
						|
										glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
							 | 
						|
										return nil, status.Error(codes.NotFound, fmt.Sprintf("lock %s not found", req.Name))
							 | 
						|
									}
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, status.Error(codes.Internal, err.Error())
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return &filer_pb.FindLockOwnerResponse{
							 | 
						|
										Owner: owner,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest
							 | 
						|
								func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) {
							 | 
						|
								
							 | 
						|
									for _, lock := range req.Locks {
							 | 
						|
										fs.filer.Dlm.InsertLock(lock.Name, lock.ExpiredAtNs, lock.RenewToken, lock.Owner)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return &filer_pb.TransferLocksResponse{}, nil
							 | 
						|
								
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
							 | 
						|
									locks := fs.filer.Dlm.SelectNotOwnedLocks(snapshot)
							 | 
						|
									if len(locks) == 0 {
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									for _, lock := range locks {
							 | 
						|
										server := fs.filer.Dlm.CalculateTargetServer(lock.Key, snapshot)
							 | 
						|
										if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
							 | 
						|
											_, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{
							 | 
						|
												Locks: []*filer_pb.Lock{
							 | 
						|
													{
							 | 
						|
														Name:        lock.Key,
							 | 
						|
														RenewToken:  lock.Token,
							 | 
						|
														ExpiredAtNs: lock.ExpiredAtNs,
							 | 
						|
														Owner:       lock.Owner,
							 | 
						|
													},
							 | 
						|
												},
							 | 
						|
											})
							 | 
						|
											return err
							 | 
						|
										}); err != nil {
							 | 
						|
											// it may not be worth retrying, since the lock may have expired
							 | 
						|
											glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
								}
							 |