From ffba644d3358502c90a938949e1c190385753629 Mon Sep 17 00:00:00 2001
From: Peter
Date: Thu, 5 Mar 2026 17:37:32 +0000
Subject: [PATCH] filer: harden remote delete metadata recovery

Persist remote-delete metadata pendings so local entry removal can be
retried after failures. When remote client resolution fails, return an
explicit error so that a delete is never silently applied locally only.
Lookup errors other than not-found keep the pending marker so the path
is retried on the next reconcile pass.

Made-with: Cursor
---
 weed/filer/filer.go                  |   1 +
 weed/filer/filer_delete_entry.go     | 214 ++++++++++++++++++++++++++-
 weed/filer/filer_lazy_remote.go      |  27 ++--
 weed/filer/filer_lazy_remote_test.go | 123 +++++++++++++++-
 4 files changed, 347 insertions(+), 18 deletions(-)

diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index ecca13f7a..ecdb24c82 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -85,6 +85,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
 	f.metaLogReplication = replication
 
 	go f.loopProcessingDeletion()
+	go f.loopProcessingRemoteMetadataDeletionPending()
 
 	return f
 }
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index c688be017..c7a902d21 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -1,8 +1,13 @@
 package filer
 
 import (
 	"context"
+	"encoding/base64"
+	"errors"
 	"fmt"
+	"sort"
+	"strings"
+	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -12,6 +17,14 @@ import (
 
 const (
 	MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
+
+	// Bookkeeping for entries whose remote object was deleted but whose
+	// local metadata still has to be removed. The index key holds a
+	// newline-separated list of per-path KV keys; each per-path key stores
+	// the raw full path.
+	remoteMetadataDeletionPendingIndexKey   = "filer.remote.metadata.deletion.pending.index"
+	remoteMetadataDeletionPendingKeyPrefix  = "filer.remote.metadata.deletion.pending/"
+	remoteMetadataDeletionReconcileInterval = 1 * time.Minute
 )
 
 type OnChunksFunc func([]*filer_pb.FileChunk) error
@@ -130,13 +143,28 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
 
 	glog.V(3).InfofCtx(ctx, "deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
 
-	if remoteDeletionErr := f.maybeDeleteFromRemote(ctx, entry); remoteDeletionErr != nil {
+	remoteDeleted, remoteDeletionErr := f.maybeDeleteFromRemote(ctx, entry)
+	if remoteDeletionErr != nil {
 		return remoteDeletionErr
 	}
+	if remoteDeleted {
+		// Record the path durably before touching the store so the reconcile
+		// loop can retry the local delete if it fails below.
+		if markErr := f.markRemoteMetadataDeletionPending(ctx, entry.FullPath); markErr != nil {
+			return fmt.Errorf("mark remote metadata deletion pending %s: %w", entry.FullPath, markErr)
+		}
+	}
 
 	if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
 		return fmt.Errorf("filer store delete: %w", storeDeletionErr)
 	}
+	if remoteDeleted {
+		// Best effort: a leftover marker is dropped by the reconcile loop once
+		// it sees that the entry is gone.
+		if clearErr := f.clearRemoteMetadataDeletionPending(ctx, entry.FullPath); clearErr != nil {
+			glog.Warningf("clear remote metadata deletion pending %s: %v", entry.FullPath, clearErr)
+		}
+	}
 	if !entry.IsDirectory() {
 		f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
 	}
@@ -165,3 +193,187 @@ func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLink
 		}
 	}
 }
+
+// loopProcessingRemoteMetadataDeletionPending retries pending local metadata
+// deletions once per reconcile interval until f.deletionQuit is readable.
+func (f *Filer) loopProcessingRemoteMetadataDeletionPending() {
+	ticker := time.NewTicker(remoteMetadataDeletionReconcileInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-f.deletionQuit:
+			return
+		case <-ticker.C:
+			if err := f.reconcilePendingRemoteMetadataDeletions(context.Background()); err != nil {
+				glog.Warningf("reconcile remote metadata deletion pendings: %v", err)
+			}
+		}
+	}
+}
+
+// reconcilePendingRemoteMetadataDeletions retries the local store delete for
+// every pending path. Per-path failures are logged and skipped so one bad path
+// does not block the rest; only a failure to list the pendings is returned.
+//
+// NOTE(review): the retry deletes whatever entry currently lives at the path
+// and emits no update event; confirm that is acceptable if a path can be
+// re-created between the failed delete and the next reconcile pass.
+func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) error {
+	pendingPaths, err := f.listPendingRemoteMetadataDeletionPaths(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, pendingPath := range pendingPaths {
+		entry, findErr := f.FindEntry(ctx, pendingPath)
+		switch {
+		case errors.Is(findErr, filer_pb.ErrNotFound), findErr == nil && entry == nil:
+			// The entry is already gone, so only the marker is left to drop.
+			if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil {
+				glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr)
+			}
+			continue
+		case findErr != nil:
+			// Keep the marker on any other lookup error so the path is retried
+			// on the next pass instead of being forgotten.
+			glog.Warningf("find pending remote metadata deletion %s: %v", pendingPath, findErr)
+			continue
+		}
+
+		if deleteErr := f.Store.DeleteOneEntry(ctx, entry); deleteErr != nil {
+			glog.Warningf("retry local metadata deletion %s: %v", pendingPath, deleteErr)
+			continue
+		}
+
+		if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil {
+			glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr)
+		}
+	}
+
+	return nil
+}
+
+// markRemoteMetadataDeletionPending durably records path as awaiting local
+// metadata removal.
+func (f *Filer) markRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath) error {
+	return f.updateRemoteMetadataDeletionPending(ctx, path, true)
+}
+
+// clearRemoteMetadataDeletionPending drops the pending record for path.
+func (f *Filer) clearRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath) error {
+	return f.updateRemoteMetadataDeletionPending(ctx, path, false)
+}
+
+// updateRemoteMetadataDeletionPending adds (pending=true) or removes
+// (pending=false) path from the pending bookkeeping inside one store
+// transaction: the per-path key is written or deleted, then the index is
+// rewritten from the resulting set. Any failure rolls the transaction back.
+//
+// NOTE(review): the index is read-modify-written without a lock; confirm the
+// store's transaction isolation is enough for concurrent deletes.
+func (f *Filer) updateRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath, pending bool) error {
+	txnCtx, beginErr := f.BeginTransaction(ctx)
+	if beginErr != nil {
+		return beginErr
+	}
+	committed := false
+	defer func() {
+		if !committed {
+			// Best effort: the error that aborted the update is the one returned.
+			_ = f.RollbackTransaction(txnCtx)
+		}
+	}()
+
+	pendings, err := f.listPendingRemoteMetadataDeletionPaths(txnCtx)
+	if err != nil {
+		return err
+	}
+
+	pendingSet := make(map[string]struct{}, len(pendings)+1)
+	for _, pendingPath := range pendings {
+		pendingSet[string(pendingPath)] = struct{}{}
+	}
+
+	pathKey := pendingRemoteMetadataDeletionPathKey(path)
+	if pending {
+		pendingSet[string(path)] = struct{}{}
+		err = f.Store.KvPut(txnCtx, pathKey, []byte(path))
+	} else {
+		delete(pendingSet, string(path))
+		err = f.Store.KvDelete(txnCtx, pathKey)
+	}
+	if err != nil {
+		return err
+	}
+	if err := f.Store.KvPut(txnCtx, []byte(remoteMetadataDeletionPendingIndexKey), encodePendingRemoteMetadataDeletionIndex(pendingSet)); err != nil {
+		return err
+	}
+	if err := f.CommitTransaction(txnCtx); err != nil {
+		return err
+	}
+	committed = true
+	return nil
+}
+
+// listPendingRemoteMetadataDeletionPaths returns the paths currently recorded
+// as pending. A missing or empty index yields nil, and index entries whose
+// per-path key no longer exists are skipped.
+func (f *Filer) listPendingRemoteMetadataDeletionPaths(ctx context.Context) ([]util.FullPath, error) {
+	indexData, err := f.Store.KvGet(ctx, []byte(remoteMetadataDeletionPendingIndexKey))
+	if err != nil {
+		if errors.Is(err, ErrKvNotFound) {
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	var pendingPaths []util.FullPath
+	for _, pathKey := range decodePendingRemoteMetadataDeletionIndex(indexData) {
+		pathKey = strings.TrimSpace(pathKey)
+		if pathKey == "" {
+			continue
+		}
+		data, getErr := f.Store.KvGet(ctx, []byte(pathKey))
+		if getErr != nil {
+			if errors.Is(getErr, ErrKvNotFound) {
+				continue
+			}
+			return nil, getErr
+		}
+		pendingPaths = append(pendingPaths, util.FullPath(string(data)))
+	}
+	return pendingPaths, nil
+}
+
+// pendingRemoteMetadataDeletionPathKey builds the KV key for path: the key
+// prefix followed by the unpadded URL-safe base64 encoding of the path.
+func pendingRemoteMetadataDeletionPathKey(path util.FullPath) []byte {
+	encodedPath := base64.RawURLEncoding.EncodeToString([]byte(path))
+	return []byte(remoteMetadataDeletionPendingKeyPrefix + encodedPath)
+}
+
+// encodePendingRemoteMetadataDeletionIndex serializes the pending set as the
+// per-path KV keys, sorted and joined with newlines. An empty set encodes to an
+// empty value.
+func encodePendingRemoteMetadataDeletionIndex(pendingSet map[string]struct{}) []byte {
+	if len(pendingSet) == 0 {
+		return []byte{}
+	}
+
+	keys := make([]string, 0, len(pendingSet))
+	for path := range pendingSet {
+		keys = append(keys, string(pendingRemoteMetadataDeletionPathKey(util.FullPath(path))))
+	}
+	sort.Strings(keys)
+	return []byte(strings.Join(keys, "\n"))
+}
+
+// decodePendingRemoteMetadataDeletionIndex splits an encoded index back into
+// its KV keys; empty input yields nil.
+func decodePendingRemoteMetadataDeletionIndex(indexData []byte) []string {
+	if len(indexData) == 0 {
+		return nil
+	}
+	return strings.Split(string(indexData), "\n")
+}
diff --git a/weed/filer/filer_lazy_remote.go b/weed/filer/filer_lazy_remote.go
index 87fb9adfc..3f176f75d 100644
--- a/weed/filer/filer_lazy_remote.go
+++ b/weed/filer/filer_lazy_remote.go
@@ -115,19 +115,22 @@ func (f *Filer) maybeLazyFetchFromRemote(ctx context.Context, p util.FullPath) (
 	return result.entry, nil
 }
 
-func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) error {
+// maybeDeleteFromRemote removes the remote counterpart of entry when it lives
+// under a remote mount. It reports true when the remote object was deleted or
+// was already absent, and false when nothing remote had to be removed.
+func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) (bool, error) {
 	if entry == nil || f.RemoteStorage == nil {
-		return nil
+		return false, nil
 	}
 
 	mountDir, remoteLoc := f.RemoteStorage.FindMountDirectory(entry.FullPath)
 	if remoteLoc == nil {
-		return nil
+		return false, nil
 	}
 
 	client, _, found := f.RemoteStorage.FindRemoteStorageClient(entry.FullPath)
-	if !found {
-		return nil
+	if !found || client == nil {
+		return false, fmt.Errorf("resolve remote storage client for %s: not found", entry.FullPath)
 	}
 
 	objectLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoc, entry.FullPath)
@@ -135,24 +138,24 @@ func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) error {
 	if entry.IsDirectory() {
 		if err := client.RemoveDirectory(objectLoc); err != nil {
 			if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
-				return nil
+				return true, nil
 			}
-			return fmt.Errorf("remove remote directory %s: %w", entry.FullPath, err)
+			return false, fmt.Errorf("remove remote directory %s: %w", entry.FullPath, err)
 		}
-		return nil
+		return true, nil
 	}
 
 	if !entry.IsInRemoteOnly() {
-		return nil
+		return false, nil
 	}
 
 	if err := client.DeleteFile(objectLoc); err != nil {
 		if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
-			return nil
+			return true, nil
 		}
-		return fmt.Errorf("delete remote file %s: %w", entry.FullPath, err)
+		return false, fmt.Errorf("delete remote file %s: %w", entry.FullPath, err)
 	}
 
 	glog.V(3).InfofCtx(ctx, "maybeDeleteFromRemote: deleted %s from remote", entry.FullPath)
-	return nil
+	return true, nil
 }
diff --git a/weed/filer/filer_lazy_remote_test.go b/weed/filer/filer_lazy_remote_test.go
index a316ad0bf..fabe7279d 100644
--- a/weed/filer/filer_lazy_remote_test.go
+++ b/weed/filer/filer_lazy_remote_test.go
@@ -29,11 +29,18 @@ import (
 type stubFilerStore struct {
 	mu        sync.Mutex
 	entries   map[string]*Entry
 	insertErr error
+
+	kv              map[string][]byte
+	deleteErrByPath map[string]error
 }
 
 func newStubFilerStore() *stubFilerStore {
-	return &stubFilerStore{entries: make(map[string]*Entry)}
+	return &stubFilerStore{
+		entries:         make(map[string]*Entry),
+		kv:              make(map[string][]byte),
+		deleteErrByPath: make(map[string]error),
+	}
 }
 
 func (s *stubFilerStore) GetName() string { return "stub" }
@@ -44,11 +51,27 @@ func (s *stubFilerStore) BeginTransaction(ctx context.Context) (context.Context,
 }
 func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil }
 func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil }
-func (s *stubFilerStore) KvPut(context.Context, []byte, []byte) error { return nil }
-func (s *stubFilerStore) KvGet(context.Context, []byte) ([]byte, error) {
-	return nil, ErrKvNotFound
+func (s *stubFilerStore) KvPut(_ context.Context, key []byte, value []byte) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.kv[string(key)] = append([]byte(nil), value...)
+	return nil
+}
+func (s *stubFilerStore) KvGet(_ context.Context, key []byte) ([]byte, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	value, found := s.kv[string(key)]
+	if !found {
+		return nil, ErrKvNotFound
+	}
+	return append([]byte(nil), value...), nil
+}
+func (s *stubFilerStore) KvDelete(_ context.Context, key []byte) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	delete(s.kv, string(key))
+	return nil
 }
-func (s *stubFilerStore) KvDelete(context.Context, []byte) error { return nil }
 func (s *stubFilerStore) DeleteFolderChildren(context.Context, util.FullPath) error { return nil }
 func (s *stubFilerStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
 	return "", nil
@@ -86,6 +109,9 @@ func (s *stubFilerStore) FindEntry(_ context.Context, p util.FullPath) (*Entry,
 func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	if deleteErr, found := s.deleteErrByPath[string(p)]; found && deleteErr != nil {
+		return deleteErr
+	}
 	delete(s.entries, string(p))
 	return nil
 }
@@ -461,6 +487,93 @@ func TestDeleteEntryMetaAndData_RemoteOnlyFileNotUnderMountSkipsRemoteDelete(t *
 	require.Len(t, stub.deleteCalls, 0)
 }
 
+func TestDeleteEntryMetaAndData_RemoteMountWithoutClientResolutionKeepsMetadata(t *testing.T) {
+	rs := NewFilerRemoteStorage()
+	rs.storageNameToConf["missingclient"] = &remote_pb.RemoteConf{Name: "missingclient", Type: "stub_missing_client"}
+	rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
+		Name:   "missingclient",
+		Bucket: "mybucket",
+		Path:   "/",
+	})
+
+	store := newStubFilerStore()
+	filePath := util.FullPath("/buckets/mybucket/no-client.txt")
+	store.entries[string(filePath)] = &Entry{
+		FullPath: filePath,
+		Attr: Attr{
+			Mtime:    time.Unix(1700000000, 0),
+			Crtime:   time.Unix(1700000000, 0),
+			Mode:     0644,
+			FileSize: 51,
+		},
+		Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 51},
+	}
+	f := newTestFiler(t, store, rs)
+
+	err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
+	require.Error(t, err)
+	require.ErrorContains(t, err, "resolve remote storage client")
+	require.ErrorContains(t, err, string(filePath))
+
+	stored, findErr := store.FindEntry(context.Background(), filePath)
+	require.NoError(t, findErr)
+	require.NotNil(t, stored)
+}
+
+func TestDeleteEntryMetaAndData_LocalDeleteFailureLeavesDurablePendingForReconcile(t *testing.T) {
+	const storageType = "stub_lazy_delete_pending_reconcile"
+	stub := &stubRemoteClient{}
+	defer registerStubMaker(t, storageType, stub)()
+
+	conf := &remote_pb.RemoteConf{Name: "pendingreconcile", Type: storageType}
+	rs := NewFilerRemoteStorage()
+	rs.storageNameToConf[conf.Name] = conf
+	rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
+		Name:   "pendingreconcile",
+		Bucket: "mybucket",
+		Path:   "/",
+	})
+
+	store := newStubFilerStore()
+	filePath := util.FullPath("/buckets/mybucket/reconcile.txt")
+	store.entries[string(filePath)] = &Entry{
+		FullPath: filePath,
+		Attr: Attr{
+			Mtime:    time.Unix(1700000000, 0),
+			Crtime:   time.Unix(1700000000, 0),
+			Mode:     0644,
+			FileSize: 80,
+		},
+		Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 80},
+	}
+	store.deleteErrByPath[string(filePath)] = errors.New("simulated local delete failure")
+	f := newTestFiler(t, store, rs)
+
+	err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
+	require.Error(t, err)
+	require.ErrorContains(t, err, "filer store delete")
+	require.Len(t, stub.deleteCalls, 1)
+
+	stored, findErr := store.FindEntry(context.Background(), filePath)
+	require.NoError(t, findErr)
+	require.NotNil(t, stored)
+
+	pendingPaths, pendingErr := f.listPendingRemoteMetadataDeletionPaths(context.Background())
+	require.NoError(t, pendingErr)
+	require.Equal(t, []util.FullPath{filePath}, pendingPaths)
+
+	delete(store.deleteErrByPath, string(filePath))
+	require.NoError(t, f.reconcilePendingRemoteMetadataDeletions(context.Background()))
+
+	_, findAfterReconcileErr := store.FindEntry(context.Background(), filePath)
+	require.ErrorIs(t, findAfterReconcileErr, filer_pb.ErrNotFound)
+	require.Len(t, stub.deleteCalls, 1)
+
+	pendingPaths, pendingErr = f.listPendingRemoteMetadataDeletionPaths(context.Background())
+	require.NoError(t, pendingErr)
+	require.Empty(t, pendingPaths)
+}
+
 func TestDeleteEntryMetaAndData_RemoteDeleteNotFoundStillDeletesMetadata(t *testing.T) {
 	const storageType = "stub_lazy_delete_notfound"
 	stub := &stubRemoteClient{deleteErr: remote_storage.ErrRemoteObjectNotFound}