From 476db20bfedf6c48ec9d35d7b1b4afc8d2533245 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 29 Oct 2025 17:27:17 -0700
Subject: [PATCH] address comment

---
 weed/filer/filer_deletion.go      | 101 +++++++++++++++++++++---------
 weed/filer/filer_deletion_test.go |  26 +++++---
 2 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index c578ce174..50617d80a 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -164,13 +164,11 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
 
 	// Check if item already exists
 	if item, exists := q.itemIndex[fileId]; exists {
-		item.RetryCount++
+		// Item is already in the queue. Just update the error.
+		// The existing retry schedule should proceed.
+		// RetryCount is only incremented in RequeueForRetry when an actual retry is performed.
 		item.LastError = errorMsg
-		delay := calculateBackoff(item.RetryCount)
-		item.NextRetryAt = time.Now().Add(delay)
-		// Re-heapify since NextRetryAt changed
-		heap.Fix(&q.heap, item.heapIndex)
-		glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
+		glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
 		return
 	}
 
@@ -311,43 +309,86 @@ func (f *Filer) loopProcessingDeletion() {
 // to the retry queue, and logs appropriate messages.
 func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
 	// Deduplicate file IDs to prevent incorrect retry count increments for the same file ID within a single batch.
-	uniqueFileIds := make([]string, 0, len(toDeleteFileIds))
+	uniqueFileIdsSlice := make([]string, 0, len(toDeleteFileIds))
 	processed := make(map[string]struct{}, len(toDeleteFileIds))
 	for _, fileId := range toDeleteFileIds {
 		if _, found := processed[fileId]; !found {
 			processed[fileId] = struct{}{}
-			uniqueFileIds = append(uniqueFileIds, fileId)
+			uniqueFileIdsSlice = append(uniqueFileIdsSlice, fileId)
 		}
 	}
 
-	if len(uniqueFileIds) == 0 {
+	if len(uniqueFileIdsSlice) == 0 {
 		return
 	}
 
-	results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, uniqueFileIds, lookupFunc)
+	results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc)
 
-	// Process individual results for better error tracking
+	// Group results by file ID to handle multiple results for replicated volumes
+	resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult)
+	for _, result := range results {
+		resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result)
+	}
+
+	// Process results
 	var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
 	var errorDetails []string
 
-	for _, result := range results {
-		if result.Error == "" {
-			successCount++
-		} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-			// Already deleted - acceptable
-			notFoundCount++
-		} else if isRetryableError(result.Error) {
-			// Retryable error - add to retry queue
+	for _, fileId := range uniqueFileIdsSlice {
+		fileIdResults, found := resultsByFileId[fileId]
+		if !found {
+			// This can happen if lookup fails for the volume.
+			// To be safe, we treat it as a retryable error.
 			retryableErrorCount++
-			f.DeletionRetryQueue.AddOrUpdate(result.FileId, result.Error)
+			f.DeletionRetryQueue.AddOrUpdate(fileId, "no deletion result from volume server")
 			if len(errorDetails) < MaxLoggedErrorDetails {
-				errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
+				errorDetails = append(errorDetails, fileId+": no deletion result (will retry)")
 			}
-		} else {
-			// Permanent error - log but don't retry
+			continue
+		}
+
+		var firstRetryableError, firstPermanentError string
+		allSuccessOrNotFound := true
+
+		for _, res := range fileIdResults {
+			if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
+				allSuccessOrNotFound = false
+				if isRetryableError(res.Error) {
+					if firstRetryableError == "" {
+						firstRetryableError = res.Error
+					}
+				} else {
+					if firstPermanentError == "" {
+						firstPermanentError = res.Error
+					}
+				}
+			}
+		}
+
+		// Determine overall outcome: permanent errors take precedence, then retryable errors
+		if firstPermanentError != "" {
 			permanentErrorCount++
 			if len(errorDetails) < MaxLoggedErrorDetails {
-				errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
+				errorDetails = append(errorDetails, fileId+": "+firstPermanentError+" (permanent)")
+			}
+		} else if firstRetryableError != "" {
+			retryableErrorCount++
+			f.DeletionRetryQueue.AddOrUpdate(fileId, firstRetryableError)
+			if len(errorDetails) < MaxLoggedErrorDetails {
+				errorDetails = append(errorDetails, fileId+": "+firstRetryableError+" (will retry)")
+			}
+		} else if allSuccessOrNotFound {
+			isPureSuccess := true
+			for _, res := range fileIdResults {
+				if res.Error != "" {
+					isPureSuccess = false
+					break
+				}
+			}
+			if isPureSuccess {
+				successCount++
+			} else {
+				notFoundCount++
 			}
 		}
 	}
@@ -359,11 +400,15 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
 	totalErrors := retryableErrorCount + permanentErrorCount
 	if totalErrors > 0 {
 		logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
-			totalErrors, len(uniqueFileIds), retryableErrorCount, permanentErrorCount)
-		if totalErrors > MaxLoggedErrorDetails {
-			logMessage += fmt.Sprintf(" (showing first %d)", MaxLoggedErrorDetails)
+			totalErrors, len(uniqueFileIdsSlice), retryableErrorCount, permanentErrorCount)
+		if len(errorDetails) > 0 {
+			if totalErrors > MaxLoggedErrorDetails {
+				logMessage += fmt.Sprintf(" (showing first %d)", len(errorDetails))
+			}
+			glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
+		} else {
+			glog.V(0).Info(logMessage)
 		}
-		glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
 	}
 
 	if f.DeletionRetryQueue.Size() > 0 {
diff --git a/weed/filer/filer_deletion_test.go b/weed/filer/filer_deletion_test.go
index 8aa7f87ca..a6b7e8164 100644
--- a/weed/filer/filer_deletion_test.go
+++ b/weed/filer/filer_deletion_test.go
@@ -274,7 +274,17 @@ func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) {
 		t.Fatalf("Expected queue size 1 after first add, got %d", queue.Size())
 	}
 
-	// Add same file ID again - should increment retry count (simulates duplicate)
+	// Get initial retry count
+	queue.lock.Lock()
+	item1, exists := queue.itemIndex["file1"]
+	if !exists {
+		queue.lock.Unlock()
+		t.Fatal("Item not found in queue after first add")
+	}
+	initialRetryCount := item1.RetryCount
+	queue.lock.Unlock()
+
+	// Add same file ID again - should NOT increment retry count (just update error)
 	queue.AddOrUpdate("file1", "timeout error again")
 
 	// Verify still only one item exists in queue (not duplicated)
@@ -282,16 +292,18 @@ func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) {
 		t.Errorf("Expected queue size 1 after duplicate add, got %d (duplicates detected)", queue.Size())
 	}
 
-	// Verify retry count incremented to 2 by checking internal state
-	// Note: This documents the current behavior - AddOrUpdate increments retry count on duplicate
+	// Verify retry count did NOT increment (AddOrUpdate only updates error, not count)
 	queue.lock.Lock()
-	item, exists := queue.itemIndex["file1"]
+	item2, exists := queue.itemIndex["file1"]
 	queue.lock.Unlock()
 	if !exists {
-		t.Fatal("Item not found in queue")
+		t.Fatal("Item not found in queue after second add")
+	}
+	if item2.RetryCount != initialRetryCount {
+		t.Errorf("Expected RetryCount to stay at %d after duplicate add (should not increment), got %d", initialRetryCount, item2.RetryCount)
 	}
-	if item.RetryCount != 2 {
-		t.Errorf("Expected RetryCount 2 after duplicate add (each AddOrUpdate increments), got %d", item.RetryCount)
+	if item2.LastError != "timeout error again" {
+		t.Errorf("Expected LastError to be updated to 'timeout error again', got %q", item2.LastError)
 	}
 }
 
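
Reviewer note: the sketch below is a minimal, self-contained illustration of the retry-count contract this patch settles on for DeletionRetryQueue: AddOrUpdate only records the newest error for a file ID that is already queued, and RetryCount advances only when a retry is actually performed (in RequeueForRetry, per the patched comment). Everything not shown in the diff is assumed for illustration: the map-based toy queue stands in for the real min-heap, RequeueForRetry's signature is hypothetical, calculateBackoff's doubling-with-cap constants are placeholders rather than the values in filer_deletion.go, and the file ID is made up.

package main

import (
	"fmt"
	"time"
)

// Assumed backoff constants for illustration only; the real values live in
// weed/filer/filer_deletion.go and may differ.
const (
	baseDelay = 1 * time.Minute
	maxDelay  = 1 * time.Hour
)

// retryItem mirrors the fields the patch touches (RetryCount, LastError,
// NextRetryAt); the real item also carries heap bookkeeping.
type retryItem struct {
	fileId      string
	retryCount  int
	lastError   string
	nextRetryAt time.Time
}

// calculateBackoff is a stand-in: double the delay per attempt, capped.
func calculateBackoff(retryCount int) time.Duration {
	delay := baseDelay
	for i := 1; i < retryCount; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

// toyQueue replaces the real min-heap with a map; enough to show the contract.
type toyQueue struct {
	items map[string]*retryItem
}

// AddOrUpdate matches the patched behavior: a duplicate add only records the
// newest error and leaves the count and the existing schedule alone.
func (q *toyQueue) AddOrUpdate(fileId, errMsg string) {
	if item, ok := q.items[fileId]; ok {
		item.lastError = errMsg
		return
	}
	q.items[fileId] = &retryItem{
		fileId:      fileId,
		retryCount:  1, // initial count assumed to start at 1
		lastError:   errMsg,
		nextRetryAt: time.Now().Add(calculateBackoff(1)),
	}
}

// RequeueForRetry is the one place the count advances: once per retry actually
// performed, pushing the next attempt out by the backoff. Signature assumed.
func (q *toyQueue) RequeueForRetry(fileId, errMsg string) {
	if item, ok := q.items[fileId]; ok {
		item.retryCount++
		item.lastError = errMsg
		item.nextRetryAt = time.Now().Add(calculateBackoff(item.retryCount))
	}
}

func main() {
	q := &toyQueue{items: map[string]*retryItem{}}
	q.AddOrUpdate("3,01637037d6", "connection refused") // first add: count 1
	q.AddOrUpdate("3,01637037d6", "timeout")            // duplicate: count stays 1
	fmt.Println("after duplicate add:", q.items["3,01637037d6"].retryCount)
	q.RequeueForRetry("3,01637037d6", "timeout") // real retry: count 2
	fmt.Println("after one real retry:", q.items["3,01637037d6"].retryCount)
}

Under these assumptions the behavior the updated test pins down falls out directly: the duplicate AddOrUpdate leaves the count at 1 while updating LastError, and only RequeueForRetry moves it to 2.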