Browse Source

move deletion out of listing transaction; delete entries and empty folders

pull/7426/head
chrislu 4 weeks ago
parent
commit
543e70c511
  1. 79
      weed/filer/filer.go

79
weed/filer/filer.go

@ -368,6 +368,11 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
} }
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
// Collect expired entries during iteration to avoid deadlock with DB connection pool
var expiredEntries []*Entry
var s3ExpiredEntries []*Entry
var hasValidEntries bool
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -376,29 +381,91 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
if entry.TtlSec > 0 { if entry.TtlSec > 0 {
if entry.IsExpireS3Enabled() { if entry.IsExpireS3Enabled() {
if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() { if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
}
// Collect for deletion after iteration completes to avoid DB deadlock
s3ExpiredEntries = append(s3ExpiredEntries, entry)
expiredCount++ expiredCount++
return true return true
} }
} else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
}
// Collect for deletion after iteration completes to avoid DB deadlock
expiredEntries = append(expiredEntries, entry)
expiredCount++ expiredCount++
return true return true
} }
} }
// Track that we found at least one valid (non-expired) entry
hasValidEntries = true
return eachEntryFunc(entry) return eachEntryFunc(entry)
} }
}) })
if err != nil { if err != nil {
return expiredCount, lastFileName, err return expiredCount, lastFileName, err
} }
// Delete expired entries after iteration completes to avoid DB connection deadlock
if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 {
for _, entry := range s3ExpiredEntries {
if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
}
}
for _, entry := range expiredEntries {
if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
}
}
// Check if directory is now empty and delete it if so
// Only check if we didn't find any valid entries and we're not at root
if !hasValidEntries && p != "/" && startFileName == "" {
// Do a quick check to see if directory is truly empty now
isEmpty := true
_, checkErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, "", true, 1, prefix, func(entry *Entry) bool {
isEmpty = false
return false // Stop after first entry
})
if checkErr == nil && isEmpty {
glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleting empty directory %s after expiring all entries", p)
parentDir, _ := p.DirAndName()
if dirEntry, findErr := f.FindEntry(ctx, p); findErr == nil {
// Delete the now-empty directory
if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
// Recursively try to delete parent directories if they become empty
f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(parentDir))
}
}
}
}
}
return return
} }
// maybeDeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty
func (f *Filer) maybeDeleteEmptyParentDirectories(ctx context.Context, parentDir util.FullPath) {
if parentDir == "/" {
return
}
// Check if parent directory is empty
isEmpty := true
_, err := f.Store.ListDirectoryPrefixedEntries(ctx, parentDir, "", true, 1, "", func(entry *Entry) bool {
isEmpty = false
return false // Stop after first entry
})
if err == nil && isEmpty {
glog.V(2).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: deleting empty directory %s", parentDir)
grandParentDir, _ := parentDir.DirAndName()
if parentEntry, findErr := f.FindEntry(ctx, parentDir); findErr == nil {
if delErr := f.doDeleteEntryMetaAndData(ctx, parentEntry, false, false, nil); delErr == nil {
// Continue checking upwards
f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(grandParentDir))
}
}
}
}
func (f *Filer) Shutdown() { func (f *Filer) Shutdown() {
close(f.deletionQuit) close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer() f.LocalMetaLogBuffer.ShutdownLogBuffer()

Loading…
Cancel
Save