
Filer: Add retry mechanism for failed file deletions

Implement a retry queue with exponential backoff for handling transient
deletion failures, particularly when volumes are temporarily read-only.

Key features:
- Automatic retry for retryable errors (read-only volumes, network issues)
- Exponential backoff: 5min → 10min → 20min → ... (max 6 hours)
- Maximum 10 retry attempts per file before giving up
- Separate goroutine processing retry queue every minute
- Enhanced logging with retry/permanent error classification

This addresses the issue where file deletions fail while volumes are
temporarily read-only (tiered volumes, maintenance, etc.). Previously
such deletions were silently lost; now they are queued and retried on
the schedule sketched below.
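
As an illustration of that schedule, a minimal self-contained sketch (not
part of the commit; backoffDelay is a hypothetical helper, but the constant
names and the doubling-with-cap logic mirror the diff below):

package main

import (
	"fmt"
	"time"
)

const (
	MaxRetryAttempts  = 10
	InitialRetryDelay = 5 * time.Minute
	MaxRetryDelay     = 6 * time.Hour
)

// backoffDelay doubles the initial delay once per completed attempt,
// capping the result at MaxRetryDelay.
func backoffDelay(retryCount int) time.Duration {
	delay := InitialRetryDelay * time.Duration(1<<uint(retryCount-1))
	if delay > MaxRetryDelay {
		delay = MaxRetryDelay
	}
	return delay
}

func main() {
	// Prints 5m0s, 10m0s, 20m0s, ..., 5h20m0s, then 6h0m0s for attempts 8-10.
	for attempt := 1; attempt <= MaxRetryAttempts; attempt++ {
		fmt.Printf("attempt %d: next retry in %v\n", attempt, backoffDelay(attempt))
	}
}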

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Dimonyga · branch pull/7402/head · commit c2b7c9a89b

weed/filer/filer_deletion.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"time"

 	"github.com/seaweedfs/seaweedfs/weed/storage"
@@ -15,6 +16,120 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )

+const (
+	// Maximum number of retry attempts for failed deletions
+	MaxRetryAttempts = 10
+	// Initial retry delay (will be doubled with each attempt)
+	InitialRetryDelay = 5 * time.Minute
+	// Maximum retry delay
+	MaxRetryDelay = 6 * time.Hour
+)
+
+// DeletionRetryItem represents a file deletion that failed and needs to be retried
+type DeletionRetryItem struct {
+	FileId      string
+	RetryCount  int
+	NextRetryAt time.Time
+	LastError   string
+}
+
+// DeletionRetryQueue manages the queue of failed deletions that need to be retried
+type DeletionRetryQueue struct {
+	items     []*DeletionRetryItem
+	itemsLock sync.RWMutex
+}
+
+// NewDeletionRetryQueue creates a new retry queue
+func NewDeletionRetryQueue() *DeletionRetryQueue {
+	return &DeletionRetryQueue{
+		items: make([]*DeletionRetryItem, 0),
+	}
+}
+
+// AddOrUpdate adds a new failed deletion or updates an existing one
+func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
+	q.itemsLock.Lock()
+	defer q.itemsLock.Unlock()
+
+	// Check if item already exists
+	for _, item := range q.items {
+		if item.FileId == fileId {
+			item.RetryCount++
+			item.LastError = errorMsg
+			// Calculate next retry time with exponential backoff
+			delay := InitialRetryDelay * time.Duration(1<<uint(item.RetryCount-1))
+			if delay > MaxRetryDelay {
+				delay = MaxRetryDelay
+			}
+			item.NextRetryAt = time.Now().Add(delay)
+			glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
+			return
+		}
+	}

+	// Add new item
+	delay := InitialRetryDelay
+	newItem := &DeletionRetryItem{
+		FileId:      fileId,
+		RetryCount:  1,
+		NextRetryAt: time.Now().Add(delay),
+		LastError:   errorMsg,
+	}
+	q.items = append(q.items, newItem)
+	glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
+}
+// GetReadyItems returns items that are ready to be retried and removes them from the queue
+func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
+	q.itemsLock.Lock()
+	defer q.itemsLock.Unlock()
+
+	now := time.Now()
+	var readyItems []*DeletionRetryItem
+	var remainingItems []*DeletionRetryItem
+	for _, item := range q.items {
+		if item.NextRetryAt.Before(now) && len(readyItems) < maxItems {
+			if item.RetryCount < MaxRetryAttempts {
+				readyItems = append(readyItems, item)
+			} else {
+				// Max attempts reached, log and discard
+				glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
+			}
+		} else {
+			// Keep items that are not ready yet or that did not fit into this batch
+			remainingItems = append(remainingItems, item)
+		}
+	}
+	q.items = remainingItems
+	return readyItems
+}
+
+// Requeue puts an item handed out by GetReadyItems back into the queue,
+// preserving its retry count and applying the next backoff delay
+func (q *DeletionRetryQueue) Requeue(item *DeletionRetryItem, errorMsg string) {
+	q.itemsLock.Lock()
+	defer q.itemsLock.Unlock()
+
+	item.RetryCount++
+	item.LastError = errorMsg
+	delay := InitialRetryDelay * time.Duration(1<<uint(item.RetryCount-1))
+	if delay > MaxRetryDelay {
+		delay = MaxRetryDelay
+	}
+	item.NextRetryAt = time.Now().Add(delay)
+	q.items = append(q.items, item)
+	glog.V(2).Infof("requeued %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay)
+}
+
+// Size returns the current size of the retry queue
+func (q *DeletionRetryQueue) Size() int {
+	q.itemsLock.RLock()
+	defer q.itemsLock.RUnlock()
+	return len(q.items)
+}
+
 func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) {
 	return func(vids []string) (map[string]*operation.LookupResult, error) {
 		m := make(map[string]*operation.LookupResult)
@@ -43,6 +158,12 @@ func (f *Filer) loopProcessingDeletion() {
 	DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.

+	// Create retry queue
+	retryQueue := NewDeletionRetryQueue()
+
+	// Start retry processor in a separate goroutine
+	go f.loopProcessingDeletionRetry(lookupFunc, retryQueue)
+
 	var deletionCount int
 	for {
 		deletionCount = 0
@@ -60,7 +181,7 @@ func (f *Filer) loopProcessingDeletion() {
 				results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)

 				// Process individual results for better error tracking
-				var successCount, notFoundCount, errorCount int
+				var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
 				var errorDetails []string

 				for _, result := range results {
@@ -69,12 +190,18 @@ func (f *Filer) loopProcessingDeletion() {
 					} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
 						// Already deleted - acceptable
 						notFoundCount++
+					} else if isRetryableError(result.Error) {
+						// Retryable error - add to retry queue
+						retryableErrorCount++
+						retryQueue.AddOrUpdate(result.FileId, result.Error)
+						if retryableErrorCount <= 10 {
+							errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
+						}
 					} else {
-						// Actual error
-						errorCount++
-						if errorCount <= 10 {
-							// Only log first 10 errors to avoid flooding logs
-							errorDetails = append(errorDetails, result.FileId+": "+result.Error)
+						// Permanent error - log but don't retry
+						permanentErrorCount++
+						if permanentErrorCount <= 10 {
+							errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
 						}
 					}
 				}
@@ -83,13 +210,19 @@ func (f *Filer) loopProcessingDeletion() {
 					glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
 				}

-				if errorCount > 0 {
-					logMessage := fmt.Sprintf("failed to delete %d/%d files", errorCount, len(toDeleteFileIds))
-					if errorCount > 10 {
+				totalErrors := retryableErrorCount + permanentErrorCount
+				if totalErrors > 0 {
+					logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
+						totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
+					if totalErrors > 10 {
 						logMessage += " (showing first 10)"
 					}
 					glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
 				}
+
+				if retryQueue.Size() > 0 {
+					glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
+				}
 			}
 		})
@@ -99,6 +232,82 @@ func (f *Filer) loopProcessingDeletion() {
 	}
 }

+// isRetryableError determines if an error is retryable
+func isRetryableError(errorMsg string) bool {
+	// Errors that indicate temporary conditions
+	retryablePatterns := []string{
+		"is read only",
+		"error reading from server",
+		"connection reset by peer",
+		"closed network connection",
+	}
+	errorLower := strings.ToLower(errorMsg)
+	for _, pattern := range retryablePatterns {
+		if strings.Contains(errorLower, pattern) {
+			return true
+		}
+	}
+	return false
+}
+
+// loopProcessingDeletionRetry processes the retry queue for failed deletions
+func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		// Get items that are ready to retry
+		readyItems := retryQueue.GetReadyItems(1000)
+		if len(readyItems) == 0 {
+			continue
+		}
+
+		glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
+
+		// Extract file IDs from retry items
+		var fileIds []string
+		itemsByFileId := make(map[string]*DeletionRetryItem)
+		for _, item := range readyItems {
+			fileIds = append(fileIds, item.FileId)
+			itemsByFileId[item.FileId] = item
+		}
+
+		// Attempt deletion
+		results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
+
+		// Process results
+		var successCount, notFoundCount, retryCount int
+		for _, result := range results {
+			item := itemsByFileId[result.FileId]
+			if result.Error == "" {
+				successCount++
+				glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
+			} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+				// Already deleted - success
+				notFoundCount++
+			} else if isRetryableError(result.Error) {
+				// Still failing - requeue with the accumulated retry count
+				retryCount++
+				retryQueue.Requeue(item, result.Error)
+			} else {
+				// Permanent error on retry - give up
+				glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
+			}
+		}
+
+		if successCount > 0 || notFoundCount > 0 {
+			glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
+		}
+		if retryCount > 0 {
+			glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
+		}
+	}
+}
+
 func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
 	f.doDeleteChunks(ctx, chunks)
 }
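
To make the queue's lifecycle concrete, a small hypothetical sketch (not part
of the commit), written as if it lived in the same filer package; the file ID
"3,01637037d6" is made up for illustration:

package filer

import "fmt"

// ExampleDeletionRetryQueue walks one item through the retry lifecycle.
func ExampleDeletionRetryQueue() {
	q := NewDeletionRetryQueue()

	// First failure: queued with RetryCount 1, due again in 5 minutes.
	q.AddOrUpdate("3,01637037d6", "volume 3 is read only")

	// The retry loop polls every minute; GetReadyItems returns nothing
	// until an item's NextRetryAt has passed, then hands back up to
	// maxItems entries (here, none yet).
	for _, item := range q.GetReadyItems(1000) {
		// On another retryable failure, Requeue doubles the delay
		// (5m, 10m, ... capped at 6h) and preserves the retry count
		// so MaxRetryAttempts can eventually trigger.
		q.Requeue(item, "volume 3 is read only")
	}

	fmt.Println(q.Size())
	// Output: 1
}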
