diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index b3a4296ba..e50321817 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/storage"
@@ -15,6 +16,103 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
+const (
+	// Maximum number of retry attempts for failed deletions
+	MaxRetryAttempts = 10
+	// Initial retry delay (will be doubled with each attempt)
+	InitialRetryDelay = 5 * time.Minute
+	// Maximum retry delay
+	MaxRetryDelay = 6 * time.Hour
+)
+
+// DeletionRetryItem represents a file deletion that failed and needs to be retried
+type DeletionRetryItem struct {
+	FileId      string
+	RetryCount  int
+	NextRetryAt time.Time
+	LastError   string
+}
+
+// DeletionRetryQueue manages the queue of failed deletions that need to be retried
+type DeletionRetryQueue struct {
+	items     []*DeletionRetryItem
+	itemsLock sync.RWMutex
+}
+
+// NewDeletionRetryQueue creates a new retry queue
+func NewDeletionRetryQueue() *DeletionRetryQueue {
+	return &DeletionRetryQueue{
+		items: make([]*DeletionRetryItem, 0),
+	}
+}
+
+// AddOrUpdate adds a new failed deletion or updates an existing one
+func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
+	q.itemsLock.Lock()
+	defer q.itemsLock.Unlock()
+
+	// Check if item already exists
+	for _, item := range q.items {
+		if item.FileId == fileId {
+			item.RetryCount++
+			item.LastError = errorMsg
+			// Calculate next retry time with exponential backoff
+			delay := InitialRetryDelay * time.Duration(1<<uint(item.RetryCount-1))
+			if delay > MaxRetryDelay {
+				delay = MaxRetryDelay
+			}
+			item.NextRetryAt = time.Now().Add(delay)
+			glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
+			return
+		}
+	}
+
+	// Add new item
+	delay := InitialRetryDelay
+	newItem := &DeletionRetryItem{
+		FileId:      fileId,
+		RetryCount:  1,
+		NextRetryAt: time.Now().Add(delay),
+		LastError:   errorMsg,
+	}
+	q.items = append(q.items, newItem)
+	glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
+}
+
+// GetReadyItems returns items that are ready to be retried and removes them from the queue
+func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
+	q.itemsLock.Lock()
+	defer q.itemsLock.Unlock()
+
+	now := time.Now()
+	var readyItems []*DeletionRetryItem
+	var remainingItems []*DeletionRetryItem
+
+	for _, item := range q.items {
+		if item.NextRetryAt.Before(now) && len(readyItems) < maxItems {
+			if item.RetryCount < MaxRetryAttempts {
+				readyItems = append(readyItems, item)
+			} else {
+				// Max attempts reached, log and discard
+				glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
+			}
+		} else {
+			// Keep items that are not ready yet, or ready items past the batch limit
+			remainingItems = append(remainingItems, item)
+		}
+	}
+
+	q.items = remainingItems
+	return readyItems
+}
+
+// Size returns the current size of the retry queue
+func (q *DeletionRetryQueue) Size() int {
+	q.itemsLock.RLock()
+	defer q.itemsLock.RUnlock()
+	return len(q.items)
+}
+
 func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) {
 	return func(vids []string) (map[string]*operation.LookupResult, error) {
 		m := make(map[string]*operation.LookupResult)
@@ -43,6 +141,12 @@ func (f *Filer) loopProcessingDeletion() {
 
 	DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
 
+	// Create retry queue
+	retryQueue := NewDeletionRetryQueue()
+
+	// Start retry processor in a separate goroutine
+	go f.loopProcessingDeletionRetry(lookupFunc, retryQueue)
+
 	var deletionCount int
 	for {
 		deletionCount = 0
@@ -60,7 +164,7 @@ func (f *Filer) loopProcessingDeletion() {
 				results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
 
 				// Process individual results for better error tracking
-				var successCount, notFoundCount, errorCount int
+				var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
 				var errorDetails []string
 
 				for _, result := range results {
@@ -69,12 +173,18 @@ func (f *Filer) loopProcessingDeletion() {
 					} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
 						// Already deleted - acceptable
 						notFoundCount++
+					} else if isRetryableError(result.Error) {
+						// Retryable error - add to retry queue
+						retryableErrorCount++
+						retryQueue.AddOrUpdate(result.FileId, result.Error)
+						if retryableErrorCount <= 10 {
+							errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
+						}
 					} else {
-						// Actual error
-						errorCount++
-						if errorCount <= 10 {
-							// Only log first 10 errors to avoid flooding logs
-							errorDetails = append(errorDetails, result.FileId+": "+result.Error)
+						// Permanent error - log but don't retry
+						permanentErrorCount++
+						if permanentErrorCount <= 10 {
+							errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
 						}
 					}
 				}
@@ -83,13 +193,19 @@ func (f *Filer) loopProcessingDeletion() {
 				if successCount > 0 || notFoundCount > 0 {
 					glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
 				}
 
-				if errorCount > 0 {
-					logMessage := fmt.Sprintf("failed to delete %d/%d files", errorCount, len(toDeleteFileIds))
-					if errorCount > 10 {
+				totalErrors := retryableErrorCount + permanentErrorCount
+				if totalErrors > 0 {
+					logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
+						totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
+					if totalErrors > 10 {
 						logMessage += " (showing first 10)"
 					}
 					glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
 				}
+
+				if retryQueue.Size() > 0 {
+					glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
+				}
 			}
 		})
 
@@ -99,6 +215,82 @@ func (f *Filer) loopProcessingDeletion() {
 	}
 }
 
+// isRetryableError determines if an error is retryable
+func isRetryableError(errorMsg string) bool {
+	// Errors that indicate temporary conditions
+	retryablePatterns := []string{
+		"is read only",
+		"error reading from server",
+		"connection reset by peer",
+		"closed network connection",
+	}
+
+	errorLower := strings.ToLower(errorMsg)
+	for _, pattern := range retryablePatterns {
+		if strings.Contains(errorLower, pattern) {
+			return true
+		}
+	}
+	return false
+}
+
+// loopProcessingDeletionRetry processes the retry queue for failed deletions
+func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		// Get items that are ready to retry
+		readyItems := retryQueue.GetReadyItems(1000)
+
+		if len(readyItems) == 0 {
+			continue
+		}
+
+		glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
+
+		// Extract file IDs from retry items
+		var fileIds []string
+		itemsByFileId := make(map[string]*DeletionRetryItem)
+		for _, item := range readyItems {
+			fileIds = append(fileIds, item.FileId)
+			itemsByFileId[item.FileId] = item
+		}
+
+		// Attempt deletion
+		results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
+
+		// Process results
+		var successCount, notFoundCount, retryCount int
+		for _, result := range results {
+			item := itemsByFileId[result.FileId]
+
+			if result.Error == "" {
+				successCount++
+				glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
+			} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+				// Already deleted - success
+				notFoundCount++
+			} else if isRetryableError(result.Error) {
+				// Still failing, add back to retry queue
+				retryCount++
+				retryQueue.AddOrUpdate(result.FileId, result.Error)
+			} else {
+				// Permanent error on retry - give up
+				glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
+			}
+		}
+
+		if successCount > 0 || notFoundCount > 0 {
+			glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
+		}
+		if retryCount > 0 {
+			glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
+		}
+	}
+}
+
 func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
 	f.doDeleteChunks(ctx, chunks)
 }
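
Note on the backoff: `AddOrUpdate` doubles the delay from 5 minutes and hits the 6-hour cap from the 8th attempt on. A minimal standalone sketch of the resulting schedule, assuming only the constants from this diff (the `main` package wrapper is illustrative):

```go
package main

import (
	"fmt"
	"time"
)

const (
	initialRetryDelay = 5 * time.Minute // mirrors InitialRetryDelay in the diff
	maxRetryDelay     = 6 * time.Hour   // mirrors MaxRetryDelay
	maxRetryAttempts  = 10              // mirrors MaxRetryAttempts
)

func main() {
	// delay = InitialRetryDelay * 2^(RetryCount-1), capped at MaxRetryDelay,
	// as computed in DeletionRetryQueue.AddOrUpdate.
	for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
		delay := initialRetryDelay * time.Duration(1<<uint(attempt-1))
		if delay > maxRetryDelay {
			delay = maxRetryDelay
		}
		fmt.Printf("attempt %2d: next retry in %v\n", attempt, delay)
	}
}
```

Attempts 1 through 7 wait 5m up to 5h20m, attempts 8 through 10 wait the capped 6h, and `GetReadyItems` then discards the item once `RetryCount` reaches `MaxRetryAttempts`, so a persistently failing file id is retried for a bit over a day before being dropped.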
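A sketch of a unit test for the trimming semantics of `GetReadyItems` (ready items beyond `maxItems` must stay queued for the next tick rather than fall out of the queue); the test name and file ids are made up, and it assumes a test file next to `filer_deletion.go` in the `filer` package:

```go
package filer

import (
	"testing"
	"time"
)

func TestGetReadyItemsKeepsOverflow(t *testing.T) {
	q := NewDeletionRetryQueue()
	q.AddOrUpdate("1,01637037d6", "volume 1 is read only") // hypothetical file ids
	q.AddOrUpdate("2,02637037d7", "volume 2 is read only")

	// Backdate NextRetryAt so both items are immediately ready.
	for _, item := range q.items {
		item.NextRetryAt = time.Now().Add(-time.Second)
	}

	// With maxItems=1, one item is returned and the other must remain queued.
	if got := len(q.GetReadyItems(1)); got != 1 {
		t.Fatalf("ready items: got %d, want 1", got)
	}
	if q.Size() != 1 {
		t.Fatalf("queue size after GetReadyItems: got %d, want 1", q.Size())
	}
}
```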