diff --git a/weed/filer/filer.go b/weed/filer/filer.go index ecdb24c82..49f730370 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -7,6 +7,7 @@ import ( "os" "sort" "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" @@ -61,6 +62,7 @@ type Filer struct { deletionQuit chan struct{} DeletionRetryQueue *DeletionRetryQueue EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner + remoteDeletionLoop sync.Once } func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer { @@ -85,7 +87,6 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH f.metaLogReplication = replication go f.loopProcessingDeletion() - go f.loopProcessingRemoteMetadataDeletionPending() return f } @@ -152,6 +153,9 @@ func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*m func (f *Filer) SetStore(store FilerStore) (isFresh bool) { f.Store = NewFilerStoreWrapper(store) + f.remoteDeletionLoop.Do(func() { + go f.loopProcessingRemoteMetadataDeletionPending() + }) return f.setOrLoadFilerStoreSignature(store) } diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index c7a902d21..e592efcc0 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -3,6 +3,7 @@ package filer import ( "encoding/base64" "context" + "errors" "fmt" "sort" "strings" @@ -19,6 +20,8 @@ const ( remoteMetadataDeletionPendingIndexKey = "filer.remote.metadata.deletion.pending.index" remoteMetadataDeletionPendingKeyPrefix = "filer.remote.metadata.deletion.pending/" remoteMetadataDeletionReconcileInterval = 1 * time.Minute + remoteMetadataDeletionMarkRetryAttempts = 3 + remoteMetadataDeletionMarkRetryBackoff = 100 * time.Millisecond ) type OnChunksFunc func([]*filer_pb.FileChunk) error @@ -142,8 +145,21 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou return remoteDeletionErr } if remoteDeleted { - if markErr := f.markRemoteMetadataDeletionPending(ctx, entry.FullPath); markErr != nil { - return fmt.Errorf("mark remote metadata deletion pending %s: %w", entry.FullPath, markErr) + markBackoff := remoteMetadataDeletionMarkRetryBackoff + var markErr error + for attempt := 1; attempt <= remoteMetadataDeletionMarkRetryAttempts; attempt++ { + markErr = f.markRemoteMetadataDeletionPending(ctx, entry.FullPath) + if markErr == nil { + break + } + if attempt < remoteMetadataDeletionMarkRetryAttempts { + time.Sleep(markBackoff) + markBackoff *= 2 + } + } + if markErr != nil { + glog.Errorf("CRITICAL: failed to mark remote metadata deletion pending for %s after %d attempts: %v", entry.FullPath, remoteMetadataDeletionMarkRetryAttempts, markErr) + return fmt.Errorf("mark remote metadata deletion pending %s after retries: %w", entry.FullPath, markErr) } } @@ -201,6 +217,10 @@ func (f *Filer) loopProcessingRemoteMetadataDeletionPending() { } func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) error { + if f.Store == nil { + return nil + } + pendingPaths, err := f.listPendingRemoteMetadataDeletionPaths(ctx) if err != nil { return err @@ -208,7 +228,7 @@ func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) err for _, pendingPath := range pendingPaths { entry, findErr := f.FindEntry(ctx, pendingPath) - if findErr == filer_pb.ErrNotFound || entry == nil { + if errors.Is(findErr, filer_pb.ErrNotFound) || entry == nil { if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil { glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr) } @@ -305,6 +325,10 @@ func (f *Filer) clearRemoteMetadataDeletionPending(ctx context.Context, path uti } func (f *Filer) listPendingRemoteMetadataDeletionPaths(ctx context.Context) ([]util.FullPath, error) { + if f.Store == nil { + return nil, nil + } + indexData, err := f.Store.KvGet(ctx, []byte(remoteMetadataDeletionPendingIndexKey)) if err != nil { if err == ErrKvNotFound {