Browse Source

recursive deletion

pull/7178/head
chrislu 2 months ago
parent
commit
d869ce44e9
  1. 60
      weed/filer/foundationdb/foundationdb_store.go
  2. 21
      weed/filer/foundationdb/foundationdb_store_test.go

60
weed/filer/foundationdb/foundationdb_store.go

@ -22,6 +22,9 @@ import (
const (
// FoundationDB transaction size limit is 10MB
FDB_TRANSACTION_SIZE_LIMIT = 10 * 1024 * 1024
// Maximum number of entries to return in a single directory listing
// Large batches can cause transaction timeouts and increase memory pressure
MAX_DIRECTORY_LIST_LIMIT = 1000
)
func init() {
@ -265,27 +268,42 @@ func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.F
}
func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
// Construct tuple-aware range for all children in this directory
prefixBytes := store.seaweedfsDir.Pack(tuple.Tuple{string(fullpath)})
kr, err := fdb.PrefixRange(prefixBytes)
if err != nil {
return fmt.Errorf("creating prefix range for %s: %w", fullpath, err)
}
// Recursively delete all entries in this directory and its subdirectories
// We need recursion because our key structure is tuple{dirPath, fileName}
// not tuple{dirPath, ...pathComponents}, so a simple prefix range won't catch subdirectories
// Check if there's a transaction in context
if tx, exists := store.getTransactionFromContext(ctx); exists {
tx.ClearRange(kr)
return nil
}
return store.deleteFolderChildrenRecursive(ctx, fullpath)
}
// Execute in a new transaction if not in an existing one
_, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(kr)
return nil, nil
func (store *FoundationDBStore) deleteFolderChildrenRecursive(ctx context.Context, fullpath util.FullPath) error {
// List all entries in this directory
var entriesToDelete []util.FullPath
var subDirectories []util.FullPath
err := store.ListDirectoryEntries(ctx, fullpath, "", true, MAX_DIRECTORY_LIST_LIMIT, func(entry *filer.Entry) bool {
entriesToDelete = append(entriesToDelete, entry.FullPath)
if entry.IsDirectory() {
subDirectories = append(subDirectories, entry.FullPath)
}
return true
})
if err != nil {
return fmt.Errorf("deleting folder children %s: %w", fullpath, err)
return fmt.Errorf("listing children of %s: %w", fullpath, err)
}
// Recursively delete subdirectories first
for _, subDir := range subDirectories {
if err := store.deleteFolderChildrenRecursive(ctx, subDir); err != nil {
return err
}
}
// Delete all entries in this directory
for _, entryPath := range entriesToDelete {
if err := store.DeleteEntry(ctx, entryPath); err != nil {
return fmt.Errorf("deleting entry %s: %w", entryPath, err)
}
}
return nil
@ -296,10 +314,10 @@ func (store *FoundationDBStore) ListDirectoryEntries(ctx context.Context, dirPat
}
func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
// Cap limit at 1000 for optimal FoundationDB performance
// Cap limit for optimal FoundationDB performance
// Large batches can cause transaction timeouts and increase memory pressure
if limit > 1000 {
limit = 1000
if limit > MAX_DIRECTORY_LIST_LIMIT || limit <= 0 {
limit = MAX_DIRECTORY_LIST_LIMIT
}
// Determine the key range for the scan
@ -481,8 +499,8 @@ func (store *FoundationDBStore) extractFileName(key fdb.Key) (string, error) {
if err != nil {
return "", fmt.Errorf("unpack key %v: %w", key, err)
}
if len(t) < 2 {
return "", fmt.Errorf("tuple too short (len=%d) for key %v", len(t), key)
if len(t) != 2 {
return "", fmt.Errorf("tuple unexpected length (len=%d, expected 2) for key %v", len(t), key)
}
if fileName, ok := t[1].(string); ok {

21
weed/filer/foundationdb/foundationdb_store_test.go

@ -144,14 +144,17 @@ func TestFoundationDBStore_KeyGeneration(t *testing.T) {
t.Error("Generated key should not be empty")
}
// Test that we can extract filename back
// Note: This tests internal consistency
if tc.fileName != "" {
extractedName := store.extractFileName(key)
if extractedName != tc.fileName {
t.Errorf("Expected extracted filename '%s', got '%s'", tc.fileName, extractedName)
}
// Test that we can extract filename back
// Note: This tests internal consistency
if tc.fileName != "" {
extractedName, err := store.extractFileName(key)
if err != nil {
t.Errorf("extractFileName failed: %v", err)
}
if extractedName != tc.fileName {
t.Errorf("Expected extracted filename '%s', got '%s'", tc.fileName, extractedName)
}
}
})
}
}
@ -241,8 +244,8 @@ func TestFoundationDBStore_TransactionState(t *testing.T) {
t.Fatalf("BeginTransaction failed: %v", err)
}
// Try to begin another transaction
_, err = store.BeginTransaction(ctx)
// Try to begin another transaction on the same context
_, err = store.BeginTransaction(txCtx)
if err == nil {
t.Error("Expected error when beginning transaction while one is active")
}

Loading…
Cancel
Save