You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							121 lines
						
					
					
						
							3.1 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							121 lines
						
					
					
						
							3.1 KiB
						
					
					
				| package exclusive_locks | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"sync/atomic" | |
| 	"time" | |
| 
 | |
| 	"github.com/chrislusf/seaweedfs/weed/glog" | |
| 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb" | |
| 	"github.com/chrislusf/seaweedfs/weed/wdclient" | |
| ) | |
| 
 | |
// Timing and naming parameters of the exclusive admin lock.
const (
	// RenewInteval is how long the background renewal goroutine sleeps
	// after each successful lease renewal.
	RenewInteval = 4 * time.Second
	// SafeRenewInteval is the maximum age of the last lock time for which
	// GetToken hands out the token without waiting.
	SafeRenewInteval = 3 * time.Second
	// InitLockInteval is the pause between failed attempts to obtain the
	// lease in RequestLock.
	InitLockInteval = time.Second
	// AdminLockName is the lock name sent with every lease and release request.
	AdminLockName = "admin"
)
| 
 | |
| type ExclusiveLocker struct { | |
| 	masterClient *wdclient.MasterClient | |
| 	token        int64 | |
| 	lockTsNs     int64 | |
| 	isLocking    bool | |
| } | |
| 
 | |
| func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker { | |
| 	return &ExclusiveLocker{ | |
| 		masterClient: masterClient, | |
| 	} | |
| } | |
| func (l *ExclusiveLocker) IsLocking() bool { | |
| 	return l.isLocking | |
| } | |
| 
 | |
| func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { | |
| 	for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) { | |
| 		// wait until now is within the safe lock period, no immediate renewal to change the token | |
| 		time.Sleep(100 * time.Millisecond) | |
| 	} | |
| 	return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs) | |
| } | |
| 
 | |
| func (l *ExclusiveLocker) RequestLock() { | |
| 	if l.isLocking { | |
| 		return | |
| 	} | |
| 
 | |
| 	ctx, cancel := context.WithCancel(context.Background()) | |
| 	defer cancel() | |
| 
 | |
| 	// retry to get the lease | |
| 	for { | |
| 		if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { | |
| 			resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ | |
| 				PreviousToken:    atomic.LoadInt64(&l.token), | |
| 				PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), | |
| 				LockName:         AdminLockName, | |
| 			}) | |
| 			if err == nil { | |
| 				atomic.StoreInt64(&l.token, resp.Token) | |
| 				atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) | |
| 			} | |
| 			return err | |
| 		}); err != nil { | |
| 			// println("leasing problem", err.Error()) | |
| 			time.Sleep(InitLockInteval) | |
| 		} else { | |
| 			break | |
| 		} | |
| 	} | |
| 
 | |
| 	l.isLocking = true | |
| 
 | |
| 	// start a goroutine to renew the lease | |
| 	go func() { | |
| 		ctx2, cancel2 := context.WithCancel(context.Background()) | |
| 		defer cancel2() | |
| 
 | |
| 		for l.isLocking { | |
| 			if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { | |
| 				resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ | |
| 					PreviousToken:    atomic.LoadInt64(&l.token), | |
| 					PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), | |
| 					LockName:         AdminLockName, | |
| 				}) | |
| 				if err == nil { | |
| 					atomic.StoreInt64(&l.token, resp.Token) | |
| 					atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs) | |
| 					// println("ts", l.lockTsNs, "token", l.token) | |
| 				} | |
| 				return err | |
| 			}); err != nil { | |
| 				glog.Errorf("failed to renew lock: %v", err) | |
| 				return | |
| 			} else { | |
| 				time.Sleep(RenewInteval) | |
| 			} | |
| 
 | |
| 		} | |
| 	}() | |
| 
 | |
| } | |
| 
 | |
| func (l *ExclusiveLocker) ReleaseLock() { | |
| 	l.isLocking = false | |
| 
 | |
| 	ctx, cancel := context.WithCancel(context.Background()) | |
| 	defer cancel() | |
| 
 | |
| 	l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { | |
| 		client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{ | |
| 			PreviousToken:    atomic.LoadInt64(&l.token), | |
| 			PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), | |
| 			LockName:         AdminLockName, | |
| 		}) | |
| 		return nil | |
| 	}) | |
| 	atomic.StoreInt64(&l.token, 0) | |
| 	atomic.StoreInt64(&l.lockTsNs, 0) | |
| }
 |