Browse Source

filer: harden remote delete metadata recovery

Persist remote-delete metadata pendings so local entry removal can be retried after failures, and return explicit errors when remote client resolution fails to prevent silent local-only deletes.

Made-with: Cursor
pull/8522/head
Peter 5 days ago
parent
commit
ffba644d33
  1. 1
      weed/filer/filer.go
  2. 200
      weed/filer/filer_delete_entry.go
  3. 24
      weed/filer/filer_lazy_remote.go
  4. 122
      weed/filer/filer_lazy_remote_test.go

1
weed/filer/filer.go

@ -85,6 +85,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
f.metaLogReplication = replication
go f.loopProcessingDeletion()
go f.loopProcessingRemoteMetadataDeletionPending()
return f
}

200
weed/filer/filer_delete_entry.go

@ -1,8 +1,12 @@
package filer
import (
"encoding/base64"
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -12,6 +16,9 @@ import (
const (
MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
remoteMetadataDeletionPendingIndexKey = "filer.remote.metadata.deletion.pending.index"
remoteMetadataDeletionPendingKeyPrefix = "filer.remote.metadata.deletion.pending/"
remoteMetadataDeletionReconcileInterval = 1 * time.Minute
)
type OnChunksFunc func([]*filer_pb.FileChunk) error
@ -130,13 +137,24 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
glog.V(3).InfofCtx(ctx, "deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
if remoteDeletionErr := f.maybeDeleteFromRemote(ctx, entry); remoteDeletionErr != nil {
remoteDeleted, remoteDeletionErr := f.maybeDeleteFromRemote(ctx, entry)
if remoteDeletionErr != nil {
return remoteDeletionErr
}
if remoteDeleted {
if markErr := f.markRemoteMetadataDeletionPending(ctx, entry.FullPath); markErr != nil {
return fmt.Errorf("mark remote metadata deletion pending %s: %w", entry.FullPath, markErr)
}
}
if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %w", storeDeletionErr)
}
if remoteDeleted {
if clearErr := f.clearRemoteMetadataDeletionPending(ctx, entry.FullPath); clearErr != nil {
glog.Warningf("clear remote metadata deletion pending %s: %v", entry.FullPath, clearErr)
}
}
if !entry.IsDirectory() {
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
}
@ -165,3 +183,183 @@ func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLink
}
}
}
func (f *Filer) loopProcessingRemoteMetadataDeletionPending() {
ticker := time.NewTicker(remoteMetadataDeletionReconcileInterval)
defer ticker.Stop()
for {
select {
case <-f.deletionQuit:
return
case <-ticker.C:
if err := f.reconcilePendingRemoteMetadataDeletions(context.Background()); err != nil {
glog.Warningf("reconcile remote metadata deletion pendings: %v", err)
}
}
}
}
func (f *Filer) reconcilePendingRemoteMetadataDeletions(ctx context.Context) error {
pendingPaths, err := f.listPendingRemoteMetadataDeletionPaths(ctx)
if err != nil {
return err
}
for _, pendingPath := range pendingPaths {
entry, findErr := f.FindEntry(ctx, pendingPath)
if findErr == filer_pb.ErrNotFound || entry == nil {
if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil {
glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr)
}
continue
}
if findErr != nil {
glog.Warningf("find pending remote metadata deletion %s: %v", pendingPath, findErr)
continue
}
if deleteErr := f.Store.DeleteOneEntry(ctx, entry); deleteErr != nil {
glog.Warningf("retry local metadata deletion %s: %v", pendingPath, deleteErr)
continue
}
if clearErr := f.clearRemoteMetadataDeletionPending(ctx, pendingPath); clearErr != nil {
glog.Warningf("clear remote metadata deletion pending %s: %v", pendingPath, clearErr)
}
}
return nil
}
func (f *Filer) markRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath) error {
txnCtx, beginErr := f.BeginTransaction(ctx)
if beginErr != nil {
return beginErr
}
committed := false
defer func() {
if !committed {
_ = f.RollbackTransaction(txnCtx)
}
}()
pendings, err := f.listPendingRemoteMetadataDeletionPaths(txnCtx)
if err != nil {
return err
}
pendingSet := make(map[string]struct{}, len(pendings)+1)
for _, pendingPath := range pendings {
pendingSet[string(pendingPath)] = struct{}{}
}
pendingSet[string(path)] = struct{}{}
if err := f.Store.KvPut(txnCtx, pendingRemoteMetadataDeletionPathKey(path), []byte(path)); err != nil {
return err
}
if err := f.Store.KvPut(txnCtx, []byte(remoteMetadataDeletionPendingIndexKey), encodePendingRemoteMetadataDeletionIndex(pendingSet)); err != nil {
return err
}
if err := f.CommitTransaction(txnCtx); err != nil {
return err
}
committed = true
return nil
}
func (f *Filer) clearRemoteMetadataDeletionPending(ctx context.Context, path util.FullPath) error {
txnCtx, beginErr := f.BeginTransaction(ctx)
if beginErr != nil {
return beginErr
}
committed := false
defer func() {
if !committed {
_ = f.RollbackTransaction(txnCtx)
}
}()
pendings, err := f.listPendingRemoteMetadataDeletionPaths(txnCtx)
if err != nil {
return err
}
pendingSet := make(map[string]struct{}, len(pendings))
for _, pendingPath := range pendings {
pendingSet[string(pendingPath)] = struct{}{}
}
delete(pendingSet, string(path))
if err := f.Store.KvDelete(txnCtx, pendingRemoteMetadataDeletionPathKey(path)); err != nil {
return err
}
if err := f.Store.KvPut(txnCtx, []byte(remoteMetadataDeletionPendingIndexKey), encodePendingRemoteMetadataDeletionIndex(pendingSet)); err != nil {
return err
}
if err := f.CommitTransaction(txnCtx); err != nil {
return err
}
committed = true
return nil
}
func (f *Filer) listPendingRemoteMetadataDeletionPaths(ctx context.Context) ([]util.FullPath, error) {
indexData, err := f.Store.KvGet(ctx, []byte(remoteMetadataDeletionPendingIndexKey))
if err != nil {
if err == ErrKvNotFound {
return nil, nil
}
return nil, err
}
if len(indexData) == 0 {
return nil, nil
}
encodedKeys := decodePendingRemoteMetadataDeletionIndex(indexData)
if len(encodedKeys) == 0 {
return nil, nil
}
var pendingPaths []util.FullPath
for _, encodedKey := range encodedKeys {
encodedKey = strings.TrimSpace(encodedKey)
if encodedKey == "" {
continue
}
data, getErr := f.Store.KvGet(ctx, []byte(encodedKey))
if getErr != nil {
if getErr == ErrKvNotFound {
continue
}
return nil, getErr
}
pendingPaths = append(pendingPaths, util.FullPath(string(data)))
}
return pendingPaths, nil
}
func pendingRemoteMetadataDeletionPathKey(path util.FullPath) []byte {
encodedPath := base64.RawURLEncoding.EncodeToString([]byte(path))
return []byte(remoteMetadataDeletionPendingKeyPrefix + encodedPath)
}
func encodePendingRemoteMetadataDeletionIndex(pendingSet map[string]struct{}) []byte {
if len(pendingSet) == 0 {
return []byte{}
}
keys := make([]string, 0, len(pendingSet))
for path := range pendingSet {
keys = append(keys, string(pendingRemoteMetadataDeletionPathKey(util.FullPath(path))))
}
sort.Strings(keys)
return []byte(strings.Join(keys, "\n"))
}
func decodePendingRemoteMetadataDeletionIndex(indexData []byte) []string {
if len(indexData) == 0 {
return nil
}
return strings.Split(string(indexData), "\n")
}

24
weed/filer/filer_lazy_remote.go

@ -115,19 +115,19 @@ func (f *Filer) maybeLazyFetchFromRemote(ctx context.Context, p util.FullPath) (
return result.entry, nil
}
func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) error {
func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) (bool, error) {
if entry == nil || f.RemoteStorage == nil {
return nil
return false, nil
}
mountDir, remoteLoc := f.RemoteStorage.FindMountDirectory(entry.FullPath)
if remoteLoc == nil {
return nil
return false, nil
}
client, _, found := f.RemoteStorage.FindRemoteStorageClient(entry.FullPath)
if !found {
return nil
if !found || client == nil {
return false, fmt.Errorf("resolve remote storage client for %s: not found", entry.FullPath)
}
objectLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoc, entry.FullPath)
@ -135,24 +135,24 @@ func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) error {
if entry.IsDirectory() {
if err := client.RemoveDirectory(objectLoc); err != nil {
if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
return nil
return true, nil
}
return fmt.Errorf("remove remote directory %s: %w", entry.FullPath, err)
return false, fmt.Errorf("remove remote directory %s: %w", entry.FullPath, err)
}
return nil
return true, nil
}
if !entry.IsInRemoteOnly() {
return nil
return false, nil
}
if err := client.DeleteFile(objectLoc); err != nil {
if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
return nil
return true, nil
}
return fmt.Errorf("delete remote file %s: %w", entry.FullPath, err)
return false, fmt.Errorf("delete remote file %s: %w", entry.FullPath, err)
}
glog.V(3).InfofCtx(ctx, "maybeDeleteFromRemote: deleted %s from remote", entry.FullPath)
return nil
return true, nil
}

122
weed/filer/filer_lazy_remote_test.go

@ -29,11 +29,17 @@ import (
type stubFilerStore struct {
mu sync.Mutex
entries map[string]*Entry
kv map[string][]byte
insertErr error
deleteErrByPath map[string]error
}
func newStubFilerStore() *stubFilerStore {
return &stubFilerStore{entries: make(map[string]*Entry)}
return &stubFilerStore{
entries: make(map[string]*Entry),
kv: make(map[string][]byte),
deleteErrByPath: make(map[string]error),
}
}
func (s *stubFilerStore) GetName() string { return "stub" }
@ -44,11 +50,27 @@ func (s *stubFilerStore) BeginTransaction(ctx context.Context) (context.Context,
}
func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil }
func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil }
func (s *stubFilerStore) KvPut(context.Context, []byte, []byte) error { return nil }
func (s *stubFilerStore) KvGet(context.Context, []byte) ([]byte, error) {
return nil, ErrKvNotFound
func (s *stubFilerStore) KvPut(_ context.Context, key []byte, value []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
s.kv[string(key)] = append([]byte(nil), value...)
return nil
}
func (s *stubFilerStore) KvGet(_ context.Context, key []byte) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
value, found := s.kv[string(key)]
if !found {
return nil, ErrKvNotFound
}
return append([]byte(nil), value...), nil
}
func (s *stubFilerStore) KvDelete(_ context.Context, key []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.kv, string(key))
return nil
}
func (s *stubFilerStore) KvDelete(context.Context, []byte) error { return nil }
func (s *stubFilerStore) DeleteFolderChildren(context.Context, util.FullPath) error { return nil }
func (s *stubFilerStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
return "", nil
@ -86,6 +108,9 @@ func (s *stubFilerStore) FindEntry(_ context.Context, p util.FullPath) (*Entry,
func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error {
s.mu.Lock()
defer s.mu.Unlock()
if deleteErr, found := s.deleteErrByPath[string(p)]; found && deleteErr != nil {
return deleteErr
}
delete(s.entries, string(p))
return nil
}
@ -461,6 +486,93 @@ func TestDeleteEntryMetaAndData_RemoteOnlyFileNotUnderMountSkipsRemoteDelete(t *
require.Len(t, stub.deleteCalls, 0)
}
func TestDeleteEntryMetaAndData_RemoteMountWithoutClientResolutionKeepsMetadata(t *testing.T) {
rs := NewFilerRemoteStorage()
rs.storageNameToConf["missingclient"] = &remote_pb.RemoteConf{Name: "missingclient", Type: "stub_missing_client"}
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
Name: "missingclient",
Bucket: "mybucket",
Path: "/",
})
store := newStubFilerStore()
filePath := util.FullPath("/buckets/mybucket/no-client.txt")
store.entries[string(filePath)] = &Entry{
FullPath: filePath,
Attr: Attr{
Mtime: time.Unix(1700000000, 0),
Crtime: time.Unix(1700000000, 0),
Mode: 0644,
FileSize: 51,
},
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 51},
}
f := newTestFiler(t, store, rs)
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
require.Error(t, err)
require.ErrorContains(t, err, "resolve remote storage client")
require.ErrorContains(t, err, string(filePath))
stored, findErr := store.FindEntry(context.Background(), filePath)
require.NoError(t, findErr)
require.NotNil(t, stored)
}
func TestDeleteEntryMetaAndData_LocalDeleteFailureLeavesDurablePendingForReconcile(t *testing.T) {
const storageType = "stub_lazy_delete_pending_reconcile"
stub := &stubRemoteClient{}
defer registerStubMaker(t, storageType, stub)()
conf := &remote_pb.RemoteConf{Name: "pendingreconcile", Type: storageType}
rs := NewFilerRemoteStorage()
rs.storageNameToConf[conf.Name] = conf
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
Name: "pendingreconcile",
Bucket: "mybucket",
Path: "/",
})
store := newStubFilerStore()
filePath := util.FullPath("/buckets/mybucket/reconcile.txt")
store.entries[string(filePath)] = &Entry{
FullPath: filePath,
Attr: Attr{
Mtime: time.Unix(1700000000, 0),
Crtime: time.Unix(1700000000, 0),
Mode: 0644,
FileSize: 80,
},
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 80},
}
store.deleteErrByPath[string(filePath)] = errors.New("simulated local delete failure")
f := newTestFiler(t, store, rs)
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
require.Error(t, err)
require.ErrorContains(t, err, "filer store delete")
require.Len(t, stub.deleteCalls, 1)
stored, findErr := store.FindEntry(context.Background(), filePath)
require.NoError(t, findErr)
require.NotNil(t, stored)
pendingPaths, pendingErr := f.listPendingRemoteMetadataDeletionPaths(context.Background())
require.NoError(t, pendingErr)
require.Equal(t, []util.FullPath{filePath}, pendingPaths)
delete(store.deleteErrByPath, string(filePath))
require.NoError(t, f.reconcilePendingRemoteMetadataDeletions(context.Background()))
_, findAfterReconcileErr := store.FindEntry(context.Background(), filePath)
require.ErrorIs(t, findAfterReconcileErr, filer_pb.ErrNotFound)
require.Len(t, stub.deleteCalls, 1)
pendingPaths, pendingErr = f.listPendingRemoteMetadataDeletionPaths(context.Background())
require.NoError(t, pendingErr)
require.Empty(t, pendingPaths)
}
func TestDeleteEntryMetaAndData_RemoteDeleteNotFoundStillDeletesMetadata(t *testing.T) {
const storageType = "stub_lazy_delete_notfound"
stub := &stubRemoteClient{deleteErr: remote_storage.ErrRemoteObjectNotFound}

Loading…
Cancel
Save