From 9b6b56423547672953b6928311e6d879faad02e3 Mon Sep 17 00:00:00 2001
From: Dmitriy Pavlov
Date: Thu, 30 Oct 2025 03:31:23 +0200
Subject: [PATCH] Filer: Add retry mechanism for failed file deletions (#7402)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Filer: Add retry mechanism for failed file deletions

Implement a retry queue with exponential backoff for handling transient
deletion failures, particularly when volumes are temporarily read-only.

Key features:
- Automatic retry for retryable errors (read-only volumes, network issues)
- Exponential backoff: 5min → 10min → 20min → ... (max 6 hours)
- Maximum 10 retry attempts per file before giving up
- Separate goroutine processing retry queue every minute
- Enhanced logging with retry/permanent error classification

This addresses the issue where file deletions fail when volumes are
temporarily read-only (tiered volumes, maintenance, etc.) and these
deletions were previously lost.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude

* Update weed/filer/filer_deletion.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Filer: Add retry mechanism for failed file deletions

Implement a retry queue with exponential backoff for handling transient
deletion failures, particularly when volumes are temporarily read-only.

Key features:
- Automatic retry for retryable errors (read-only volumes, network issues)
- Exponential backoff: 5min → 10min → 20min → ... (max 6 hours)
- Maximum 10 retry attempts per file before giving up
- Separate goroutine processing retry queue every minute
- Map-based retry queue for O(1) lookups and deletions
- Enhanced logging with retry/permanent error classification
- Consistent error detail limiting (max 10 total errors logged)
- Graceful shutdown support with quit channel for both processors

This addresses the issue where file deletions fail when volumes are
temporarily read-only (tiered volumes, maintenance, etc.) and these
deletions were previously lost.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude

* Filer: Replace magic numbers with named constants in retry processor

Replace hardcoded values with package-level constants for better
maintainability:
- DeletionRetryPollInterval (1 minute): interval for checking retry queue
- DeletionRetryBatchSize (1000): max items to process per iteration

This improves code readability and makes configuration changes easier.
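The backoff schedule described above (doubling from 5 minutes, capped at 6 hours,
at most 10 attempts) works out as follows. This is an illustrative, self-contained
sketch, not code from the patch — the patch's own calculateBackoff lives in
weed/filer/filer_deletion.go in the diff below, and the names here are chosen only
for the example:

```go
package main

import (
	"fmt"
	"time"
)

const (
	initialRetryDelay = 5 * time.Minute // first wait
	maxRetryDelay     = 6 * time.Hour   // hard cap
	maxRetryAttempts  = 10              // give up afterwards
)

// backoffFor returns the wait before the given attempt: 5m, 10m, 20m, ...
// capped at 6h, mirroring the schedule the commit message describes.
func backoffFor(attempt int) time.Duration {
	if attempt <= 1 {
		return initialRetryDelay
	}
	shift := uint(attempt - 1)
	if shift > 62 { // avoid shifting past the int64 range of time.Duration
		return maxRetryDelay
	}
	d := initialRetryDelay << shift
	if d <= 0 || d > maxRetryDelay { // overflow or past the cap
		return maxRetryDelay
	}
	return d
}

func main() {
	for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
		fmt.Printf("attempt %2d: wait %v\n", attempt, backoffFor(attempt))
	}
	// attempts 1..7 print 5m, 10m, 20m, 40m, 1h20m, 2h40m, 5h20m;
	// attempts 8..10 are capped at 6h.
}
```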
* Filer: Optimize retry queue with min-heap data structure

Replace map-based retry queue with a min-heap for better scalability
and deterministic ordering.

Performance improvements:
- GetReadyItems: O(N) → O(K log N) where K is items retrieved
- AddOrUpdate: O(1) → O(log N) (acceptable trade-off)
- Early exit when checking ready items (heap top is earliest)
- No full iteration over all items while holding lock

Benefits:
- Deterministic processing order (earliest NextRetryAt first)
- Better scalability for large retry queues (thousands of items)
- Reduced lock contention duration
- Memory efficient (no separate slice reconstruction)

Implementation:
- Min-heap ordered by NextRetryAt using container/heap
- Dual index: heap for ordering + map for O(1) FileId lookups
- heap.Fix() used when updating existing items
- Comprehensive complexity documentation in comments

This addresses the performance bottleneck identified in GetReadyItems
where iterating over the entire map with a write lock could block other
goroutines in high-failure scenarios.

* Filer: Modernize heap interface and improve error handling docs

1. Replace interface{} with any in heap methods
   - Addresses modern Go style (Go 1.18+)
   - Improves code readability

2. Enhance isRetryableError documentation
   - Acknowledge string matching brittleness
   - Add comprehensive TODO for future improvements:
     * Use HTTP status codes (503, 429, etc.)
     * Implement structured error types with errors.Is/As
     * Extract gRPC status codes
     * Add error wrapping for better context
   - Document each error pattern with context
   - Add defensive check for empty error strings

Current implementation remains pragmatic for initial release while
documenting a clear path for future robustness improvements. String
matching is acceptable for now but should be replaced with structured
error checking when refactoring the deletion pipeline.

* Filer: Refactor deletion processors for better readability

Extract large callback functions into dedicated private methods to
improve code organization and maintainability.

Changes:
1. Extract processDeletionBatch method
   - Handles deletion of a batch of file IDs
   - Classifies errors (success, not found, retryable, permanent)
   - Manages retry queue additions
   - Consolidates logging logic

2. Extract processRetryBatch method
   - Handles retry attempts for previously failed deletions
   - Processes retry results and updates queue
   - Symmetric to processDeletionBatch for consistency

Benefits:
- Main loop functions (loopProcessingDeletion, loopProcessingDeletionRetry)
  are now concise and focused on orchestration
- Business logic is separated into testable methods
- Reduced nesting depth improves readability
- Easier to understand control flow at a glance
- Better separation of concerns

The refactored methods follow the single responsibility principle,
making the codebase more maintainable and easier to extend.

* Update weed/filer/filer_deletion.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Filer: Fix critical retry count bug and add comprehensive error patterns

Critical bug fixes from PR review:

1. Fix RetryCount reset bug (CRITICAL)
   - Problem: When items are re-queued via AddOrUpdate, RetryCount resets
     to 1, breaking exponential backoff
   - Solution: Add RequeueForRetry() method that preserves retry state
   - Impact: Ensures proper exponential backoff progression

2. Add overflow protection in backoff calculation
   - Check shift amount > 63 to prevent bit-shift overflow
   - Additional safety: check if delay <= 0 or > MaxRetryDelay
   - Protects against arithmetic overflow in extreme cases
3. Expand retryable error patterns
   - Added: timeout, deadline exceeded, context canceled
   - Added: lookup error/failed (volume discovery issues)
   - Added: connection refused, broken pipe (network errors)
   - Added: too many requests, service unavailable (backpressure)
   - Added: temporarily unavailable, try again (transient errors)
   - Added: i/o timeout (network timeouts)

Benefits:
- Retry mechanism now works correctly across restarts
- More robust against edge cases and overflow
- Better coverage of transient failure scenarios
- Improved resilience in high-failure environments

Addresses feedback from CodeRabbit and Gemini Code Assist in PR #7402.

* Filer: Add persistence docs and comprehensive unit tests

Documentation improvements:
1. Document in-memory queue limitation
   - Acknowledge that retry queue is volatile (lost on restart)
   - Document trade-offs and future persistence options
   - Provide clear path for production hardening
   - Note eventual consistency through main deletion queue

Unit test coverage:
1. TestDeletionRetryQueue_AddAndRetrieve
   - Basic add/retrieve operations
   - Verify items not ready before delay elapsed

2. TestDeletionRetryQueue_ExponentialBackoff
   - Verify exponential backoff progression (5m→10m→20m→40m→80m)
   - Validate delay calculations with timing tolerance

3. TestDeletionRetryQueue_OverflowProtection
   - Test high retry counts (60+) that could cause overflow
   - Verify capping at MaxRetryDelay

4. TestDeletionRetryQueue_MaxAttemptsReached
   - Verify items discarded after MaxRetryAttempts
   - Confirm proper queue cleanup

5. TestIsRetryableError
   - Comprehensive error pattern coverage
   - Test all retryable error types (timeout, connection, lookup, etc.)
   - Verify non-retryable errors correctly identified

6. TestDeletionRetryQueue_HeapOrdering
   - Verify min-heap property maintained
   - Test items processed in NextRetryAt order
   - Validate heap.Init() integration

All tests passing. Addresses PR feedback on testing requirements.

* Filer: Add code quality improvements for deletion retry

Address PR feedback with minor optimizations:
- Add MaxLoggedErrorDetails constant (replaces magic number 10)
- Pre-allocate slices and maps in processRetryBatch for efficiency
- Improve log message formatting to use constant

These changes improve code maintainability and runtime performance
without altering functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
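The "expand retryable error patterns" work and TestIsRetryableError above boil
down to case-insensitive substring matching against a fixed pattern list. A
condensed sketch of that shape is shown here for orientation only — the full
retryablePatterns list and isRetryableError are in the diff below, and
transientPatterns/looksRetryable are names invented for this example:

```go
package main

import (
	"fmt"
	"strings"
)

// A few of the transient patterns the commits list; the real, longer list
// lives in retryablePatterns inside weed/filer/filer_deletion.go.
var transientPatterns = []string{
	"is read only",
	"connection reset by peer",
	"deadline exceeded",
	"too many requests",
	"service unavailable",
	"i/o timeout",
}

// looksRetryable reports whether an error message matches a known transient pattern.
func looksRetryable(errMsg string) bool {
	if errMsg == "" {
		return false
	}
	lower := strings.ToLower(errMsg)
	for _, p := range transientPatterns {
		if strings.Contains(lower, p) {
			return true
		}
	}
	return false
}

func main() {
	for _, msg := range []string{
		"volume 42 is read only",
		"rpc error: code = DeadlineExceeded desc = deadline exceeded",
		"invalid file id",
	} {
		fmt.Printf("%-60q retryable=%v\n", msg, looksRetryable(msg))
	}
}
```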
* refactoring retrying

* use constant

* assert

* address comment

* refactor

* address comments

* dedup

* process retried deletions

* address comment

* check in-flight items also; dedup code

* refactoring

* refactoring

* simplify

* reset heap

* more efficient

* add DeletionBatchSize as a constant; Permanent > Retryable > Success > Not Found

---------

Co-authored-by: Claude
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: chrislu
Co-authored-by: Chris Lu
---
 weed/filer/filer.go               |   5 +
 weed/filer/filer_deletion.go      | 572 +++++++++++++++++++++++++++---
 weed/filer/filer_deletion_test.go | 308 ++++++++++++++++
 3 files changed, 840 insertions(+), 45 deletions(-)
 create mode 100644 weed/filer/filer_deletion_test.go

diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 71185d3d1..b86ac3c5b 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -54,6 +54,8 @@ type Filer struct {
 	RemoteStorage      *FilerRemoteStorage
 	Dlm                *lock_manager.DistributedLockManager
 	MaxFilenameLength  uint32
+	deletionQuit       chan struct{}
+	DeletionRetryQueue *DeletionRetryQueue
 }
 
 func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -66,6 +68,8 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
 		UniqueFilerId:     util.RandomInt32(),
 		Dlm:               lock_manager.NewDistributedLockManager(filerHost),
 		MaxFilenameLength: maxFilenameLength,
+		deletionQuit:       make(chan struct{}),
+		DeletionRetryQueue: NewDeletionRetryQueue(),
 	}
 	if f.UniqueFilerId < 0 {
 		f.UniqueFilerId = -f.UniqueFilerId
@@ -379,6 +383,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
 }
 
 func (f *Filer) Shutdown() {
+	close(f.deletionQuit)
 	f.LocalMetaLogBuffer.ShutdownLogBuffer()
 	f.Store.Shutdown()
 }
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index b3a4296ba..d8bc105e6 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -1,20 +1,274 @@
 package filer
 
 import (
+	"container/heap"
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
+	"google.golang.org/grpc"
+
 	"github.com/seaweedfs/seaweedfs/weed/storage"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
+const (
+	// Maximum number of retry attempts for failed deletions
+	MaxRetryAttempts = 10
+	// Initial retry delay (will be doubled with each attempt)
+	InitialRetryDelay = 5 * time.Minute
+	// Maximum retry delay
+	MaxRetryDelay = 6 * time.Hour
+	// Interval for checking retry queue for ready items
+	DeletionRetryPollInterval = 1 * time.Minute
+	// Maximum number of items to process per retry iteration
+	DeletionRetryBatchSize = 1000
+	// Maximum number of error details to include in log messages
+	MaxLoggedErrorDetails = 10
+	// Interval for polling the deletion queue for new items
+	// Using a prime number to de-synchronize with other periodic tasks
+	DeletionPollInterval = 1123 * time.Millisecond
+	// Maximum number of file IDs to delete per batch (roughly 20 bytes per file ID)
+	DeletionBatchSize = 100000
+)
+
+// retryablePatterns contains
error message patterns that indicate temporary/transient conditions +// that should be retried. These patterns are based on actual error messages from the deletion pipeline. +var retryablePatterns = []string{ + "is read only", // Volume temporarily read-only (tiering, maintenance) + "error reading from server", // Network I/O errors + "connection reset by peer", // Network connection issues + "closed network connection", // Network connection closed unexpectedly + "connection refused", // Server temporarily unavailable + "timeout", // Operation timeout (network or server) + "deadline exceeded", // Context deadline exceeded + "context canceled", // Context cancellation (may be transient) + "lookup error", // Volume lookup failures + "lookup failed", // Volume server discovery issues + "too many requests", // Rate limiting / backpressure + "service unavailable", // HTTP 503 errors + "temporarily unavailable", // Temporary service issues + "try again", // Explicit retry suggestion + "i/o timeout", // Network I/O timeout + "broken pipe", // Connection broken during operation +} + +// DeletionRetryItem represents a file deletion that failed and needs to be retried +type DeletionRetryItem struct { + FileId string + RetryCount int + NextRetryAt time.Time + LastError string + heapIndex int // index in the heap (for heap.Interface) + inFlight bool // true when item is being processed, prevents duplicate additions +} + +// retryHeap implements heap.Interface for DeletionRetryItem +// Items are ordered by NextRetryAt (earliest first) +type retryHeap []*DeletionRetryItem + +// Compile-time assertion that retryHeap implements heap.Interface +var _ heap.Interface = (*retryHeap)(nil) + +func (h retryHeap) Len() int { return len(h) } + +func (h retryHeap) Less(i, j int) bool { + return h[i].NextRetryAt.Before(h[j].NextRetryAt) +} + +func (h retryHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].heapIndex = i + h[j].heapIndex = j +} + +func (h *retryHeap) Push(x any) { + item := x.(*DeletionRetryItem) + item.heapIndex = len(*h) + *h = append(*h, item) +} + +func (h *retryHeap) Pop() any { + old := *h + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.heapIndex = -1 // mark as removed + *h = old[0 : n-1] + return item +} + +// DeletionRetryQueue manages the queue of failed deletions that need to be retried. +// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items. +// +// LIMITATION: Current implementation stores retry queue in memory only. +// On filer restart, all pending retries are lost. With MaxRetryDelay up to 6 hours, +// process restarts during this window will cause retry state loss. +// +// TODO: Consider persisting retry queue to durable storage for production resilience: +// - Option 1: Leverage existing Filer store (KV operations) +// - Option 2: Periodic snapshots to disk with recovery on startup +// - Option 3: Write-ahead log for retry queue mutations +// - Trade-offs: Performance vs durability, complexity vs reliability +// +// For now, accepting in-memory storage as pragmatic initial implementation. +// Lost retries will be eventually consistent as files remain in deletion queue. 
+type DeletionRetryQueue struct { + heap retryHeap + itemIndex map[string]*DeletionRetryItem // for O(1) lookup by FileId + lock sync.Mutex +} + +// NewDeletionRetryQueue creates a new retry queue +func NewDeletionRetryQueue() *DeletionRetryQueue { + q := &DeletionRetryQueue{ + heap: make(retryHeap, 0), + itemIndex: make(map[string]*DeletionRetryItem), + } + heap.Init(&q.heap) + return q +} + +// calculateBackoff calculates the exponential backoff delay for a given retry count. +// Uses exponential backoff formula: InitialRetryDelay * 2^(retryCount-1) +// The first retry (retryCount=1) uses InitialRetryDelay, second uses 2x, third uses 4x, etc. +// Includes overflow protection and caps at MaxRetryDelay. +func calculateBackoff(retryCount int) time.Duration { + // The first retry is attempt 1, but shift should start at 0 + if retryCount <= 1 { + return InitialRetryDelay + } + + shiftAmount := uint(retryCount - 1) + + // time.Duration is an int64. A left shift of 63 or more will result in a + // negative number or zero. The multiplication can also overflow much earlier + // (around a shift of 25 for a 5-minute initial delay). + // The `delay <= 0` check below correctly catches all these overflow cases. + delay := InitialRetryDelay << shiftAmount + + if delay <= 0 || delay > MaxRetryDelay { + return MaxRetryDelay + } + + return delay +} + +// AddOrUpdate adds a new failed deletion or updates an existing one +// Time complexity: O(log N) for insertion/update +func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) { + q.lock.Lock() + defer q.lock.Unlock() + + // Check if item already exists (including in-flight items) + if item, exists := q.itemIndex[fileId]; exists { + // Item is already in the queue or being processed. Just update the error. + // The existing retry schedule should proceed. + // RetryCount is only incremented in RequeueForRetry when an actual retry is performed. + item.LastError = errorMsg + if item.inFlight { + glog.V(2).Infof("retry for %s in-flight: attempt %d, will preserve retry state", fileId, item.RetryCount) + } else { + glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt)) + } + return + } + + // Add new item + delay := InitialRetryDelay + item := &DeletionRetryItem{ + FileId: fileId, + RetryCount: 1, + NextRetryAt: time.Now().Add(delay), + LastError: errorMsg, + inFlight: false, + } + heap.Push(&q.heap, item) + q.itemIndex[fileId] = item + glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay) +} + +// RequeueForRetry re-adds a previously failed item back to the queue with incremented retry count. +// This method MUST be used when re-queuing items from processRetryBatch to preserve retry state. 
+// Time complexity: O(log N) for insertion +func (q *DeletionRetryQueue) RequeueForRetry(item *DeletionRetryItem, errorMsg string) { + q.lock.Lock() + defer q.lock.Unlock() + + // Increment retry count + item.RetryCount++ + item.LastError = errorMsg + + // Calculate next retry time with exponential backoff + delay := calculateBackoff(item.RetryCount) + item.NextRetryAt = time.Now().Add(delay) + item.inFlight = false // Clear in-flight flag + glog.V(2).Infof("requeued retry for %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay) + + // Re-add to heap (item still in itemIndex) + heap.Push(&q.heap, item) +} + +// GetReadyItems returns items that are ready to be retried and marks them as in-flight +// Time complexity: O(K log N) where K is the number of ready items +// Items are processed in order of NextRetryAt (earliest first) +func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem { + q.lock.Lock() + defer q.lock.Unlock() + + now := time.Now() + var readyItems []*DeletionRetryItem + + // Peek at items from the top of the heap (earliest NextRetryAt) + for len(q.heap) > 0 && len(readyItems) < maxItems { + item := q.heap[0] + + // If the earliest item is not ready yet, no other items are ready either + if item.NextRetryAt.After(now) { + break + } + + // Remove from heap but keep in itemIndex with inFlight flag + heap.Pop(&q.heap) + + if item.RetryCount <= MaxRetryAttempts { + item.inFlight = true // Mark as being processed + readyItems = append(readyItems, item) + } else { + // Max attempts reached, log and discard completely + delete(q.itemIndex, item.FileId) + glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError) + } + } + + return readyItems +} + +// Remove removes an item from the queue (called when deletion succeeds or fails permanently) +// Time complexity: O(1) +func (q *DeletionRetryQueue) Remove(item *DeletionRetryItem) { + q.lock.Lock() + defer q.lock.Unlock() + + // Item was already removed from heap by GetReadyItems, just remove from index + delete(q.itemIndex, item.FileId) +} + +// Size returns the current size of the retry queue +func (q *DeletionRetryQueue) Size() int { + q.lock.Lock() + defer q.lock.Unlock() + return len(q.heap) +} + func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) { return func(vids []string) (map[string]*operation.LookupResult, error) { m := make(map[string]*operation.LookupResult) @@ -41,64 +295,292 @@ func (f *Filer) loopProcessingDeletion() { lookupFunc := LookupByMasterClientFn(f.MasterClient) - DeletionBatchSize := 100000 // roughly 20 bytes cost per file id. 
+ // Start retry processor in a separate goroutine + go f.loopProcessingDeletionRetry(lookupFunc) + + ticker := time.NewTicker(DeletionPollInterval) + defer ticker.Stop() - var deletionCount int for { - deletionCount = 0 - f.fileIdDeletionQueue.Consume(func(fileIds []string) { - for len(fileIds) > 0 { - var toDeleteFileIds []string - if len(fileIds) > DeletionBatchSize { - toDeleteFileIds = fileIds[:DeletionBatchSize] - fileIds = fileIds[DeletionBatchSize:] - } else { - toDeleteFileIds = fileIds - fileIds = fileIds[:0] - } - deletionCount = len(toDeleteFileIds) - results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc) - - // Process individual results for better error tracking - var successCount, notFoundCount, errorCount int - var errorDetails []string - - for _, result := range results { - if result.Error == "" { - successCount++ - } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) { - // Already deleted - acceptable - notFoundCount++ - } else { - // Actual error - errorCount++ - if errorCount <= 10 { - // Only log first 10 errors to avoid flooding logs - errorDetails = append(errorDetails, result.FileId+": "+result.Error) - } + select { + case <-f.deletionQuit: + glog.V(0).Infof("deletion processor shutting down") + return + case <-ticker.C: + f.fileIdDeletionQueue.Consume(func(fileIds []string) { + for i := 0; i < len(fileIds); i += DeletionBatchSize { + end := i + DeletionBatchSize + if end > len(fileIds) { + end = len(fileIds) } + toDeleteFileIds := fileIds[i:end] + f.processDeletionBatch(toDeleteFileIds, lookupFunc) } + }) + } + } +} - if successCount > 0 || notFoundCount > 0 { - glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount) - } +// processDeletionBatch handles deletion of a batch of file IDs and processes results. +// It classifies errors into retryable and permanent categories, adds retryable failures +// to the retry queue, and logs appropriate messages. +func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) { + // Deduplicate file IDs to prevent incorrect retry count increments for the same file ID within a single batch. 
+ uniqueFileIdsSlice := make([]string, 0, len(toDeleteFileIds)) + processed := make(map[string]struct{}, len(toDeleteFileIds)) + for _, fileId := range toDeleteFileIds { + if _, found := processed[fileId]; !found { + processed[fileId] = struct{}{} + uniqueFileIdsSlice = append(uniqueFileIdsSlice, fileId) + } + } - if errorCount > 0 { - logMessage := fmt.Sprintf("failed to delete %d/%d files", errorCount, len(toDeleteFileIds)) - if errorCount > 10 { - logMessage += " (showing first 10)" - } - glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; ")) + if len(uniqueFileIdsSlice) == 0 { + return + } + + // Delete files and classify outcomes + outcomes := deleteFilesAndClassify(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc) + + // Process outcomes + var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int + var errorDetails []string + + for _, fileId := range uniqueFileIdsSlice { + outcome := outcomes[fileId] + + switch outcome.status { + case deletionOutcomeSuccess: + successCount++ + case deletionOutcomeNotFound: + notFoundCount++ + case deletionOutcomeRetryable, deletionOutcomeNoResult: + retryableErrorCount++ + f.DeletionRetryQueue.AddOrUpdate(fileId, outcome.errorMsg) + if len(errorDetails) < MaxLoggedErrorDetails { + errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (will retry)") + } + case deletionOutcomePermanent: + permanentErrorCount++ + if len(errorDetails) < MaxLoggedErrorDetails { + errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (permanent)") + } + } + } + + if successCount > 0 || notFoundCount > 0 { + glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount) + } + + totalErrors := retryableErrorCount + permanentErrorCount + if totalErrors > 0 { + logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)", + totalErrors, len(uniqueFileIdsSlice), retryableErrorCount, permanentErrorCount) + if len(errorDetails) > 0 { + if totalErrors > MaxLoggedErrorDetails { + logMessage += fmt.Sprintf(" (showing first %d)", len(errorDetails)) + } + glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; ")) + } else { + glog.V(0).Info(logMessage) + } + } + + if f.DeletionRetryQueue.Size() > 0 { + glog.V(2).Infof("retry queue size: %d", f.DeletionRetryQueue.Size()) + } +} + +const ( + deletionOutcomeSuccess = "success" + deletionOutcomeNotFound = "not_found" + deletionOutcomeRetryable = "retryable" + deletionOutcomePermanent = "permanent" + deletionOutcomeNoResult = "no_result" +) + +// deletionOutcome represents the result of classifying deletion results for a file +type deletionOutcome struct { + status string // One of the deletionOutcome* constants + errorMsg string +} + +// deleteFilesAndClassify performs deletion and classifies outcomes for a list of file IDs +func deleteFilesAndClassify(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) map[string]deletionOutcome { + // Perform deletion + results := operation.DeleteFileIdsWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) + + // Group results by file ID to handle multiple results for replicated volumes + resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult) + for _, result := range results { + resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result) + } + + // Classify outcome for each file + outcomes := make(map[string]deletionOutcome, len(fileIds)) + for _, fileId 
:= range fileIds { + outcomes[fileId] = classifyDeletionOutcome(fileId, resultsByFileId) + } + + return outcomes +} + +// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome +// Uses a single pass through results with early return for permanent errors (highest priority) +// Priority: Permanent > Retryable > Success > Not Found +func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome { + fileIdResults, found := resultsByFileId[fileId] + if !found || len(fileIdResults) == 0 { + return deletionOutcome{ + status: deletionOutcomeNoResult, + errorMsg: "no deletion result from volume server", + } + } + + var firstRetryableError string + hasSuccess := false + + for _, res := range fileIdResults { + if res.Error == "" { + hasSuccess = true + continue + } + if strings.Contains(res.Error, storage.ErrorDeleted.Error()) || res.Error == "not found" { + continue + } + + if isRetryableError(res.Error) { + if firstRetryableError == "" { + firstRetryableError = res.Error + } + } else { + // Permanent error takes highest precedence - return immediately + return deletionOutcome{status: deletionOutcomePermanent, errorMsg: res.Error} + } + } + + if firstRetryableError != "" { + return deletionOutcome{status: deletionOutcomeRetryable, errorMsg: firstRetryableError} + } + + if hasSuccess { + return deletionOutcome{status: deletionOutcomeSuccess, errorMsg: ""} + } + + // If we are here, all results were "not found" + return deletionOutcome{status: deletionOutcomeNotFound, errorMsg: ""} +} + +// isRetryableError determines if an error is retryable based on its message. +// +// Current implementation uses string matching which is brittle and may break +// if error messages change in dependencies. This is acceptable for the initial +// implementation but should be improved in the future. +// +// TODO: Consider these improvements for more robust error handling: +// - Pass DeleteResult instead of just error string to access Status codes +// - Use HTTP status codes (503 Service Unavailable, 429 Too Many Requests, etc.) +// - Implement structured error types that can be checked with errors.Is/errors.As +// - Extract and check gRPC status codes for better classification +// - Add error wrapping in the deletion pipeline to preserve error context +// +// For now, we use conservative string matching for known transient error patterns. 
+func isRetryableError(errorMsg string) bool { + // Empty errors are not retryable + if errorMsg == "" { + return false + } + + errorLower := strings.ToLower(errorMsg) + for _, pattern := range retryablePatterns { + if strings.Contains(errorLower, pattern) { + return true + } + } + return false +} + +// loopProcessingDeletionRetry processes the retry queue for failed deletions +func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error)) { + + ticker := time.NewTicker(DeletionRetryPollInterval) + defer ticker.Stop() + + for { + select { + case <-f.deletionQuit: + glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", f.DeletionRetryQueue.Size()) + return + case <-ticker.C: + // Process all ready items in batches until queue is empty + totalProcessed := 0 + for { + readyItems := f.DeletionRetryQueue.GetReadyItems(DeletionRetryBatchSize) + if len(readyItems) == 0 { + break } + + f.processRetryBatch(readyItems, lookupFunc) + totalProcessed += len(readyItems) } - }) - if deletionCount == 0 { - time.Sleep(1123 * time.Millisecond) + if totalProcessed > 0 { + glog.V(1).Infof("retried deletion of %d files", totalProcessed) + } } } } +// processRetryBatch attempts to retry deletion of files and processes results. +// Successfully deleted items are removed from tracking, retryable failures are +// re-queued with updated retry counts, and permanent errors are logged and discarded. +func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) { + // Extract file IDs from retry items + fileIds := make([]string, 0, len(readyItems)) + for _, item := range readyItems { + fileIds = append(fileIds, item.FileId) + } + + // Delete files and classify outcomes + outcomes := deleteFilesAndClassify(f.GrpcDialOption, fileIds, lookupFunc) + + // Process outcomes - iterate over readyItems to ensure all items are accounted for + var successCount, notFoundCount, retryCount, permanentErrorCount int + for _, item := range readyItems { + outcome := outcomes[item.FileId] + + switch outcome.status { + case deletionOutcomeSuccess: + successCount++ + f.DeletionRetryQueue.Remove(item) // Remove from queue (success) + glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount) + case deletionOutcomeNotFound: + notFoundCount++ + f.DeletionRetryQueue.Remove(item) // Remove from queue (already deleted) + case deletionOutcomeRetryable, deletionOutcomeNoResult: + retryCount++ + if outcome.status == deletionOutcomeNoResult { + glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId) + } + f.DeletionRetryQueue.RequeueForRetry(item, outcome.errorMsg) + case deletionOutcomePermanent: + permanentErrorCount++ + f.DeletionRetryQueue.Remove(item) // Remove from queue (permanent failure) + glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, outcome.errorMsg) + } + } + + if successCount > 0 || notFoundCount > 0 { + glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount) + } + if retryCount > 0 { + glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount) + } + if permanentErrorCount > 0 { + glog.Warningf("retry: %d files failed with permanent errors", permanentErrorCount) + } +} + func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) { f.doDeleteChunks(ctx, 
chunks) } diff --git a/weed/filer/filer_deletion_test.go b/weed/filer/filer_deletion_test.go new file mode 100644 index 000000000..77ac2310f --- /dev/null +++ b/weed/filer/filer_deletion_test.go @@ -0,0 +1,308 @@ +package filer + +import ( + "container/heap" + "testing" + "time" +) + +func TestDeletionRetryQueue_AddAndRetrieve(t *testing.T) { + queue := NewDeletionRetryQueue() + + // Add items + queue.AddOrUpdate("file1", "is read only") + queue.AddOrUpdate("file2", "connection reset") + + if queue.Size() != 2 { + t.Errorf("Expected queue size 2, got %d", queue.Size()) + } + + // Items not ready yet (initial delay is 5 minutes) + readyItems := queue.GetReadyItems(10) + if len(readyItems) != 0 { + t.Errorf("Expected 0 ready items, got %d", len(readyItems)) + } + + // Size should remain unchanged + if queue.Size() != 2 { + t.Errorf("Expected queue size 2 after checking ready items, got %d", queue.Size()) + } +} + +func TestDeletionRetryQueue_ExponentialBackoff(t *testing.T) { + queue := NewDeletionRetryQueue() + + // Create an item + item := &DeletionRetryItem{ + FileId: "test-file", + RetryCount: 0, + NextRetryAt: time.Now(), + LastError: "test error", + } + + // Requeue multiple times to test backoff + delays := []time.Duration{} + + for i := 0; i < 5; i++ { + beforeTime := time.Now() + queue.RequeueForRetry(item, "error") + + // Calculate expected delay for this retry count + expectedDelay := InitialRetryDelay * time.Duration(1< MaxRetryDelay { + expectedDelay = MaxRetryDelay + } + + // Verify NextRetryAt is approximately correct + actualDelay := item.NextRetryAt.Sub(beforeTime) + delays = append(delays, actualDelay) + + // Allow small timing variance + timeDiff := actualDelay - expectedDelay + if timeDiff < 0 { + timeDiff = -timeDiff + } + if timeDiff > 100*time.Millisecond { + t.Errorf("Retry %d: expected delay ~%v, got %v (diff: %v)", i+1, expectedDelay, actualDelay, timeDiff) + } + + // Verify retry count incremented + if item.RetryCount != i+1 { + t.Errorf("Expected RetryCount %d, got %d", i+1, item.RetryCount) + } + + // Reset the heap for the next isolated test iteration + queue.lock.Lock() + queue.heap = retryHeap{} + queue.lock.Unlock() + } + + t.Logf("Exponential backoff delays: %v", delays) +} + +func TestDeletionRetryQueue_OverflowProtection(t *testing.T) { + queue := NewDeletionRetryQueue() + + // Create an item with very high retry count + item := &DeletionRetryItem{ + FileId: "test-file", + RetryCount: 60, // High count that would cause overflow without protection + NextRetryAt: time.Now(), + LastError: "test error", + } + + // Should not panic and should cap at MaxRetryDelay + queue.RequeueForRetry(item, "error") + + delay := time.Until(item.NextRetryAt) + if delay > MaxRetryDelay+time.Second { + t.Errorf("Delay exceeded MaxRetryDelay: %v > %v", delay, MaxRetryDelay) + } +} + +func TestDeletionRetryQueue_MaxAttemptsReached(t *testing.T) { + queue := NewDeletionRetryQueue() + + // Add item + queue.AddOrUpdate("file1", "error") + + // Manually set retry count to max + queue.lock.Lock() + item, exists := queue.itemIndex["file1"] + if !exists { + queue.lock.Unlock() + t.Fatal("Item not found in queue") + } + item.RetryCount = MaxRetryAttempts + item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now + heap.Fix(&queue.heap, item.heapIndex) + queue.lock.Unlock() + + // Try to get ready items - should be returned for the last retry (attempt #10) + readyItems := queue.GetReadyItems(10) + if len(readyItems) != 1 { + t.Fatalf("Expected 1 item for last retry, got %d", 
len(readyItems)) + } + + // Requeue it, which will increment its retry count beyond the max + queue.RequeueForRetry(readyItems[0], "final error") + + // Manually make it ready again + queue.lock.Lock() + item, exists = queue.itemIndex["file1"] + if !exists { + queue.lock.Unlock() + t.Fatal("Item not found in queue after requeue") + } + item.NextRetryAt = time.Now().Add(-1 * time.Second) + heap.Fix(&queue.heap, item.heapIndex) + queue.lock.Unlock() + + // Now it should be discarded (retry count is 11, exceeds max of 10) + readyItems = queue.GetReadyItems(10) + if len(readyItems) != 0 { + t.Errorf("Expected 0 items (max attempts exceeded), got %d", len(readyItems)) + } + + // Should be removed from queue + if queue.Size() != 0 { + t.Errorf("Expected queue size 0 after max attempts exceeded, got %d", queue.Size()) + } +} + +func TestCalculateBackoff(t *testing.T) { + testCases := []struct { + retryCount int + expectedDelay time.Duration + description string + }{ + {1, InitialRetryDelay, "first retry"}, + {2, InitialRetryDelay * 2, "second retry"}, + {3, InitialRetryDelay * 4, "third retry"}, + {4, InitialRetryDelay * 8, "fourth retry"}, + {5, InitialRetryDelay * 16, "fifth retry"}, + {10, MaxRetryDelay, "capped at max delay"}, + {65, MaxRetryDelay, "overflow protection (shift > 63)"}, + {100, MaxRetryDelay, "very high retry count"}, + } + + for _, tc := range testCases { + result := calculateBackoff(tc.retryCount) + if result != tc.expectedDelay { + t.Errorf("%s (retry %d): expected %v, got %v", + tc.description, tc.retryCount, tc.expectedDelay, result) + } + } +} + +func TestIsRetryableError(t *testing.T) { + testCases := []struct { + error string + retryable bool + description string + }{ + {"volume 123 is read only", true, "read-only volume"}, + {"connection reset by peer", true, "connection reset"}, + {"timeout exceeded", true, "timeout"}, + {"deadline exceeded", true, "deadline exceeded"}, + {"context canceled", true, "context canceled"}, + {"lookup error: volume not found", true, "lookup error"}, + {"connection refused", true, "connection refused"}, + {"too many requests", true, "rate limiting"}, + {"service unavailable", true, "service unavailable"}, + {"i/o timeout", true, "I/O timeout"}, + {"broken pipe", true, "broken pipe"}, + {"not found", false, "not found (not retryable)"}, + {"invalid file id", false, "invalid input (not retryable)"}, + {"", false, "empty error"}, + } + + for _, tc := range testCases { + result := isRetryableError(tc.error) + if result != tc.retryable { + t.Errorf("%s: expected retryable=%v, got %v for error: %q", + tc.description, tc.retryable, result, tc.error) + } + } +} + +func TestDeletionRetryQueue_HeapOrdering(t *testing.T) { + queue := NewDeletionRetryQueue() + + now := time.Now() + + // Add items with different retry times (out of order) + items := []*DeletionRetryItem{ + {FileId: "file3", RetryCount: 1, NextRetryAt: now.Add(30 * time.Second), LastError: "error3"}, + {FileId: "file1", RetryCount: 1, NextRetryAt: now.Add(10 * time.Second), LastError: "error1"}, + {FileId: "file2", RetryCount: 1, NextRetryAt: now.Add(20 * time.Second), LastError: "error2"}, + } + + // Add items directly (simulating internal state) + for _, item := range items { + queue.lock.Lock() + queue.itemIndex[item.FileId] = item + queue.heap = append(queue.heap, item) + queue.lock.Unlock() + } + + // Use container/heap.Init to establish heap property + queue.lock.Lock() + heap.Init(&queue.heap) + queue.lock.Unlock() + + // Verify heap maintains min-heap property (earliest time at 
top) + queue.lock.Lock() + if queue.heap[0].FileId != "file1" { + t.Errorf("Expected file1 at heap top (earliest time), got %s", queue.heap[0].FileId) + } + queue.lock.Unlock() + + // Set all items to ready while preserving their relative order + queue.lock.Lock() + for _, item := range queue.itemIndex { + // Shift all times back by 40 seconds to make them ready, but preserve order + item.NextRetryAt = item.NextRetryAt.Add(-40 * time.Second) + } + heap.Init(&queue.heap) // Re-establish heap property after modification + queue.lock.Unlock() + + // GetReadyItems should return in NextRetryAt order + readyItems := queue.GetReadyItems(10) + expectedOrder := []string{"file1", "file2", "file3"} + + if len(readyItems) != 3 { + t.Fatalf("Expected 3 ready items, got %d", len(readyItems)) + } + + for i, item := range readyItems { + if item.FileId != expectedOrder[i] { + t.Errorf("Item %d: expected %s, got %s", i, expectedOrder[i], item.FileId) + } + } +} + +func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) { + queue := NewDeletionRetryQueue() + + // Add same file ID twice with retryable error - simulates duplicate in batch + queue.AddOrUpdate("file1", "timeout error") + + // Verify only one item exists in queue + if queue.Size() != 1 { + t.Fatalf("Expected queue size 1 after first add, got %d", queue.Size()) + } + + // Get initial retry count + queue.lock.Lock() + item1, exists := queue.itemIndex["file1"] + if !exists { + queue.lock.Unlock() + t.Fatal("Item not found in queue after first add") + } + initialRetryCount := item1.RetryCount + queue.lock.Unlock() + + // Add same file ID again - should NOT increment retry count (just update error) + queue.AddOrUpdate("file1", "timeout error again") + + // Verify still only one item exists in queue (not duplicated) + if queue.Size() != 1 { + t.Errorf("Expected queue size 1 after duplicate add, got %d (duplicates detected)", queue.Size()) + } + + // Verify retry count did NOT increment (AddOrUpdate only updates error, not count) + queue.lock.Lock() + item2, exists := queue.itemIndex["file1"] + queue.lock.Unlock() + + if !exists { + t.Fatal("Item not found in queue after second add") + } + if item2.RetryCount != initialRetryCount { + t.Errorf("Expected RetryCount to stay at %d after duplicate add (should not increment), got %d", initialRetryCount, item2.RetryCount) + } + if item2.LastError != "timeout error again" { + t.Errorf("Expected LastError to be updated to 'timeout error again', got %q", item2.LastError) + } +}
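Taken together, the pieces introduced by this patch give the retry queue a simple
lifecycle: failed deletions enter via AddOrUpdate, the retry goroutine drains ready
items every DeletionRetryPollInterval, and each item either leaves the queue
(Remove) or goes back with a doubled delay (RequeueForRetry). The sketch below
shows that flow using the API exactly as it appears in filer_deletion.go; it is not
part of the patch, it assumes it sits alongside the new code in package filer, and
attemptDelete is a hypothetical stand-in for the real
operation.DeleteFileIdsWithLookupVolumeId call:

```go
package filer

import "time"

// attemptDelete is a hypothetical stand-in for the volume-server deletion call;
// the real pipeline uses operation.DeleteFileIdsWithLookupVolumeId and classifies
// per-replica results via classifyDeletionOutcome.
func attemptDelete(fileId string) error { return nil }

// driveRetryQueue illustrates the lifecycle the patch implements: a ticker drains
// ready items, successes and permanent failures are Remove()d, and transient
// failures are re-queued with RequeueForRetry (which increments RetryCount and
// doubles the delay).
func driveRetryQueue(q *DeletionRetryQueue, quit <-chan struct{}) {
	ticker := time.NewTicker(DeletionRetryPollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			for _, item := range q.GetReadyItems(DeletionRetryBatchSize) {
				if err := attemptDelete(item.FileId); err != nil {
					if isRetryableError(err.Error()) {
						q.RequeueForRetry(item, err.Error()) // back into the heap with backoff
					} else {
						q.Remove(item) // permanent failure: stop tracking
					}
					continue
				}
				q.Remove(item) // success: stop tracking this file id
			}
		}
	}
}
```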