diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go index 93ce0e0b8..6443ba27f 100644 --- a/weed/filer/foundationdb/foundationdb_store.go +++ b/weed/filer/foundationdb/foundationdb_store.go @@ -1,6 +1,21 @@ //go:build foundationdb // +build foundationdb +// Package foundationdb provides a filer store implementation using FoundationDB as the backend. +// +// IMPORTANT DESIGN NOTE - DeleteFolderChildren and Transaction Limits: +// +// FoundationDB imposes strict transaction limits: +// - Maximum transaction size: 10MB +// - Maximum transaction duration: 5 seconds +// +// The DeleteFolderChildren operation always uses batched deletion with multiple small transactions +// to safely handle directories of any size. Even if called within an existing transaction context, +// it will create its own batch transactions to avoid exceeding FDB limits. +// +// This means DeleteFolderChildren is NOT atomic with respect to an outer transaction - it manages +// its own transaction boundaries for safety and reliability. + package foundationdb import ( @@ -274,12 +289,12 @@ func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpa // We need recursion because our key structure is tuple{dirPath, fileName} // not tuple{dirPath, ...pathComponents}, so a simple prefix range won't catch subdirectories - // If we're already in a transaction, use it (caller handles transaction limits) - if _, exists := store.getTransactionFromContext(ctx); exists { - return store.deleteFolderChildrenRecursive(ctx, fullpath) - } - - // Otherwise, delete in batches to avoid FDB transaction limits (10MB size, 5s timeout) + // ALWAYS use batched deletion to safely handle directories of any size. + // This avoids FoundationDB's 10MB transaction size and 5s timeout limits. + // + // Note: Even if called within an existing transaction, we create our own batch transactions. + // This means DeleteFolderChildren is NOT atomic with an outer transaction, but it ensures + // reliability and prevents transaction limit violations. return store.deleteFolderChildrenInBatches(ctx, fullpath) } @@ -342,50 +357,6 @@ func (store *FoundationDBStore) deleteFolderChildrenInBatches(ctx context.Contex return nil } -func (store *FoundationDBStore) deleteFolderChildrenRecursive(ctx context.Context, fullpath util.FullPath) error { - // List all entries in this directory (with pagination for directories > 1000 entries) - var entriesToDelete []util.FullPath - var subDirectories []util.FullPath - - var startFileName string - inclusive := true - for { - lastFileName, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, MAX_DIRECTORY_LIST_LIMIT, func(entry *filer.Entry) bool { - entriesToDelete = append(entriesToDelete, entry.FullPath) - if entry.IsDirectory() { - subDirectories = append(subDirectories, entry.FullPath) - } - return true - }) - - if err != nil { - return fmt.Errorf("listing children of %s: %w", fullpath, err) - } - - if lastFileName == "" { - break - } - startFileName = lastFileName - inclusive = false - } - - // Recursively delete subdirectories first - for _, subDir := range subDirectories { - if err := store.deleteFolderChildrenRecursive(ctx, subDir); err != nil { - return err - } - } - - // Delete all entries in this directory - for _, entryPath := range entriesToDelete { - if err := store.DeleteEntry(ctx, entryPath); err != nil { - return fmt.Errorf("deleting entry %s: %w", entryPath, err) - } - } - - return nil -} - func (store *FoundationDBStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) } diff --git a/weed/filer/foundationdb/foundationdb_store_test.go b/weed/filer/foundationdb/foundationdb_store_test.go index cbae2afa3..6295f6846 100644 --- a/weed/filer/foundationdb/foundationdb_store_test.go +++ b/weed/filer/foundationdb/foundationdb_store_test.go @@ -352,3 +352,178 @@ func createBenchmarkStore(b *testing.B) *FoundationDBStore { func containsString(s, substr string) bool { return strings.Contains(s, substr) } + +func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) { + // This test validates that DeleteFolderChildren always uses batching + // to safely handle large directories, regardless of transaction context + + store := getTestStore(t) + defer store.Shutdown() + + ctx := context.Background() + testDir := util.FullPath(fmt.Sprintf("/test_batch_delete_%d", time.Now().UnixNano())) + + // Create a large directory (> 100 entries to trigger batching) + const NUM_ENTRIES = 250 + + t.Logf("Creating %d test entries...", NUM_ENTRIES) + for i := 0; i < NUM_ENTRIES; i++ { + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(testDir), fmt.Sprintf("file_%04d.txt", i)), + Attr: filer.Attr{ + Mode: 0644, + Uid: 1000, + Gid: 1000, + Mtime: time.Now(), + }, + } + if err := store.InsertEntry(ctx, entry); err != nil { + t.Fatalf("Failed to insert test entry %d: %v", i, err) + } + } + + // Test 1: DeleteFolderChildren outside transaction should succeed + t.Run("OutsideTransaction", func(t *testing.T) { + testDir1 := util.FullPath(fmt.Sprintf("/test_batch_1_%d", time.Now().UnixNano())) + + // Create entries + for i := 0; i < NUM_ENTRIES; i++ { + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(testDir1), fmt.Sprintf("file_%04d.txt", i)), + Attr: filer.Attr{ + Mode: 0644, + Uid: 1000, + Gid: 1000, + Mtime: time.Now(), + }, + } + store.InsertEntry(ctx, entry) + } + + // Delete with batching + err := store.DeleteFolderChildren(ctx, testDir1) + if err != nil { + t.Errorf("DeleteFolderChildren outside transaction should succeed, got error: %v", err) + } + + // Verify all entries deleted + var count int + store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) bool { + count++ + return true + }) + if count != 0 { + t.Errorf("Expected all entries to be deleted, found %d", count) + } + }) + + // Test 2: DeleteFolderChildren with transaction context - uses its own batched transactions + t.Run("WithTransactionContext", func(t *testing.T) { + testDir2 := util.FullPath(fmt.Sprintf("/test_batch_2_%d", time.Now().UnixNano())) + + // Create entries + for i := 0; i < NUM_ENTRIES; i++ { + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(testDir2), fmt.Sprintf("file_%04d.txt", i)), + Attr: filer.Attr{ + Mode: 0644, + Uid: 1000, + Gid: 1000, + Mtime: time.Now(), + }, + } + store.InsertEntry(ctx, entry) + } + + // Start a transaction (DeleteFolderChildren will ignore it and use its own batching) + txCtx, err := store.BeginTransaction(ctx) + if err != nil { + t.Fatalf("BeginTransaction failed: %v", err) + } + + // Delete large directory - should succeed with batching + err = store.DeleteFolderChildren(txCtx, testDir2) + if err != nil { + t.Errorf("DeleteFolderChildren should succeed with batching even when transaction context present, got: %v", err) + } + + // Rollback transaction (DeleteFolderChildren used its own transactions, so this doesn't affect deletions) + store.RollbackTransaction(txCtx) + + // Verify entries are still deleted (because DeleteFolderChildren managed its own transactions) + var count int + store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) bool { + count++ + return true + }) + + if count != 0 { + t.Errorf("Expected all entries to be deleted, found %d (DeleteFolderChildren uses its own transactions)", count) + } + }) + + // Test 3: Nested directories with batching + t.Run("NestedDirectories", func(t *testing.T) { + testDir3 := util.FullPath(fmt.Sprintf("/test_batch_3_%d", time.Now().UnixNano())) + + // Create nested structure + for i := 0; i < 50; i++ { + // Files in root + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(testDir3), fmt.Sprintf("file_%02d.txt", i)), + Attr: filer.Attr{ + Mode: 0644, + Uid: 1000, + Gid: 1000, + Mtime: time.Now(), + }, + } + store.InsertEntry(ctx, entry) + + // Subdirectory + subDir := &filer.Entry{ + FullPath: util.NewFullPath(string(testDir3), fmt.Sprintf("dir_%02d", i)), + Attr: filer.Attr{ + Mode: 0755 | os.ModeDir, + Uid: 1000, + Gid: 1000, + Mtime: time.Now(), + }, + } + store.InsertEntry(ctx, subDir) + + // Files in subdirectory + for j := 0; j < 3; j++ { + subEntry := &filer.Entry{ + FullPath: util.NewFullPath(string(testDir3)+"/"+fmt.Sprintf("dir_%02d", i), fmt.Sprintf("subfile_%02d.txt", j)), + Attr: filer.Attr{ + Mode: 0644, + Uid: 1000, + Gid: 1000, + Mtime: time.Now(), + }, + } + store.InsertEntry(ctx, subEntry) + } + } + + // Delete all with batching + err := store.DeleteFolderChildren(ctx, testDir3) + if err != nil { + t.Errorf("DeleteFolderChildren should handle nested directories, got: %v", err) + } + + // Verify all deleted + var count int + store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) bool { + count++ + return true + }) + if count != 0 { + t.Errorf("Expected all nested entries to be deleted, found %d", count) + } + }) + + // Cleanup + store.DeleteFolderChildren(ctx, testDir) +}