|
|
@ -32,17 +32,18 @@ func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) * |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type LiveLock struct { |
|
|
type LiveLock struct { |
|
|
key string |
|
|
|
|
|
renewToken string |
|
|
|
|
|
expireAtNs int64 |
|
|
|
|
|
hostFiler pb.ServerAddress |
|
|
|
|
|
cancelCh chan struct{} |
|
|
|
|
|
grpcDialOption grpc.DialOption |
|
|
|
|
|
isLocked int32 // 0 = unlocked, 1 = locked; use atomic operations
|
|
|
|
|
|
self string |
|
|
|
|
|
lc *LockClient |
|
|
|
|
|
owner string |
|
|
|
|
|
lockTTL time.Duration |
|
|
|
|
|
|
|
|
key string |
|
|
|
|
|
renewToken string |
|
|
|
|
|
expireAtNs int64 |
|
|
|
|
|
hostFiler pb.ServerAddress |
|
|
|
|
|
cancelCh chan struct{} |
|
|
|
|
|
grpcDialOption grpc.DialOption |
|
|
|
|
|
isLocked int32 // 0 = unlocked, 1 = locked; use atomic operations
|
|
|
|
|
|
self string |
|
|
|
|
|
lc *LockClient |
|
|
|
|
|
owner string |
|
|
|
|
|
lockTTL time.Duration |
|
|
|
|
|
consecutiveFailures int // Track connection failures to trigger fallback
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// NewShortLivedLock creates a lock with a 5-second duration
|
|
|
// NewShortLivedLock creates a lock with a 5-second duration
|
|
|
@ -213,6 +214,7 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e |
|
|
glog.V(4).Infof("LOCK: DistributedLock response - key=%s err=%v", lock.key, err) |
|
|
glog.V(4).Infof("LOCK: DistributedLock response - key=%s err=%v", lock.key, err) |
|
|
if err == nil && resp != nil { |
|
|
if err == nil && resp != nil { |
|
|
lock.renewToken = resp.RenewToken |
|
|
lock.renewToken = resp.RenewToken |
|
|
|
|
|
lock.consecutiveFailures = 0 // Reset failure counter on success
|
|
|
glog.V(4).Infof("LOCK: Got renewToken for key=%s", lock.key) |
|
|
glog.V(4).Infof("LOCK: Got renewToken for key=%s", lock.key) |
|
|
} else { |
|
|
} else { |
|
|
//this can be retried. Need to remember the last valid renewToken
|
|
|
//this can be retried. Need to remember the last valid renewToken
|
|
|
@ -225,7 +227,7 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e |
|
|
// Only log if the host actually changed
|
|
|
// Only log if the host actually changed
|
|
|
glog.V(2).Infof("LOCK: Host changed from %s to %s for key=%s", previousHostFiler, resp.LockHostMovedTo, lock.key) |
|
|
glog.V(2).Infof("LOCK: Host changed from %s to %s for key=%s", previousHostFiler, resp.LockHostMovedTo, lock.key) |
|
|
lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) |
|
|
lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) |
|
|
lock.lc.seedFiler = lock.hostFiler |
|
|
|
|
|
|
|
|
// Don't update seedFiler - keep original for fallback
|
|
|
} else if resp.LockHostMovedTo != "" { |
|
|
} else if resp.LockHostMovedTo != "" { |
|
|
lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) |
|
|
lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) |
|
|
} |
|
|
} |
|
|
@ -242,6 +244,19 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e |
|
|
} |
|
|
} |
|
|
return err |
|
|
return err |
|
|
}) |
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
if err != nil && lock.hostFiler != lock.lc.seedFiler { |
|
|
|
|
|
lock.consecutiveFailures++ |
|
|
|
|
|
// Fall back to seed filer after 3 consecutive connection failures
|
|
|
|
|
|
if lock.consecutiveFailures >= 3 { |
|
|
|
|
|
glog.V(0).Infof("LOCK: Connection failed %d times for key=%s filer=%s, falling back to seed filer=%s", |
|
|
|
|
|
lock.consecutiveFailures, lock.key, lock.hostFiler, lock.lc.seedFiler) |
|
|
|
|
|
lock.hostFiler = lock.lc.seedFiler |
|
|
|
|
|
lock.consecutiveFailures = 0 |
|
|
|
|
|
lock.renewToken = "" |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|