Browse Source

pagination, one transaction for deletion

pull/7178/head
chrislu 4 weeks ago
parent
commit
19bd91be85
  1. 44
      weed/filer/foundationdb/foundationdb_store.go

44
weed/filer/foundationdb/foundationdb_store.go

@ -95,7 +95,9 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e
glog.V(0).Infof("FoundationDB: connecting to cluster file: %s, API version: %d", clusterFile, apiVersion)
// Set FDB API version
fdb.MustAPIVersion(apiVersion)
if err := fdb.APIVersion(apiVersion); err != nil {
return fmt.Errorf("failed to set FoundationDB API version %d: %w", apiVersion, err)
}
// Open database
var err error
@ -272,24 +274,44 @@ func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpa
// We need recursion because our key structure is tuple{dirPath, fileName}
// not tuple{dirPath, ...pathComponents}, so a simple prefix range won't catch subdirectories
return store.deleteFolderChildrenRecursive(ctx, fullpath)
// Wrap entire deletion in a single transaction for performance
if _, exists := store.getTransactionFromContext(ctx); exists {
return store.deleteFolderChildrenRecursive(ctx, fullpath)
}
_, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
txCtx := store.setTransactionInContext(context.Background(), tr)
return nil, store.deleteFolderChildrenRecursive(txCtx, fullpath)
})
return err
}
func (store *FoundationDBStore) deleteFolderChildrenRecursive(ctx context.Context, fullpath util.FullPath) error {
// List all entries in this directory
// List all entries in this directory (with pagination for directories > 1000 entries)
var entriesToDelete []util.FullPath
var subDirectories []util.FullPath
err := store.ListDirectoryEntries(ctx, fullpath, "", true, MAX_DIRECTORY_LIST_LIMIT, func(entry *filer.Entry) bool {
entriesToDelete = append(entriesToDelete, entry.FullPath)
if entry.IsDirectory() {
subDirectories = append(subDirectories, entry.FullPath)
var startFileName string
inclusive := true
for {
lastFileName, err := store.ListDirectoryEntries(ctx, fullpath, startFileName, inclusive, MAX_DIRECTORY_LIST_LIMIT, func(entry *filer.Entry) bool {
entriesToDelete = append(entriesToDelete, entry.FullPath)
if entry.IsDirectory() {
subDirectories = append(subDirectories, entry.FullPath)
}
return true
})
if err != nil {
return fmt.Errorf("listing children of %s: %w", fullpath, err)
}
return true
})
if err != nil {
return fmt.Errorf("listing children of %s: %w", fullpath, err)
if lastFileName == "" {
break
}
startFileName = lastFileName
inclusive = false
}
// Recursively delete subdirectories first

Loading…
Cancel
Save