From 00db26af68db34328af8d9dbae775b85ad999690 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 29 Oct 2025 15:00:39 -0700
Subject: [PATCH] address comments

---
 weed/filer/filer.go               |  2 +
 weed/filer/filer_deletion.go      | 91 ++++++++++++++++++++-----------
 weed/filer/filer_deletion_test.go | 53 ++++++++++++------
 3 files changed, 97 insertions(+), 49 deletions(-)

diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 2ae940a03..b86ac3c5b 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -55,6 +55,7 @@ type Filer struct {
     Dlm                *lock_manager.DistributedLockManager
     MaxFilenameLength  uint32
     deletionQuit       chan struct{}
+    DeletionRetryQueue *DeletionRetryQueue
 }
 
 func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -68,6 +69,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
         Dlm:                lock_manager.NewDistributedLockManager(filerHost),
         MaxFilenameLength:  maxFilenameLength,
         deletionQuit:       make(chan struct{}),
+        DeletionRetryQueue: NewDeletionRetryQueue(),
     }
     if f.UniqueFilerId < 0 {
         f.UniqueFilerId = -f.UniqueFilerId
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 1fb071000..2ecc4a335 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -231,7 +231,7 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
         heap.Pop(&q.heap)
         delete(q.itemIndex, item.FileId)
 
-        if item.RetryCount < MaxRetryAttempts {
+        if item.RetryCount <= MaxRetryAttempts {
             readyItems = append(readyItems, item)
         } else {
             // Max attempts reached, log and discard
@@ -277,11 +277,8 @@ func (f *Filer) loopProcessingDeletion() {
 
     DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
 
-    // Create retry queue
-    retryQueue := NewDeletionRetryQueue()
-
     // Start retry processor in a separate goroutine
-    go f.loopProcessingDeletionRetry(lookupFunc, retryQueue)
+    go f.loopProcessingDeletionRetry(lookupFunc)
 
     ticker := time.NewTicker(DeletionPollInterval)
     defer ticker.Stop()
@@ -302,7 +299,7 @@ func (f *Filer) loopProcessingDeletion() {
                 toDeleteFileIds = fileIds
                 fileIds = fileIds[:0]
             }
-            f.processDeletionBatch(toDeleteFileIds, lookupFunc, retryQueue)
+            f.processDeletionBatch(toDeleteFileIds, lookupFunc)
         }
     })
 }
@@ -312,7 +309,7 @@ func (f *Filer) loopProcessingDeletion() {
 // processDeletionBatch handles deletion of a batch of file IDs and processes results.
 // It classifies errors into retryable and permanent categories, adds retryable failures
 // to the retry queue, and logs appropriate messages.
-func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
     results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
 
     // Process individual results for better error tracking
@@ -328,7 +325,7 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
         } else if isRetryableError(result.Error) {
             // Retryable error - add to retry queue
             retryableErrorCount++
-            retryQueue.AddOrUpdate(result.FileId, result.Error)
+            f.DeletionRetryQueue.AddOrUpdate(result.FileId, result.Error)
             if len(errorDetails) < MaxLoggedErrorDetails {
                 errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
             }
@@ -355,8 +352,8 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
         glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
     }
 
-    if retryQueue.Size() > 0 {
-        glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
+    if f.DeletionRetryQueue.Size() > 0 {
+        glog.V(2).Infof("retry queue size: %d", f.DeletionRetryQueue.Size())
     }
 }
 
@@ -390,7 +387,7 @@ func isRetryableError(errorMsg string) bool {
 }
 
 // loopProcessingDeletionRetry processes the retry queue for failed deletions
-func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
     ticker := time.NewTicker(DeletionRetryPollInterval)
     defer ticker.Stop()
 
@@ -398,18 +395,18 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
     for {
         select {
         case <-f.deletionQuit:
-            glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", retryQueue.Size())
+            glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", f.DeletionRetryQueue.Size())
             return
         case <-ticker.C:
             // Get items that are ready to retry
-            readyItems := retryQueue.GetReadyItems(DeletionRetryBatchSize)
+            readyItems := f.DeletionRetryQueue.GetReadyItems(DeletionRetryBatchSize)
             if len(readyItems) == 0 {
                 continue
             }
 
             glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
 
-            f.processRetryBatch(readyItems, lookupFunc, retryQueue)
+            f.processRetryBatch(readyItems, lookupFunc)
         }
     }
 }
@@ -417,7 +414,7 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
 // processRetryBatch attempts to retry deletion of files and processes results.
 // Successfully deleted items are removed from tracking, retryable failures are
 // re-queued with updated retry counts, and permanent errors are logged and discarded.
-func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
     // Extract file IDs from retry items
     fileIds := make([]string, 0, len(readyItems))
     for _, item := range readyItems {
@@ -427,39 +424,67 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu
     // Attempt deletion
     results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
 
-    // Create a map for quick lookup of results
-    resultsByFileId := make(map[string]*volume_server_pb.DeleteResult, len(results))
+    // Group results by file ID to handle multiple results for replicated volumes
+    resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult)
     for _, result := range results {
-        resultsByFileId[result.FileId] = result
+        resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result)
     }
 
     // Process results - iterate over readyItems to ensure all items are accounted for
     var successCount, notFoundCount, retryCount, permanentErrorCount int
 
     for _, item := range readyItems {
-        result, found := resultsByFileId[item.FileId]
+        fileIdResults, found := resultsByFileId[item.FileId]
         if !found {
            // No result returned for this file ID - could indicate a bug or edge case.
            // Re-queue to avoid silent data loss.
            glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
-           retryQueue.RequeueForRetry(item, "no deletion result from volume server")
+           f.DeletionRetryQueue.RequeueForRetry(item, "no deletion result from volume server")
            retryCount++
            continue
         }
 
-        if result.Error == "" {
-            successCount++
-            glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
-        } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-            // Already deleted - success
-            notFoundCount++
-        } else if isRetryableError(result.Error) {
-            // Still failing, re-queue with preserved retry count
-            retryCount++
-            retryQueue.RequeueForRetry(item, result.Error)
-        } else {
-            // Permanent error on retry - give up
+        // Aggregate results for all replicas
+        var firstRetryableError string
+        var firstPermanentError string
+        allSuccessOrNotFound := true
+
+        for _, res := range fileIdResults {
+            if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
+                allSuccessOrNotFound = false
+                if isRetryableError(res.Error) {
+                    if firstRetryableError == "" {
+                        firstRetryableError = res.Error
+                    }
+                } else {
+                    if firstPermanentError == "" {
+                        firstPermanentError = res.Error
+                    }
+                }
+            }
+        }
+
+        // Determine overall outcome: permanent errors take precedence, then retryable errors
+        if firstPermanentError != "" {
             permanentErrorCount++
-            glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
+            glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, firstPermanentError)
+        } else if firstRetryableError != "" {
+            retryCount++
+            f.DeletionRetryQueue.RequeueForRetry(item, firstRetryableError)
+        } else if allSuccessOrNotFound {
+            // Check if it's pure success or "not found"
+            isPureSuccess := true
+            for _, res := range fileIdResults {
+                if res.Error != "" {
+                    isPureSuccess = false
+                    break
+                }
+            }
+            if isPureSuccess {
+                successCount++
+                glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
+            } else {
+                notFoundCount++
+            }
         }
     }
diff --git a/weed/filer/filer_deletion_test.go b/weed/filer/filer_deletion_test.go
index 5b5c7bc3e..3c91ebf16 100644
--- a/weed/filer/filer_deletion_test.go
+++ b/weed/filer/filer_deletion_test.go
@@ -104,29 +104,50 @@ func TestDeletionRetryQueue_OverflowProtection(t *testing.T) {
 func TestDeletionRetryQueue_MaxAttemptsReached(t *testing.T) {
     queue := NewDeletionRetryQueue()
 
-    // Add item and set retry count near max
+    // Add item
     queue.AddOrUpdate("file1", "error")
 
-    // Manually set high retry count
+    // Manually set retry count to max
     queue.lock.Lock()
-    if item, exists := queue.itemIndex["file1"]; exists {
-        item.RetryCount = MaxRetryAttempts
-        item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now
+    item, exists := queue.itemIndex["file1"]
+    if !exists {
         queue.lock.Unlock()
+        t.Fatal("Item not found in queue")
+    }
+    item.RetryCount = MaxRetryAttempts
+    item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now
+    heap.Fix(&queue.heap, item.heapIndex)
+    queue.lock.Unlock()
 
-        // Try to get ready items - should be discarded
-        readyItems := queue.GetReadyItems(10)
-        if len(readyItems) != 0 {
-            t.Errorf("Expected 0 items (max attempts reached), got %d", len(readyItems))
-        }
+    // Try to get ready items - should be returned for the last retry (attempt #10)
+    readyItems := queue.GetReadyItems(10)
+    if len(readyItems) != 1 {
+        t.Fatalf("Expected 1 item for last retry, got %d", len(readyItems))
+    }
 
-        // Should be removed from queue
-        if queue.Size() != 0 {
-            t.Errorf("Expected queue size 0 after max attempts, got %d", queue.Size())
-        }
-    } else {
+    // Requeue it, which will increment its retry count beyond the max
+    queue.RequeueForRetry(readyItems[0], "final error")
+
+    // Manually make it ready again
+    queue.lock.Lock()
+    item, exists = queue.itemIndex["file1"]
+    if !exists {
         queue.lock.Unlock()
-        t.Fatal("Item not found in queue")
+        t.Fatal("Item not found in queue after requeue")
+    }
+    item.NextRetryAt = time.Now().Add(-1 * time.Second)
+    heap.Fix(&queue.heap, item.heapIndex)
+    queue.lock.Unlock()
+
+    // Now it should be discarded (retry count is 11, exceeds max of 10)
+    readyItems = queue.GetReadyItems(10)
+    if len(readyItems) != 0 {
+        t.Errorf("Expected 0 items (max attempts exceeded), got %d", len(readyItems))
+    }
+
+    // Should be removed from queue
+    if queue.Size() != 0 {
+        t.Errorf("Expected queue size 0 after max attempts exceeded, got %d", queue.Size())
     }
 }
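
Illustrative sketch (not part of the patch): the behavioral core of the reworked
processRetryBatch is that DeleteResults are now grouped per file id, since a
replicated volume can return several results for one file, and each group is then
folded into a single outcome with the precedence permanent > retryable >
not-found > success. The stand-alone Go program below sketches only that folding
rule under simplified assumptions: DeleteResult stands in for
volume_server_pb.DeleteResult, isRetryable for the filer's isRetryableError, the
"already deleted" check is reduced to the literal "not found", and classify and
outcome are hypothetical names used only here.

    // aggregation_sketch.go - illustrative only, not part of the patch.
    package main

    import "fmt"

    // DeleteResult is a simplified stand-in for volume_server_pb.DeleteResult.
    type DeleteResult struct {
        FileId string
        Error  string
    }

    type outcome int

    const (
        outcomeSuccess   outcome = iota // every replica deleted cleanly
        outcomeNotFound                 // deleted, but some replica reported "not found"
        outcomeRetry                    // at least one retryable error and no permanent error
        outcomePermanent                // at least one permanent error: stop retrying
    )

    // classify folds per-replica results into one outcome, mirroring the
    // precedence used in processRetryBatch: permanent > retryable > not-found > success.
    func classify(results []DeleteResult, isRetryable func(string) bool) outcome {
        sawNotFound, sawRetryable, sawPermanent := false, false, false
        for _, r := range results {
            switch {
            case r.Error == "":
                // this replica deleted cleanly
            case r.Error == "not found":
                sawNotFound = true
            case isRetryable(r.Error):
                sawRetryable = true
            default:
                sawPermanent = true
            }
        }
        switch {
        case sawPermanent:
            return outcomePermanent
        case sawRetryable:
            return outcomeRetry
        case sawNotFound:
            return outcomeNotFound
        default:
            return outcomeSuccess
        }
    }

    func main() {
        isRetryable := func(msg string) bool { return msg == "connection timeout" }
        replicas := []DeleteResult{
            {FileId: "3,0144bd6569", Error: ""},                   // one replica deleted
            {FileId: "3,0144bd6569", Error: "connection timeout"}, // one replica timed out
        }
        // No permanent error and one retryable error: the item would be re-queued.
        fmt.Println(classify(replicas, isRetryable) == outcomeRetry) // true
    }

Folding permanent errors first reflects the same choice the patch makes: as soon
as any replica reports a non-retryable failure, the item is dropped from the
retry queue rather than retried indefinitely.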