diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 50617d80a..795a0f9b2 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -62,7 +62,8 @@ type DeletionRetryItem struct {
 	RetryCount  int
 	NextRetryAt time.Time
 	LastError   string
-	heapIndex   int // index in the heap (for heap.Interface)
+	heapIndex   int  // index in the heap (for heap.Interface)
+	inFlight    bool // true when item is being processed, prevents duplicate additions
 }
 
 // retryHeap implements heap.Interface for DeletionRetryItem
@@ -162,13 +163,17 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	// Check if item already exists
+	// Check if item already exists (including in-flight items)
 	if item, exists := q.itemIndex[fileId]; exists {
-		// Item is already in the queue. Just update the error.
+		// Item is already in the queue or being processed. Just update the error.
 		// The existing retry schedule should proceed.
 		// RetryCount is only incremented in RequeueForRetry when an actual retry is performed.
 		item.LastError = errorMsg
-		glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
+		if item.inFlight {
+			glog.V(2).Infof("retry for %s in-flight: attempt %d, will preserve retry state", fileId, item.RetryCount)
+		} else {
+			glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
+		}
 		return
 	}
 
@@ -179,6 +184,7 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
 		RetryCount:  1,
 		NextRetryAt: time.Now().Add(delay),
 		LastError:   errorMsg,
+		inFlight:    false,
 	}
 	heap.Push(&q.heap, item)
 	q.itemIndex[fileId] = item
@@ -199,14 +205,14 @@ func (q *DeletionRetryQueue) RequeueForRetry(item *DeletionRetryItem, errorMsg s
 	// Calculate next retry time with exponential backoff
 	delay := calculateBackoff(item.RetryCount)
 	item.NextRetryAt = time.Now().Add(delay)
+	item.inFlight = false // Clear in-flight flag
 	glog.V(2).Infof("requeued retry for %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay)
 
-	// Re-add to heap and index
+	// Re-add to heap (item still in itemIndex)
 	heap.Push(&q.heap, item)
-	q.itemIndex[item.FileId] = item
 }
 
-// GetReadyItems returns items that are ready to be retried and removes them from the queue
+// GetReadyItems returns items that are ready to be retried and marks them as in-flight
 // Time complexity: O(K log N) where K is the number of ready items
 // Items are processed in order of NextRetryAt (earliest first)
 func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
@@ -225,14 +231,15 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
 			break
 		}
 
-		// Remove from heap and index
+		// Remove from heap but keep in itemIndex with inFlight flag
 		heap.Pop(&q.heap)
-		delete(q.itemIndex, item.FileId)
 
 		if item.RetryCount <= MaxRetryAttempts {
+			item.inFlight = true // Mark as being processed
 			readyItems = append(readyItems, item)
 		} else {
-			// Max attempts reached, log and discard
+			// Max attempts reached, log and discard completely
+			delete(q.itemIndex, item.FileId)
 			glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
 		}
 	}
@@ -240,6 +247,16 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
 	return readyItems
 }
 
+// Remove removes an item from the queue (called when deletion succeeds or fails permanently)
+// Time complexity: O(1)
+func (q *DeletionRetryQueue) Remove(item *DeletionRetryItem) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Item was already removed from heap by GetReadyItems, just remove from index
+	delete(q.itemIndex, item.FileId)
+}
+
 // Size returns the current size of the retry queue
 func (q *DeletionRetryQueue) Size() int {
 	q.lock.Lock()
@@ -335,60 +352,23 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
 	var errorDetails []string
 
 	for _, fileId := range uniqueFileIdsSlice {
-		fileIdResults, found := resultsByFileId[fileId]
-		if !found {
-			// This can happen if lookup fails for the volume.
-			// To be safe, we treat it as a retryable error.
+		outcome := classifyDeletionOutcome(fileId, resultsByFileId)
+
+		switch outcome.status {
+		case "success":
+			successCount++
+		case "not_found":
+			notFoundCount++
+		case "retryable", "no_result":
 			retryableErrorCount++
-			f.DeletionRetryQueue.AddOrUpdate(fileId, "no deletion result from volume server")
+			f.DeletionRetryQueue.AddOrUpdate(fileId, outcome.errorMsg)
 			if len(errorDetails) < MaxLoggedErrorDetails {
-				errorDetails = append(errorDetails, fileId+": no deletion result (will retry)")
-			}
-			continue
-		}
-
-		var firstRetryableError, firstPermanentError string
-		allSuccessOrNotFound := true
-
-		for _, res := range fileIdResults {
-			if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
-				allSuccessOrNotFound = false
-				if isRetryableError(res.Error) {
-					if firstRetryableError == "" {
-						firstRetryableError = res.Error
-					}
-				} else {
-					if firstPermanentError == "" {
-						firstPermanentError = res.Error
-					}
-				}
+				errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (will retry)")
 			}
-		}
-
-		// Determine overall outcome: permanent errors take precedence, then retryable errors
-		if firstPermanentError != "" {
+		case "permanent":
 			permanentErrorCount++
 			if len(errorDetails) < MaxLoggedErrorDetails {
-				errorDetails = append(errorDetails, fileId+": "+firstPermanentError+" (permanent)")
-			}
-		} else if firstRetryableError != "" {
-			retryableErrorCount++
-			f.DeletionRetryQueue.AddOrUpdate(fileId, firstRetryableError)
-			if len(errorDetails) < MaxLoggedErrorDetails {
-				errorDetails = append(errorDetails, fileId+": "+firstRetryableError+" (will retry)")
-			}
-		} else if allSuccessOrNotFound {
-			isPureSuccess := true
-			for _, res := range fileIdResults {
-				if res.Error != "" {
-					isPureSuccess = false
-					break
-				}
-			}
-			if isPureSuccess {
-				successCount++
-			} else {
-				notFoundCount++
+				errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (permanent)")
 			}
 		}
 	}
@@ -416,6 +396,64 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
 	}
 }
 
+// deletionOutcome represents the result of classifying deletion results for a file
+type deletionOutcome struct {
+	status   string // "success", "not_found", "retryable", "permanent", "no_result"
+	errorMsg string
+}
+
+// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome
+func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome {
+	fileIdResults, found := resultsByFileId[fileId]
+	if !found {
+		return deletionOutcome{
+			status:   "no_result",
+			errorMsg: "no deletion result from volume server",
+		}
+	}
+
+	var firstRetryableError, firstPermanentError string
+	allSuccessOrNotFound := true
+
+	for _, res := range fileIdResults {
+		if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
+			allSuccessOrNotFound = false
+			if isRetryableError(res.Error) {
+				if firstRetryableError == "" {
+					firstRetryableError = res.Error
+				}
+			} else {
+				if firstPermanentError == "" {
+					firstPermanentError = res.Error
+				}
+			}
+		}
+	}
+
+	// Determine overall outcome: permanent errors take precedence, then retryable errors
+	if firstPermanentError != "" {
+		return deletionOutcome{status: "permanent", errorMsg: firstPermanentError}
+	} else if firstRetryableError != "" {
+		return deletionOutcome{status: "retryable", errorMsg: firstRetryableError}
+	} else if allSuccessOrNotFound {
+		// Check if it's pure success or "not found"
+		isPureSuccess := true
+		for _, res := range fileIdResults {
+			if res.Error != "" {
+				isPureSuccess = false
+				break
+			}
+		}
+		if isPureSuccess {
+			return deletionOutcome{status: "success", errorMsg: ""}
+		}
+		return deletionOutcome{status: "not_found", errorMsg: ""}
+	}
+
+	// Shouldn't reach here, but return retryable as safe default
+	return deletionOutcome{status: "retryable", errorMsg: "unknown error"}
+}
+
 // isRetryableError determines if an error is retryable based on its message.
 //
 // Current implementation uses string matching which is brittle and may break
@@ -498,58 +536,26 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu
 	// Process results - iterate over readyItems to ensure all items are accounted for
 	var successCount, notFoundCount, retryCount, permanentErrorCount int
 	for _, item := range readyItems {
-		fileIdResults, found := resultsByFileId[item.FileId]
-		if !found {
-			// No result returned for this file ID - could indicate a bug or edge case.
-			// Re-queue to avoid silent data loss.
-			glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
-			f.DeletionRetryQueue.RequeueForRetry(item, "no deletion result from volume server")
+		outcome := classifyDeletionOutcome(item.FileId, resultsByFileId)
+
+		switch outcome.status {
+		case "success":
+			successCount++
+			f.DeletionRetryQueue.Remove(item) // Remove from queue (success)
+			glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
+		case "not_found":
+			notFoundCount++
+			f.DeletionRetryQueue.Remove(item) // Remove from queue (already deleted)
+		case "retryable", "no_result":
 			retryCount++
-			continue
-		}
-
-		// Aggregate results for all replicas
-		var firstRetryableError string
-		var firstPermanentError string
-		allSuccessOrNotFound := true
-
-		for _, res := range fileIdResults {
-			if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
-				allSuccessOrNotFound = false
-				if isRetryableError(res.Error) {
-					if firstRetryableError == "" {
-						firstRetryableError = res.Error
-					}
-				} else {
-					if firstPermanentError == "" {
-						firstPermanentError = res.Error
-					}
-				}
+			if outcome.status == "no_result" {
+				glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
 			}
-		}
-
-		// Determine overall outcome: permanent errors take precedence, then retryable errors
-		if firstPermanentError != "" {
+			f.DeletionRetryQueue.RequeueForRetry(item, outcome.errorMsg)
+		case "permanent":
 			permanentErrorCount++
-			glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, firstPermanentError)
-		} else if firstRetryableError != "" {
-			retryCount++
-			f.DeletionRetryQueue.RequeueForRetry(item, firstRetryableError)
-		} else if allSuccessOrNotFound {
-			// Check if it's pure success or "not found"
-			isPureSuccess := true
-			for _, res := range fileIdResults {
-				if res.Error != "" {
-					isPureSuccess = false
-					break
-				}
-			}
-			if isPureSuccess {
-				successCount++
-				glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
-			} else {
-				notFoundCount++
-			}
+			f.DeletionRetryQueue.Remove(item) // Remove from queue (permanent failure)
+			glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, outcome.errorMsg)
 		}
 	}