Browse Source

RetryForever => RetryUntil

pull/4880/head
chrislu 1 year ago
parent
commit
b02fdeabff
  1. 6
      weed/cluster/lock_client.go
  2. 2
      weed/command/filer_remote_gateway.go
  3. 2
      weed/command/filer_remote_sync.go
  4. 2
      weed/mount/meta_cache/meta_cache_subscribe.go
  5. 2
      weed/operation/upload_content.go
  6. 2
      weed/pb/filer_pb_tail.go
  7. 2
      weed/s3api/auth_credentials_subscribe.go
  8. 5
      weed/util/retry.go

6
weed/cluster/lock_client.go

@ -57,7 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
lc: lc, lc: lc,
} }
go func() { go func() {
util.RetryForever("create lock:"+key, func() error {
util.RetryUntil("create lock:"+key, func() error {
errorMessage, err := lock.doLock(lock_manager.MaxDuration) errorMessage, err := lock.doLock(lock_manager.MaxDuration)
if err != nil { if err != nil {
glog.Infof("create lock %s: %s", key, err) glog.Infof("create lock %s: %s", key, err)
@ -98,7 +98,7 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
lockDuration = lc.maxLockDuration lockDuration = lc.maxLockDuration
needRenewal = true needRenewal = true
} }
util.RetryForever("create lock:"+key, func() error {
util.RetryUntil("create lock:"+key, func() error {
errorMessage, err := lock.doLock(lockDuration) errorMessage, err := lock.doLock(lockDuration)
if err != nil { if err != nil {
time.Sleep(time.Second) time.Sleep(time.Second)
@ -148,7 +148,7 @@ func (lc *LockClient) keepLock(lock *LiveLock) {
select { select {
case <-ticker: case <-ticker:
// renew the lock if lock.expireAtNs is still greater than now // renew the lock if lock.expireAtNs is still greater than now
util.RetryForever("keep lock:"+lock.key, func() error {
util.RetryUntil("keep lock:"+lock.key, func() error {
lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
if lockDuration > lc.maxLockDuration { if lockDuration > lc.maxLockDuration {
lockDuration = lc.maxLockDuration lockDuration = lc.maxLockDuration

2
weed/command/filer_remote_gateway.go

@ -111,7 +111,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool {
// synchronize /buckets folder // synchronize /buckets folder
fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir) fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir)
util.RetryForever("filer.remote.sync buckets", func() error {
util.RetryUntil("filer.remote.sync buckets", func() error {
return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource) return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource)
}, func(err error) bool { }, func(err error) bool {
if err != nil { if err != nil {

2
weed/command/filer_remote_sync.go

@ -90,7 +90,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
if dir != "" { if dir != "" {
fmt.Printf("synchronize %s to remote storage...\n", dir) fmt.Printf("synchronize %s to remote storage...\n", dir)
util.RetryForever("filer.remote.sync "+dir, func() error {
util.RetryUntil("filer.remote.sync "+dir, func() error {
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
}, func(err error) bool { }, func(err error) bool {
if err != nil { if err != nil {

2
weed/mount/meta_cache/meta_cache_subscribe.go

@ -70,7 +70,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
StopTsNs: 0, StopTsNs: 0,
EventErrorType: pb.FatalOnError, EventErrorType: pb.FatalOnError,
} }
util.RetryForever("followMetaUpdates", func() error {
util.RetryUntil("followMetaUpdates", func() error {
clientEpoch++ clientEpoch++
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn) return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
}, func(err error) bool { }, func(err error) bool {

2
weed/operation/upload_content.go

@ -116,7 +116,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
return uploadErr return uploadErr
} }
if uploadOption.RetryForever { if uploadOption.RetryForever {
util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
glog.V(0).Infof("upload content: %v", err) glog.V(0).Infof("upload content: %v", err)
return true return true
}) })

2
weed/pb/filer_pb_tail.go

@ -90,7 +90,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
case FatalOnError: case FatalOnError:
glog.Fatalf("process %v: %v", resp, err) glog.Fatalf("process %v: %v", resp, err)
case RetryForeverOnError: case RetryForeverOnError:
util.RetryForever("followMetaUpdates", func() error {
util.RetryUntil("followMetaUpdates", func() error {
return processEventFn(resp) return processEventFn(resp)
}, func(err error) bool { }, func(err error) bool {
glog.Errorf("process %v: %v", resp, err) glog.Errorf("process %v: %v", resp, err)

2
weed/s3api/auth_credentials_subscribe.go

@ -46,7 +46,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p
StopTsNs: 0, StopTsNs: 0,
EventErrorType: pb.FatalOnError, EventErrorType: pb.FatalOnError,
} }
util.RetryForever("followIamChanges", func() error {
util.RetryUntil("followIamChanges", func() error {
clientEpoch++ clientEpoch++
return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn) return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
}, func(err error) bool { }, func(err error) bool {

5
weed/util/retry.go

@ -57,7 +57,8 @@ func MultiRetry(name string, errList []string, job func() error) (err error) {
return err return err
} }
func RetryForever(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) {
// RetryUntil retries until the job returns no error or onErrFn returns false
func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) {
waitTime := time.Second waitTime := time.Second
for { for {
err := job() err := job()
@ -74,6 +75,8 @@ func RetryForever(name string, job func() error, onErrFn func(err error) (should
waitTime += waitTime / 2 waitTime += waitTime / 2
} }
continue continue
} else {
break
} }
} }
} }

Loading…
Cancel
Save