Browse Source

filer: harden pending remote metadata deletion flow

Retry pending-marker writes before local delete, fail closed when marking cannot be persisted, and start remote pending reconciliation only after the filer store is initialised to avoid nil store access.

Made-with: Cursor
pull/8522/head
Peter 5 days ago
parent
commit
b70fe32fbe
  1. 6
      weed/filer/filer.go
  2. 30
      weed/filer/filer_delete_entry.go

6
weed/filer/filer.go

@ -7,6 +7,7 @@ import (
"os"
"sort"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
@ -61,6 +62,7 @@ type Filer struct {
deletionQuit chan struct{}
DeletionRetryQueue *DeletionRetryQueue
EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner
remoteDeletionLoop sync.Once
}
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@ -85,7 +87,6 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
f.metaLogReplication = replication
go f.loopProcessingDeletion()
go f.loopProcessingRemoteMetadataDeletionPending()
return f
}
@ -152,6 +153,9 @@ func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*m
// SetStore wraps store with NewFilerStoreWrapper and installs it as f.Store,
// then makes sure the pending remote metadata deletion loop is running.
//
// The loop is launched from here, after f.Store has been assigned, so that it
// does not run against an unset store. remoteDeletionLoop (a sync.Once) keeps
// repeated SetStore calls from spawning more than one loop goroutine.
//
// The returned isFresh is whatever setOrLoadFilerStoreSignature reports for
// the given store.
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
	f.Store = NewFilerStoreWrapper(store)

	// Start the background reconciliation loop at most once per Filer.
	startLoop := func() {
		go f.loopProcessingRemoteMetadataDeletionPending()
	}
	f.remoteDeletionLoop.Do(startLoop)

	isFresh = f.setOrLoadFilerStoreSignature(store)
	return isFresh
}

30
weed/filer/filer_delete_entry.go

@ -3,6 +3,7 @@ package filer
import (
"encoding/base64"
"context"
"errors"
"fmt"
"sort"
"strings"
@ -19,6 +20,8 @@ const (
remoteMetadataDeletionPendingIndexKey = "filer.remote.metadata.deletion.pending.index"
remoteMetadataDeletionPendingKeyPrefix = "filer.remote.metadata.deletion.pending/"
remoteMetadataDeletionReconcileInterval = 1 * time.Minute
remoteMetadataDeletionMarkRetryAttempts = 3
remoteMetadataDeletionMarkRetryBackoff = 100 * time.Millisecond
)
// OnChunksFunc is a callback that receives a slice of file chunks and returns
// an error.
type OnChunksFunc func([]*filer_pb.FileChunk) error
@ -142,8 +145,21 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
return remoteDeletionErr
}
if remoteDeleted {
if markErr := f.markRemoteMetadataDeletionPending(ctx, entry.FullPath); markErr != nil {
return fmt.Errorf("mark remote metadata deletion pending %s: %w", entry.FullPath, markErr)
markBackoff := remoteMetadataDeletionMarkRetryBackoff
var markErr error
for attempt := 1; attempt <= remoteMetadataDeletionMarkRetryAttempts; attempt++ {
markErr = f.markRemoteMetadataDeletionPending(ctx, entry.FullPath)
if markErr == nil {
break
}
if attempt < remoteMetadataDeletionMarkRetryAttempts {
time.Sleep(markBackoff)
markBackoff *= 2
}
}
if markErr != nil {
glog.Errorf("CRITICAL: failed to mark remote metadata deletion pending for %s after %d attempts: %v", entry.FullPath, remoteMetadataDeletionMarkRetryAttempts, markErr)
return fmt.Errorf("mark remote metadata deletion pending %s after retries: %w", entry.FullPath, markErr)
}
}
@ -201,6 +217,10 @@ func (f *Filer) loopProcessingRemoteMetadataDeletionPending() {
}
func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) error {
if f.Store == nil {
return nil
}
pendingPaths, err := f.listPendingRemoteMetadataDeletionPaths(ctx)
if err != nil {
return err
@ -208,7 +228,7 @@ func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) err
for _, pendingPath := range pendingPaths {
entry, findErr := f.FindEntry(ctx, pendingPath)
if findErr == filer_pb.ErrNotFound || entry == nil {
if errors.Is(findErr, filer_pb.ErrNotFound) || entry == nil {
if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil {
glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr)
}
@ -305,6 +325,10 @@ func (f *Filer) clearRemoteMetadataDeletionPending(ctx context.Context, path uti
}
func (f *Filer) listPendingRemoteMetadataDeletionPaths(ctx context.Context) ([]util.FullPath, error) {
if f.Store == nil {
return nil, nil
}
indexData, err := f.Store.KvGet(ctx, []byte(remoteMetadataDeletionPendingIndexKey))
if err != nil {
if err == ErrKvNotFound {

Loading…
Cancel
Save