// SetStore wraps the injected store in a FilerStoreWrapper and, exactly once
// per Filer, starts the background loop that retries pending remote-metadata
// deletions. Returns whether the store is fresh (no prior store signature).
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
	f.Store = NewFilerStoreWrapper(store)
	// sync.Once guarantees a single reconcile goroutine even if SetStore is
	// called more than once.
	f.remoteDeletionLoop.Do(func() {
		go f.loopProcessingRemoteMetadataDeletionPending()
	})
	return f.setOrLoadFilerStoreSignature(store)
}

// FindEntry looks up p, allowing a lazy fetch from mounted remote storage
// when the entry is missing locally.
func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
	return f.findEntry(ctx, p, true)
}

// FindEntryLocal looks up p in the local store only; it never triggers a lazy
// remote fetch. Used by the deletion reconciler so that an entry being
// deleted is not re-materialized from remote storage.
func (f *Filer) FindEntryLocal(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
	return f.findEntry(ctx, p, false)
}
a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 77d9e34a7..183897c40 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -1,8 +1,13 @@ package filer import ( + "encoding/base64" "context" + "errors" "fmt" + "sort" + "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -12,6 +17,11 @@ import ( const ( MsgFailDelNonEmptyFolder = "fail to delete non-empty folder" + remoteMetadataDeletionPendingIndexKey = "filer.remote.metadata.deletion.pending.index" + remoteMetadataDeletionPendingKeyPrefix = "filer.remote.metadata.deletion.pending/" + remoteMetadataDeletionReconcileInterval = 1 * time.Minute + remoteMetadataDeletionMarkRetryAttempts = 3 + remoteMetadataDeletionMarkRetryBackoff = 100 * time.Millisecond ) type OnChunksFunc func([]*filer_pb.FileChunk) error @@ -130,9 +140,37 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou glog.V(3).InfofCtx(ctx, "deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) + remoteDeleted, remoteDeletionErr := f.maybeDeleteFromRemote(ctx, entry) + if remoteDeletionErr != nil { + return remoteDeletionErr + } + if remoteDeleted { + markBackoff := remoteMetadataDeletionMarkRetryBackoff + var markErr error + for attempt := 1; attempt <= remoteMetadataDeletionMarkRetryAttempts; attempt++ { + markErr = f.markRemoteMetadataDeletionPending(ctx, entry.FullPath) + if markErr == nil { + break + } + if attempt < remoteMetadataDeletionMarkRetryAttempts { + time.Sleep(markBackoff) + markBackoff *= 2 + } + } + if markErr != nil { + glog.Errorf("CRITICAL: failed to mark remote metadata deletion pending for %s after %d attempts: %v", entry.FullPath, remoteMetadataDeletionMarkRetryAttempts, markErr) + return fmt.Errorf("mark remote metadata deletion pending %s after retries: %w", entry.FullPath, markErr) + } + } + if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); 
storeDeletionErr != nil { return fmt.Errorf("filer store delete: %w", storeDeletionErr) } + if remoteDeleted { + if clearErr := f.clearRemoteMetadataDeletionPending(ctx, entry.FullPath); clearErr != nil { + glog.Warningf("clear remote metadata deletion pending %s: %v", entry.FullPath, clearErr) + } + } if !entry.IsDirectory() { f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) } @@ -161,3 +199,197 @@ func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLink } } } + +func (f *Filer) loopProcessingRemoteMetadataDeletionPending() { + ticker := time.NewTicker(remoteMetadataDeletionReconcileInterval) + defer ticker.Stop() + + for { + select { + case <-f.deletionQuit: + return + case <-ticker.C: + if err := f.reconcilePendingRemoteMetadataDeletions(context.Background()); err != nil { + glog.Warningf("reconcile remote metadata deletion pendings: %v", err) + } + } + } +} + +func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) error { + if f.Store == nil { + return nil + } + + pendingPaths, err := f.listPendingRemoteMetadataDeletionPaths(ctx) + if err != nil { + return err + } + + for _, pendingPath := range pendingPaths { + entry, findErr := f.FindEntryLocal(ctx, pendingPath) + if errors.Is(findErr, filer_pb.ErrNotFound) || entry == nil { + if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil { + glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr) + } + continue + } + if findErr != nil { + glog.Warningf("find pending remote metadata deletion %s: %v", pendingPath, findErr) + continue + } + + if deleteErr := f.Store.DeleteOneEntry(ctx, entry); deleteErr != nil { + glog.Warningf("retry local metadata deletion %s: %v", pendingPath, deleteErr) + continue + } + + if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil { + glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, 
clearErr) + } + } + + return nil +} + +func (f *Filer) markRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath) error { + f.remoteMetadataDeletionIndexMu.Lock() + defer f.remoteMetadataDeletionIndexMu.Unlock() + + txnCtx, beginErr := f.BeginTransaction(ctx) + if beginErr != nil { + return beginErr + } + committed := false + defer func() { + if !committed { + _ = f.RollbackTransaction(txnCtx) + } + }() + + pendings, err := f.listPendingRemoteMetadataDeletionPaths(txnCtx) + if err != nil { + return err + } + + pendingSet := make(map[string]struct{}, len(pendings)+1) + for _, pendingPath := range pendings { + pendingSet[string(pendingPath)] = struct{}{} + } + pendingSet[string(path)] = struct{}{} + + if err := f.Store.KvPut(txnCtx, pendingRemoteMetadataDeletionPathKey(path), []byte(path)); err != nil { + return err + } + if err := f.Store.KvPut(txnCtx, []byte(remoteMetadataDeletionPendingIndexKey), encodePendingRemoteMetadataDeletionIndex(pendingSet)); err != nil { + return err + } + if err := f.CommitTransaction(txnCtx); err != nil { + return err + } + committed = true + return nil +} + +func (f *Filer) clearRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath) error { + f.remoteMetadataDeletionIndexMu.Lock() + defer f.remoteMetadataDeletionIndexMu.Unlock() + + txnCtx, beginErr := f.BeginTransaction(ctx) + if beginErr != nil { + return beginErr + } + committed := false + defer func() { + if !committed { + _ = f.RollbackTransaction(txnCtx) + } + }() + + pendings, err := f.listPendingRemoteMetadataDeletionPaths(txnCtx) + if err != nil { + return err + } + + pendingSet := make(map[string]struct{}, len(pendings)) + for _, pendingPath := range pendings { + pendingSet[string(pendingPath)] = struct{}{} + } + delete(pendingSet, string(path)) + + if err := f.Store.KvDelete(txnCtx, pendingRemoteMetadataDeletionPathKey(path)); err != nil { + return err + } + if err := f.Store.KvPut(txnCtx, []byte(remoteMetadataDeletionPendingIndexKey), 
encodePendingRemoteMetadataDeletionIndex(pendingSet)); err != nil { + return err + } + if err := f.CommitTransaction(txnCtx); err != nil { + return err + } + committed = true + return nil +} + +func (f *Filer) listPendingRemoteMetadataDeletionPaths(ctx context.Context) ([]util.FullPath, error) { + if f.Store == nil { + return nil, nil + } + + indexData, err := f.Store.KvGet(ctx, []byte(remoteMetadataDeletionPendingIndexKey)) + if err != nil { + if err == ErrKvNotFound { + return nil, nil + } + return nil, err + } + if len(indexData) == 0 { + return nil, nil + } + + encodedKeys := decodePendingRemoteMetadataDeletionIndex(indexData) + if len(encodedKeys) == 0 { + return nil, nil + } + + var pendingPaths []util.FullPath + for _, encodedKey := range encodedKeys { + encodedKey = strings.TrimSpace(encodedKey) + if encodedKey == "" { + continue + } + data, getErr := f.Store.KvGet(ctx, []byte(encodedKey)) + if getErr != nil { + if getErr == ErrKvNotFound { + continue + } + return nil, getErr + } + pendingPaths = append(pendingPaths, util.FullPath(string(data))) + } + return pendingPaths, nil +} + +func pendingRemoteMetadataDeletionPathKey(path util.FullPath) []byte { + encodedPath := base64.RawURLEncoding.EncodeToString([]byte(path)) + return []byte(remoteMetadataDeletionPendingKeyPrefix + encodedPath) +} + +func encodePendingRemoteMetadataDeletionIndex(pendingSet map[string]struct{}) []byte { + if len(pendingSet) == 0 { + return []byte{} + } + + keys := make([]string, 0, len(pendingSet)) + for path := range pendingSet { + keys = append(keys, string(pendingRemoteMetadataDeletionPathKey(util.FullPath(path)))) + } + sort.Strings(keys) + return []byte(strings.Join(keys, "\n")) +} + +func decodePendingRemoteMetadataDeletionIndex(indexData []byte) []string { + if len(indexData) == 0 { + return nil + } + return strings.Split(string(indexData), "\n") +} diff --git a/weed/filer/filer_lazy_remote.go b/weed/filer/filer_lazy_remote.go index 76f48a6fd..017827de9 100644 --- 
// maybeDeleteFromRemote removes entry's counterpart from mounted remote
// storage, when one exists. It returns (true, nil) when the remote object was
// deleted (or was already gone), (false, nil) when no remote action applies,
// and a non-nil error when the remote deletion could not be performed — in
// which case the caller must not delete the local metadata.
func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) (bool, error) {
	if entry == nil || f.RemoteStorage == nil {
		return false, nil
	}

	// Only entries under a remote mount have a remote counterpart.
	mountDir, remoteLoc := f.RemoteStorage.FindMountDirectory(entry.FullPath)
	if remoteLoc == nil {
		return false, nil
	}

	client, _, found := f.RemoteStorage.GetRemoteStorageClient(remoteLoc.Name)
	if !found || client == nil {
		// Fail the whole deletion: removing local metadata without the remote
		// copy would orphan the remote object.
		return false, fmt.Errorf("resolve remote storage client for %s: not found", entry.FullPath)
	}

	objectLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoc, entry.FullPath)

	if entry.IsDirectory() {
		if err := client.RemoveDirectory(objectLoc); err != nil {
			if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
				// Already gone remotely; treat as deleted.
				return true, nil
			}
			return false, fmt.Errorf("remove remote directory %s: %w", entry.FullPath, err)
		}
		glog.V(3).InfofCtx(ctx, "maybeDeleteFromRemote: deleted directory %s from remote", entry.FullPath)
		return true, nil
	}

	// Files that are not remote-only keep their remote copy untouched.
	// NOTE(review): this means a locally-cached file's remote object survives
	// deletion — confirm that is the intended semantics.
	if !entry.IsInRemoteOnly() {
		return false, nil
	}

	if err := client.DeleteFile(objectLoc); err != nil {
		if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
			return true, nil
		}
		return false, fmt.Errorf("delete remote file %s: %w", entry.FullPath, err)
	}

	glog.V(3).InfofCtx(ctx, "maybeDeleteFromRemote: deleted %s from remote", entry.FullPath)
	return true, nil
}
deleteErrByPath map[string]error } func newStubFilerStore() *stubFilerStore { - return &stubFilerStore{entries: make(map[string]*Entry)} + return &stubFilerStore{ + entries: make(map[string]*Entry), + kv: make(map[string][]byte), + deleteErrByPath: make(map[string]error), + } } func (s *stubFilerStore) GetName() string { return "stub" } @@ -43,11 +50,27 @@ func (s *stubFilerStore) BeginTransaction(ctx context.Context) (context.Context, } func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil } func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil } -func (s *stubFilerStore) KvPut(context.Context, []byte, []byte) error { return nil } -func (s *stubFilerStore) KvGet(context.Context, []byte) ([]byte, error) { - return nil, ErrKvNotFound +func (s *stubFilerStore) KvPut(_ context.Context, key []byte, value []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + s.kv[string(key)] = append([]byte(nil), value...) + return nil +} +func (s *stubFilerStore) KvGet(_ context.Context, key []byte) ([]byte, error) { + s.mu.Lock() + defer s.mu.Unlock() + value, found := s.kv[string(key)] + if !found { + return nil, ErrKvNotFound + } + return append([]byte(nil), value...), nil +} +func (s *stubFilerStore) KvDelete(_ context.Context, key []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.kv, string(key)) + return nil } -func (s *stubFilerStore) KvDelete(context.Context, []byte) error { return nil } func (s *stubFilerStore) DeleteFolderChildren(context.Context, util.FullPath) error { return nil } func (s *stubFilerStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { return "", nil @@ -85,6 +108,9 @@ func (s *stubFilerStore) FindEntry(_ context.Context, p util.FullPath) (*Entry, func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error { s.mu.Lock() defer s.mu.Unlock() + if 
deleteErr, found := s.deleteErrByPath[string(p)]; found && deleteErr != nil { + return deleteErr + } delete(s.entries, string(p)) return nil } @@ -94,6 +120,11 @@ func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error { type stubRemoteClient struct { statResult *filer_pb.RemoteEntry statErr error + deleteErr error + removeErr error + + deleteCalls []*remote_pb.RemoteStorageLocation + removeCalls []*remote_pb.RemoteStorageLocation } func (c *stubRemoteClient) StatFile(*remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { @@ -108,14 +139,28 @@ func (c *stubRemoteClient) ReadFile(*remote_pb.RemoteStorageLocation, int64, int func (c *stubRemoteClient) WriteDirectory(*remote_pb.RemoteStorageLocation, *filer_pb.Entry) error { return nil } -func (c *stubRemoteClient) RemoveDirectory(*remote_pb.RemoteStorageLocation) error { return nil } +func (c *stubRemoteClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) error { + c.removeCalls = append(c.removeCalls, &remote_pb.RemoteStorageLocation{ + Name: loc.Name, + Bucket: loc.Bucket, + Path: loc.Path, + }) + return c.removeErr +} func (c *stubRemoteClient) WriteFile(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, io.Reader) (*filer_pb.RemoteEntry, error) { return nil, nil } func (c *stubRemoteClient) UpdateFileMetadata(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, *filer_pb.Entry) error { return nil } -func (c *stubRemoteClient) DeleteFile(*remote_pb.RemoteStorageLocation) error { return nil } +func (c *stubRemoteClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) error { + c.deleteCalls = append(c.deleteCalls, &remote_pb.RemoteStorageLocation{ + Name: loc.Name, + Bucket: loc.Bucket, + Path: loc.Path, + }) + return c.deleteErr +} func (c *stubRemoteClient) ListBuckets() ([]*remote_storage.Bucket, error) { return nil, nil } func (c *stubRemoteClient) CreateBucket(string) error { return nil } func (c *stubRemoteClient) DeleteBucket(string) error { return nil } @@ -368,3 
+413,268 @@ func TestFindEntry_LazyFetchOnMiss(t *testing.T) { require.NotNil(t, entry2) assert.Equal(t, uint64(999), entry2.FileSize) } + +func TestDeleteEntryMetaAndData_RemoteOnlyFileDeletesRemoteAndMetadata(t *testing.T) { + const storageType = "stub_lazy_delete_file" + stub := &stubRemoteClient{} + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "deletestore", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "deletestore", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + filePath := util.FullPath("/buckets/mybucket/file.txt") + store.entries[string(filePath)] = &Entry{ + FullPath: filePath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: 0644, + FileSize: 64, + }, + Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 64}, + } + f := newTestFiler(t, store, rs) + + err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0) + require.NoError(t, err) + + _, findErr := store.FindEntry(context.Background(), filePath) + require.ErrorIs(t, findErr, filer_pb.ErrNotFound) + require.Len(t, stub.deleteCalls, 1) + assert.Equal(t, "deletestore", stub.deleteCalls[0].Name) + assert.Equal(t, "mybucket", stub.deleteCalls[0].Bucket) + assert.Equal(t, "/file.txt", stub.deleteCalls[0].Path) +} + +func TestDeleteEntryMetaAndData_RemoteOnlyFileNotUnderMountSkipsRemoteDelete(t *testing.T) { + const storageType = "stub_lazy_delete_not_under_mount" + stub := &stubRemoteClient{} + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "notundermount", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "notundermount", + 
Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + filePath := util.FullPath("/no/mount/file.txt") + store.entries[string(filePath)] = &Entry{ + FullPath: filePath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: 0644, + FileSize: 99, + }, + Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 99}, + } + f := newTestFiler(t, store, rs) + + err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0) + require.NoError(t, err) + require.Len(t, stub.deleteCalls, 0) +} + +func TestDeleteEntryMetaAndData_RemoteMountWithoutClientResolutionKeepsMetadata(t *testing.T) { + rs := NewFilerRemoteStorage() + rs.storageNameToConf["missingclient"] = &remote_pb.RemoteConf{Name: "missingclient", Type: "stub_missing_client"} + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "missingclient", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + filePath := util.FullPath("/buckets/mybucket/no-client.txt") + store.entries[string(filePath)] = &Entry{ + FullPath: filePath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: 0644, + FileSize: 51, + }, + Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 51}, + } + f := newTestFiler(t, store, rs) + + err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0) + require.Error(t, err) + require.ErrorContains(t, err, "resolve remote storage client") + require.ErrorContains(t, err, string(filePath)) + + stored, findErr := store.FindEntry(context.Background(), filePath) + require.NoError(t, findErr) + require.NotNil(t, stored) +} + +func TestDeleteEntryMetaAndData_LocalDeleteFailureLeavesDurablePendingForReconcile(t *testing.T) { + const storageType = "stub_lazy_delete_pending_reconcile" + stub := &stubRemoteClient{} + defer registerStubMaker(t, storageType, stub)() 
+ + conf := &remote_pb.RemoteConf{Name: "pendingreconcile", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "pendingreconcile", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + filePath := util.FullPath("/buckets/mybucket/reconcile.txt") + store.entries[string(filePath)] = &Entry{ + FullPath: filePath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: 0644, + FileSize: 80, + }, + Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 80}, + } + store.deleteErrByPath[string(filePath)] = errors.New("simulated local delete failure") + f := newTestFiler(t, store, rs) + + err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0) + require.Error(t, err) + require.ErrorContains(t, err, "filer store delete") + require.Len(t, stub.deleteCalls, 1) + + stored, findErr := store.FindEntry(context.Background(), filePath) + require.NoError(t, findErr) + require.NotNil(t, stored) + + pendingPaths, pendingErr := f.listPendingRemoteMetadataDeletionPaths(context.Background()) + require.NoError(t, pendingErr) + require.Equal(t, []util.FullPath{filePath}, pendingPaths) + + delete(store.deleteErrByPath, string(filePath)) + require.NoError(t, f.reconcilePendingRemoteMetadataDeletions(context.Background())) + + _, findAfterReconcileErr := store.FindEntry(context.Background(), filePath) + require.ErrorIs(t, findAfterReconcileErr, filer_pb.ErrNotFound) + require.Len(t, stub.deleteCalls, 1) + + pendingPaths, pendingErr = f.listPendingRemoteMetadataDeletionPaths(context.Background()) + require.NoError(t, pendingErr) + require.Empty(t, pendingPaths) +} + +func TestDeleteEntryMetaAndData_RemoteDeleteNotFoundStillDeletesMetadata(t *testing.T) { + const storageType = "stub_lazy_delete_notfound" + stub := &stubRemoteClient{deleteErr: 
remote_storage.ErrRemoteObjectNotFound} + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "deletenotfound", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "deletenotfound", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + filePath := util.FullPath("/buckets/mybucket/notfound.txt") + store.entries[string(filePath)] = &Entry{ + FullPath: filePath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: 0644, + FileSize: 23, + }, + Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 23}, + } + f := newTestFiler(t, store, rs) + + err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0) + require.NoError(t, err) + _, findErr := store.FindEntry(context.Background(), filePath) + require.ErrorIs(t, findErr, filer_pb.ErrNotFound) +} + +func TestDeleteEntryMetaAndData_RemoteDeleteErrorKeepsMetadata(t *testing.T) { + const storageType = "stub_lazy_delete_error" + stub := &stubRemoteClient{deleteErr: errors.New("remote delete failed")} + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "deleteerr", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "deleteerr", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + filePath := util.FullPath("/buckets/mybucket/error.txt") + store.entries[string(filePath)] = &Entry{ + FullPath: filePath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: 0644, + FileSize: 77, + }, + Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 77}, + } + f := newTestFiler(t, store, rs) + + err := 
f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0) + require.Error(t, err) + stored, findErr := store.FindEntry(context.Background(), filePath) + require.NoError(t, findErr) + require.NotNil(t, stored) +} + +func TestDeleteEntryMetaAndData_DirectoryUnderMountDeletesRemoteDirectory(t *testing.T) { + const storageType = "stub_lazy_delete_dir" + stub := &stubRemoteClient{} + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "dirstore", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "dirstore", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + dirPath := util.FullPath("/buckets/mybucket/dir") + store.entries[string(dirPath)] = &Entry{ + FullPath: dirPath, + Attr: Attr{ + Mtime: time.Unix(1700000000, 0), + Crtime: time.Unix(1700000000, 0), + Mode: os.ModeDir | 0755, + }, + } + f := newTestFiler(t, store, rs) + + err := f.doDeleteEntryMetaAndData(context.Background(), store.entries[string(dirPath)], false, false, nil) + require.NoError(t, err) + + require.Len(t, stub.removeCalls, 1) + assert.Equal(t, "dirstore", stub.removeCalls[0].Name) + assert.Equal(t, "mybucket", stub.removeCalls[0].Bucket) + assert.Equal(t, "/dir", stub.removeCalls[0].Path) + _, findErr := store.FindEntry(context.Background(), dirPath) + require.ErrorIs(t, findErr, filer_pb.ErrNotFound) +}