Filer: Refactor deletion processors for better readability

Extract large callback functions into dedicated private methods to
improve code organization and maintainability.

Changes:
1. Extract processDeletionBatch method
   - Handles deletion of a batch of file IDs
   - Classifies errors (success, not found, retryable, permanent)
   - Manages retry queue additions
   - Consolidates logging logic

2. Extract processRetryBatch method
   - Handles retry attempts for previously failed deletions
   - Processes retry results and updates queue
   - Symmetric to processDeletionBatch for consistency

Benefits:
- Main loop functions (loopProcessingDeletion, loopProcessingDeletionRetry)
  are now concise and focused on orchestration
- Business logic is separated into testable methods
- Reduced nesting depth improves readability
- Easier to understand control flow at a glance
- Better separation of concerns

The refactored methods follow the single responsibility principle,
making the codebase more maintainable and easier to extend.
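
For readers skimming the message without the diff below: both extracted methods apply the same four-way classification to every deletion result. The following restatement is purely illustrative (classifyDeletionError, deletionOutcome, and alreadyDeletedMarker are invented names; the real methods inline these branches, and alreadyDeletedMarker stands in for storage.ErrorDeleted.Error()):

package filer

import "strings"

type deletionOutcome int

const (
	outcomeSuccess        deletionOutcome = iota // counted, logged at V(2)
	outcomeAlreadyDeleted                        // "not found": acceptable, not an error
	outcomeRetryable                             // re-queued via retryQueue.AddOrUpdate
	outcomePermanent                             // logged, never retried
)

const alreadyDeletedMarker = "already deleted" // stand-in for storage.ErrorDeleted.Error()

// classifyDeletionError mirrors the branch order used by both
// processDeletionBatch and processRetryBatch.
func classifyDeletionError(errMsg string, isRetryableError func(string) bool) deletionOutcome {
	switch {
	case errMsg == "":
		return outcomeSuccess
	case errMsg == "not found" || strings.Contains(errMsg, alreadyDeletedMarker):
		return outcomeAlreadyDeleted
	case isRetryableError(errMsg):
		return outcomeRetryable
	default:
		return outcomePermanent
	}
}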
Dimonyga committed 1 month ago, commit 288695030d, branch pull/7402/head

weed/filer/filer_deletion.go (175 changed lines)

@@ -218,54 +218,61 @@ func (f *Filer) loopProcessingDeletion() {
 				toDeleteFileIds = fileIds
 				fileIds = fileIds[:0]
 			}
-			results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
-			// Process individual results for better error tracking
-			var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
-			var errorDetails []string
-			for _, result := range results {
-				if result.Error == "" {
-					successCount++
-				} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-					// Already deleted - acceptable
-					notFoundCount++
-				} else if isRetryableError(result.Error) {
-					// Retryable error - add to retry queue
-					retryableErrorCount++
-					retryQueue.AddOrUpdate(result.FileId, result.Error)
-					if len(errorDetails) < 10 {
-						errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
-					}
-				} else {
-					// Permanent error - log but don't retry
-					permanentErrorCount++
-					if len(errorDetails) < 10 {
-						errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
-					}
-				}
-			}
-			totalErrors := retryableErrorCount + permanentErrorCount
-			if totalErrors > 0 {
-				logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
-					totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
-				if totalErrors > 10 {
-					logMessage += " (showing first 10)"
-				}
-				glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
-			}
-			if successCount > 0 || notFoundCount > 0 {
-				glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
-			}
-			if retryQueue.Size() > 0 {
-				glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
-			}
+			f.processDeletionBatch(toDeleteFileIds, lookupFunc, retryQueue)
 		}
 	})
 }
 
+// processDeletionBatch handles deletion of a batch of file IDs and processes results.
+// It classifies errors into retryable and permanent categories, adds retryable failures
+// to the retry queue, and logs appropriate messages.
+func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+	results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+	// Process individual results for better error tracking
+	var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
+	var errorDetails []string
+	for _, result := range results {
+		if result.Error == "" {
+			successCount++
+		} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+			// Already deleted - acceptable
+			notFoundCount++
+		} else if isRetryableError(result.Error) {
+			// Retryable error - add to retry queue
+			retryableErrorCount++
+			retryQueue.AddOrUpdate(result.FileId, result.Error)
+			if len(errorDetails) < 10 {
+				errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
+			}
+		} else {
+			// Permanent error - log but don't retry
+			permanentErrorCount++
+			if len(errorDetails) < 10 {
+				errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
+			}
+		}
+	}
+	totalErrors := retryableErrorCount + permanentErrorCount
+	if totalErrors > 0 {
+		logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
+			totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
+		if totalErrors > 10 {
+			logMessage += " (showing first 10)"
+		}
+		glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
+	}
+	if successCount > 0 || notFoundCount > 0 {
+		glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
+	}
+	if retryQueue.Size() > 0 {
+		glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
+	}
+}
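
The second hunk below applies the same extraction to the retry path. DeletionRetryQueue and DeletionRetryItem are defined elsewhere in this file and never appear in the diff; judging only from the call sites (AddOrUpdate, Size, and the RetryCount field), their shape is roughly the following sketch. The mutex, map layout, and LastError field are assumptions, not the actual definition:

package filer

import "sync"

// Assumed shape only; the real queue (including the backoff/readiness
// scheduling that produces readyItems) is not shown in this commit.
type DeletionRetryItem struct {
	FileId     string
	RetryCount int    // attempts so far; logged on retry success and permanent failure
	LastError  string // invented field: the most recent error message
}

type DeletionRetryQueue struct {
	mu    sync.Mutex
	items map[string]*DeletionRetryItem
}

// AddOrUpdate registers a failed file ID, bumping RetryCount if it is
// already queued, matching how both hunks re-queue retryable errors.
func (q *DeletionRetryQueue) AddOrUpdate(fileId, errMsg string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.items == nil {
		q.items = make(map[string]*DeletionRetryItem)
	}
	item, ok := q.items[fileId]
	if !ok {
		item = &DeletionRetryItem{FileId: fileId}
		q.items[fileId] = item
	}
	item.RetryCount++
	item.LastError = errMsg
}

// Size reports how many items are currently queued.
func (q *DeletionRetryQueue) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}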
@@ -327,47 +334,53 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
 		}
 		glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
-
-		// Extract file IDs from retry items
-		var fileIds []string
-		itemsByFileId := make(map[string]*DeletionRetryItem)
-		for _, item := range readyItems {
-			fileIds = append(fileIds, item.FileId)
-			itemsByFileId[item.FileId] = item
-		}
-
-		// Attempt deletion
-		results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
-
-		// Process results
-		var successCount, notFoundCount, retryCount int
-		for _, result := range results {
-			item := itemsByFileId[result.FileId]
-			if result.Error == "" {
-				successCount++
-				glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
-			} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-				// Already deleted - success
-				notFoundCount++
-			} else if isRetryableError(result.Error) {
-				// Still failing, add back to retry queue
-				retryCount++
-				retryQueue.AddOrUpdate(result.FileId, result.Error)
-			} else {
-				// Permanent error on retry - give up
-				glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
-			}
-		}
-
-		if successCount > 0 || notFoundCount > 0 {
-			glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
-		}
-		if retryCount > 0 {
-			glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
-		}
+		f.processRetryBatch(readyItems, lookupFunc, retryQueue)
 	}
 }
 
+// processRetryBatch attempts to retry deletion of files and processes results.
+// Successfully deleted items are removed from tracking, retryable failures are
+// re-queued with updated retry counts, and permanent errors are logged and discarded.
+func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+	// Extract file IDs from retry items
+	var fileIds []string
+	itemsByFileId := make(map[string]*DeletionRetryItem)
+	for _, item := range readyItems {
+		fileIds = append(fileIds, item.FileId)
+		itemsByFileId[item.FileId] = item
+	}
+
+	// Attempt deletion
+	results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
+
+	// Process results
+	var successCount, notFoundCount, retryCount int
+	for _, result := range results {
+		item := itemsByFileId[result.FileId]
+		if result.Error == "" {
+			successCount++
+			glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
+		} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+			// Already deleted - success
+			notFoundCount++
+		} else if isRetryableError(result.Error) {
+			// Still failing, add back to retry queue
+			retryCount++
+			retryQueue.AddOrUpdate(result.FileId, result.Error)
+		} else {
+			// Permanent error on retry - give up
+			glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
+		}
+	}
+
+	if successCount > 0 || notFoundCount > 0 {
+		glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
+	}
+	if retryCount > 0 {
+		glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
+	}
+}
 
 func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
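
One more helper is referenced but not shown: isRetryableError decides which failures are re-queued in both methods. A minimal sketch, assuming it does case-insensitive substring matching against transient transport errors (the actual pattern list is outside this diff):

package filer

import "strings"

// Plausible sketch only; the real isRetryableError in filer_deletion.go
// may recognize a different set of patterns.
func isRetryableError(errMsg string) bool {
	transient := []string{
		"timeout",
		"deadline exceeded",
		"connection refused",
		"unavailable",
		"too many requests",
	}
	msg := strings.ToLower(errMsg)
	for _, pattern := range transient {
		if strings.Contains(msg, pattern) {
			return true
		}
	}
	return false
}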
