diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 7d03efce5..5c64af83e 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -99,10 +99,18 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
 	if item, exists := q.itemIndex[fileId]; exists {
 		item.RetryCount++
 		item.LastError = errorMsg
-		// Calculate next retry time with exponential backoff
-		delay := InitialRetryDelay * time.Duration(1<<uint(item.RetryCount-1))
-		if delay > MaxRetryDelay {
+		// Calculate next retry time with exponential backoff with overflow protection
+		shiftAmount := uint(item.RetryCount - 1)
+		var delay time.Duration
+		if shiftAmount > 63 {
+			// Prevent overflow: use max delay directly
 			delay = MaxRetryDelay
+		} else {
+			delay = InitialRetryDelay * time.Duration(1<<shiftAmount)
+			if delay > MaxRetryDelay {
+				delay = MaxRetryDelay
+			}
 		}
 		item.NextRetryAt = time.Now().Add(delay)
 		// Re-heapify since NextRetryAt changed
@@ -124,6 +132,39 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
 	glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
 }
 
+// RequeueForRetry re-adds a previously failed item back to the queue with incremented retry count.
+// This method MUST be used when re-queuing items from processRetryBatch to preserve retry state.
+// Time complexity: O(log N) for insertion
+func (q *DeletionRetryQueue) RequeueForRetry(item *DeletionRetryItem, errorMsg string) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Increment retry count
+	item.RetryCount++
+	item.LastError = errorMsg
+
+	// Calculate next retry time with exponential backoff
+	// Check for potential overflow in the shift operation
+	shiftAmount := uint(item.RetryCount - 1)
+	if shiftAmount > 63 {
+		// Prevent overflow: use max delay directly
+		item.NextRetryAt = time.Now().Add(MaxRetryDelay)
+		glog.V(2).Infof("requeued retry for %s: attempt %d (capped at max delay)", item.FileId, item.RetryCount)
+	} else {
+		delay := InitialRetryDelay * time.Duration(1<<shiftAmount)
+		if delay > MaxRetryDelay {
+			delay = MaxRetryDelay
+		}
+		item.NextRetryAt = time.Now().Add(delay)
+		glog.V(2).Infof("requeued retry for %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay)
+	}
+
+	// Re-add to heap and index
+	heap.Push(&q.heap, item)
+	q.itemIndex[item.FileId] = item
+}
+
 // GetReadyItems returns items that are ready to be retried and removes them from the queue
 // Time complexity: O(K log N) where K is the number of ready items
 // Items are processed in order of NextRetryAt (earliest first)
@@ -303,6 +344,18 @@ func isRetryableError(errorMsg string) bool {
 		"error reading from server",  // Network I/O errors
 		"connection reset by peer",   // Network connection issues
 		"closed network connection",  // Network connection closed unexpectedly
+		"connection refused",         // Server temporarily unavailable
+		"timeout",                    // Operation timeout (network or server)
+		"deadline exceeded",          // Context deadline exceeded
+		"context canceled",           // Context cancellation (may be transient)
+		"lookup error",               // Volume lookup failures
+		"lookup failed",              // Volume server discovery issues
+		"too many requests",          // Rate limiting / backpressure
+		"service unavailable",        // HTTP 503 errors
+		"temporarily unavailable",    // Temporary service issues
+		"try again",                  // Explicit retry suggestion
+		"i/o timeout",                // Network I/O timeout
+		"broken pipe",                // Connection broken during operation
 	}
 
 	errorLower := strings.ToLower(errorMsg)
@@ -366,9 +419,9 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu
 			// Already deleted - success
 			notFoundCount++
 		} else if isRetryableError(result.Error) {
-			// Still failing, add back to retry queue
+			// Still failing, re-queue with preserved retry count
 			retryCount++
-			retryQueue.AddOrUpdate(result.FileId, result.Error)
+			retryQueue.RequeueForRetry(item, result.Error)
 		} else {
 			// Permanent error on retry - give up
 			glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
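
For illustration, a standalone sketch of the capped exponential-backoff calculation that AddOrUpdate and RequeueForRetry share in this patch. The `initialRetryDelay`/`maxRetryDelay` values and the `backoffDelay` helper below are placeholder assumptions for the demo, not the constants or functions defined in filer_deletion.go:

```go
package main

import (
	"fmt"
	"time"
)

// Placeholder values for illustration only; the real InitialRetryDelay and
// MaxRetryDelay constants are defined elsewhere in filer_deletion.go.
const (
	initialRetryDelay = 5 * time.Second
	maxRetryDelay     = 30 * time.Minute
)

// backoffDelay mirrors the patch's calculation: guard the shift amount so it
// never exceeds 63 bits, then clamp the resulting delay to the maximum.
func backoffDelay(retryCount int) time.Duration {
	shiftAmount := uint(retryCount - 1)
	if shiftAmount > 63 {
		// A shift of more than 63 bits would overflow a 64-bit duration.
		return maxRetryDelay
	}
	delay := initialRetryDelay * time.Duration(1<<shiftAmount)
	if delay > maxRetryDelay {
		delay = maxRetryDelay
	}
	return delay
}

func main() {
	for _, attempt := range []int{1, 2, 5, 10, 100} {
		fmt.Printf("attempt %3d -> next retry in %v\n", attempt, backoffDelay(attempt))
	}
}
```

With these placeholder constants, the delay doubles each attempt (5s, 10s, 20s, ...) until it hits the 30m cap, and very large retry counts skip the shift entirely instead of overflowing.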