diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 71185d3d1..2ae940a03 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -54,6 +54,7 @@ type Filer struct {
 	RemoteStorage     *FilerRemoteStorage
 	Dlm               *lock_manager.DistributedLockManager
 	MaxFilenameLength uint32
+	deletionQuit      chan struct{}
 }
 
 func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -66,6 +67,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
 		UniqueFilerId:     util.RandomInt32(),
 		Dlm:               lock_manager.NewDistributedLockManager(filerHost),
 		MaxFilenameLength: maxFilenameLength,
+		deletionQuit:      make(chan struct{}),
 	}
 	if f.UniqueFilerId < 0 {
 		f.UniqueFilerId = -f.UniqueFilerId
@@ -379,6 +381,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
 }
 
 func (f *Filer) Shutdown() {
+	close(f.deletionQuit)
 	f.LocalMetaLogBuffer.ShutdownLogBuffer()
 	f.Store.Shutdown()
 }
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 8804b731f..226c85bf4 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -35,14 +35,14 @@ type DeletionRetryItem struct {
 
 // DeletionRetryQueue manages the queue of failed deletions that need to be retried
 type DeletionRetryQueue struct {
-	items     []*DeletionRetryItem
+	items     map[string]*DeletionRetryItem
 	itemsLock sync.RWMutex
 }
 
 // NewDeletionRetryQueue creates a new retry queue
 func NewDeletionRetryQueue() *DeletionRetryQueue {
 	return &DeletionRetryQueue{
-		items: make([]*DeletionRetryItem, 0),
+		items: make(map[string]*DeletionRetryItem),
 	}
 }
 
@@ -52,30 +52,27 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
 	defer q.itemsLock.Unlock()
 
 	// Check if item already exists
-	for _, item := range q.items {
-		if item.FileId == fileId {
-			item.RetryCount++
-			item.LastError = errorMsg
-			// Calculate next retry time with exponential backoff
-			delay := InitialRetryDelay * time.Duration(1<<(item.RetryCount-1))
-			if delay > MaxRetryDelay {
-				delay = MaxRetryDelay
-			}
-			item.NextRetryAt = time.Now().Add(delay)
-			glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
-			return
+	if item, exists := q.items[fileId]; exists {
+		item.RetryCount++
+		item.LastError = errorMsg
+		// Calculate next retry time with exponential backoff
+		delay := InitialRetryDelay * time.Duration(1<<(item.RetryCount-1))
+		if delay > MaxRetryDelay {
+			delay = MaxRetryDelay
 		}
+		item.NextRetryAt = time.Now().Add(delay)
+		glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
+		return
 	}
 
 	// Add new item
 	delay := InitialRetryDelay
-	newItem := &DeletionRetryItem{
+	q.items[fileId] = &DeletionRetryItem{
 		FileId:      fileId,
 		RetryCount:  1,
 		NextRetryAt: time.Now().Add(delay),
 		LastError:   errorMsg,
 	}
-	q.items = append(q.items, newItem)
 	glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
 }
 
@@ -86,23 +83,20 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
 	now := time.Now()
 	var readyItems []*DeletionRetryItem
-	var remainingItems []*DeletionRetryItem
-	for _, item := range q.items {
+	for fileId, item := range q.items {
 		if len(readyItems) < maxItems && item.NextRetryAt.Before(now) {
 			if item.RetryCount < MaxRetryAttempts {
 				readyItems = append(readyItems, item)
+				delete(q.items, fileId)
 			} else {
 				// Max attempts reached, log and discard
 				glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
+				delete(q.items, fileId)
 			}
-		} else {
-			// Keep items that are not ready yet or if the batch is full
-			remainingItems = append(remainingItems, item)
 		}
 	}
-	q.items = remainingItems
 
 	return readyItems
 }
@@ -147,70 +141,72 @@ func (f *Filer) loopProcessingDeletion() {
 	// Start retry processor in a separate goroutine
 	go f.loopProcessingDeletionRetry(lookupFunc, retryQueue)
 
-	var deletionCount int
+	ticker := time.NewTicker(1123 * time.Millisecond)
+	defer ticker.Stop()
+
 	for {
-		deletionCount = 0
-		f.fileIdDeletionQueue.Consume(func(fileIds []string) {
-			for len(fileIds) > 0 {
-				var toDeleteFileIds []string
-				if len(fileIds) > DeletionBatchSize {
-					toDeleteFileIds = fileIds[:DeletionBatchSize]
-					fileIds = fileIds[DeletionBatchSize:]
-				} else {
-					toDeleteFileIds = fileIds
-					fileIds = fileIds[:0]
-				}
-				deletionCount = len(toDeleteFileIds)
-				results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
-
-				// Process individual results for better error tracking
-				var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
-				var errorDetails []string
-
-				for _, result := range results {
-					if result.Error == "" {
-						successCount++
-					} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-						// Already deleted - acceptable
-						notFoundCount++
-					} else if isRetryableError(result.Error) {
-						// Retryable error - add to retry queue
-						retryableErrorCount++
-						retryQueue.AddOrUpdate(result.FileId, result.Error)
-						if retryableErrorCount <= 10 {
-							errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
-						}
-					} else {
-						// Permanent error - log but don't retry
-						permanentErrorCount++
-						if permanentErrorCount <= 10 {
-							errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
-						}
-					}
-				}
-
-				if successCount > 0 || notFoundCount > 0 {
-					glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
-				}
-
-				totalErrors := retryableErrorCount + permanentErrorCount
-				if totalErrors > 0 {
-					logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
-						totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
-					if totalErrors > 10 {
-						logMessage += " (showing first 10)"
-					}
-					glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
-				}
-
-				if retryQueue.Size() > 0 {
-					glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
-				}
-			}
-		})
-
-		if deletionCount == 0 {
-			time.Sleep(1123 * time.Millisecond)
+		select {
+		case <-f.deletionQuit:
+			glog.V(0).Infof("deletion processor shutting down")
+			return
+		case <-ticker.C:
+			f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+				for len(fileIds) > 0 {
+					var toDeleteFileIds []string
+					if len(fileIds) > DeletionBatchSize {
+						toDeleteFileIds = fileIds[:DeletionBatchSize]
+						fileIds = fileIds[DeletionBatchSize:]
+					} else {
+						toDeleteFileIds = fileIds
+						fileIds = fileIds[:0]
+					}
+					results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+
+					// Process individual results for better error tracking
+					var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
+					var errorDetails []string
+
+					for _, result := range results {
+						if result.Error == "" {
+							successCount++
+						} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+							// Already deleted - acceptable
+							notFoundCount++
+						} else if isRetryableError(result.Error) {
+							// Retryable error - add to retry queue
+							retryableErrorCount++
+							retryQueue.AddOrUpdate(result.FileId, result.Error)
+							if len(errorDetails) < 10 {
+								errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
+							}
+						} else {
+							// Permanent error - log but don't retry
+							permanentErrorCount++
+							if len(errorDetails) < 10 {
+								errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
+							}
+						}
+					}
+
+					if successCount > 0 || notFoundCount > 0 {
+						glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
+					}
+
+					totalErrors := retryableErrorCount + permanentErrorCount
+					if totalErrors > 0 {
+						logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
+							totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
+						if totalErrors > 10 {
+							logMessage += " (showing first 10)"
+						}
+						glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
+					}
+
+					if retryQueue.Size() > 0 {
+						glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
+					}
+				}
+			})
 		}
 	}
 }
@@ -240,53 +236,59 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
 	ticker := time.NewTicker(1 * time.Minute)
 	defer ticker.Stop()
 
-	for range ticker.C {
-		// Get items that are ready to retry
-		readyItems := retryQueue.GetReadyItems(1000)
-
-		if len(readyItems) == 0 {
-			continue
-		}
-
-		glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
-
-		// Extract file IDs from retry items
-		var fileIds []string
-		itemsByFileId := make(map[string]*DeletionRetryItem)
-		for _, item := range readyItems {
-			fileIds = append(fileIds, item.FileId)
-			itemsByFileId[item.FileId] = item
-		}
-
-		// Attempt deletion
-		results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
-
-		// Process results
-		var successCount, notFoundCount, retryCount int
-		for _, result := range results {
-			item := itemsByFileId[result.FileId]
-
-			if result.Error == "" {
-				successCount++
-				glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
-			} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-				// Already deleted - success
-				notFoundCount++
-			} else if isRetryableError(result.Error) {
-				// Still failing, add back to retry queue
-				retryCount++
-				retryQueue.AddOrUpdate(result.FileId, result.Error)
-			} else {
-				// Permanent error on retry - give up
-				glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
-			}
-		}
-
-		if successCount > 0 || notFoundCount > 0 {
-			glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
-		}
-		if retryCount > 0 {
-			glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
+	for {
+		select {
+		case <-f.deletionQuit:
+			glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", retryQueue.Size())
+			return
+		case <-ticker.C:
+			// Get items that are ready to retry
+			readyItems := retryQueue.GetReadyItems(1000)
+
+			if len(readyItems) == 0 {
+				continue
+			}
+
+			glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
+
+			// Extract file IDs from retry items
+			var fileIds []string
+			itemsByFileId := make(map[string]*DeletionRetryItem)
+			for _, item := range readyItems {
+				fileIds = append(fileIds, item.FileId)
+				itemsByFileId[item.FileId] = item
+			}
+
+			// Attempt deletion
+			results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
+
+			// Process results
+			var successCount, notFoundCount, retryCount int
+			for _, result := range results {
+				item := itemsByFileId[result.FileId]
+
+				if result.Error == "" {
+					successCount++
+					glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
+				} else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+					// Already deleted - success
+					notFoundCount++
+				} else if isRetryableError(result.Error) {
+					// Still failing, add back to retry queue
+					retryCount++
+					retryQueue.AddOrUpdate(result.FileId, result.Error)
+				} else {
+					// Permanent error on retry - give up
+					glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
+				}
+			}
+
+			if successCount > 0 || notFoundCount > 0 {
+				glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
+			}
+			if retryCount > 0 {
+				glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
+			}
 		}
 	}
 }