
dedup

Branch: pull/7402/head
Author: chrislu · 1 month ago
Commit: eb003ffd25

Changed files:
  weed/filer/filer_deletion.go (18 changed lines)
  weed/filer/filer_deletion_test.go (33 changed lines)

weed/filer/filer_deletion.go (18 changed lines)

@@ -310,7 +310,21 @@ func (f *Filer) loopProcessingDeletion() {
 // It classifies errors into retryable and permanent categories, adds retryable failures
 // to the retry queue, and logs appropriate messages.
 func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
-	results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+	// Deduplicate file IDs to prevent incorrect retry count increments for the same file ID within a single batch.
+	uniqueFileIds := make([]string, 0, len(toDeleteFileIds))
+	processed := make(map[string]struct{}, len(toDeleteFileIds))
+	for _, fileId := range toDeleteFileIds {
+		if _, found := processed[fileId]; !found {
+			processed[fileId] = struct{}{}
+			uniqueFileIds = append(uniqueFileIds, fileId)
+		}
+	}
+	if len(uniqueFileIds) == 0 {
+		return
+	}
+	results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, uniqueFileIds, lookupFunc)
 	// Process individual results for better error tracking
 	var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
@@ -345,7 +359,7 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
 	totalErrors := retryableErrorCount + permanentErrorCount
 	if totalErrors > 0 {
 		logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
-			totalErrors, len(toDeleteFileIds), retryableErrorCount, permanentErrorCount)
+			totalErrors, len(uniqueFileIds), retryableErrorCount, permanentErrorCount)
 		if totalErrors > MaxLoggedErrorDetails {
 			logMessage += fmt.Sprintf(" (showing first %d)", MaxLoggedErrorDetails)
 		}
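
Note: the added guard is a first-seen filter that preserves batch order, so retry bookkeeping sees each file ID at most once per batch. A minimal standalone sketch of the same pattern (the dedupPreservingOrder name and the sample file IDs below are illustrative, not from this patch):

package main

import "fmt"

// dedupPreservingOrder returns the unique values of ids in first-seen order.
// This mirrors the guard added to processDeletionBatch: without it, a file ID
// appearing twice in one batch would be reported to the retry queue twice,
// and its retry count would be bumped once per duplicate.
func dedupPreservingOrder(ids []string) []string {
	unique := make([]string, 0, len(ids))
	seen := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		if _, found := seen[id]; !found {
			seen[id] = struct{}{}
			unique = append(unique, id)
		}
	}
	return unique
}

func main() {
	// Hypothetical file IDs for illustration only.
	batch := []string{"3,01637037d6", "3,01637037d6", "4,02a7b9c1f2"}
	fmt.Println(dedupPreservingOrder(batch)) // [3,01637037d6 4,02a7b9c1f2]
}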

weed/filer/filer_deletion_test.go (33 changed lines)

@@ -262,3 +262,36 @@ func TestDeletionRetryQueue_HeapOrdering(t *testing.T) {
 		}
 	}
 }
+
+func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) {
+	queue := NewDeletionRetryQueue()
+
+	// Add same file ID twice with retryable error - simulates duplicate in batch
+	queue.AddOrUpdate("file1", "timeout error")
+
+	// Verify only one item exists in queue
+	if queue.Size() != 1 {
+		t.Fatalf("Expected queue size 1 after first add, got %d", queue.Size())
+	}
+
+	// Add same file ID again - should increment retry count (simulates duplicate)
+	queue.AddOrUpdate("file1", "timeout error again")
+
+	// Verify still only one item exists in queue (not duplicated)
+	if queue.Size() != 1 {
+		t.Errorf("Expected queue size 1 after duplicate add, got %d (duplicates detected)", queue.Size())
+	}
+
+	// Verify retry count incremented to 2 by checking internal state
+	// Note: This documents the current behavior - AddOrUpdate increments retry count on duplicate
+	queue.lock.Lock()
+	item, exists := queue.itemIndex["file1"]
+	queue.lock.Unlock()
+
+	if !exists {
+		t.Fatal("Item not found in queue")
+	}
+	if item.RetryCount != 2 {
+		t.Errorf("Expected RetryCount 2 after duplicate add (each AddOrUpdate increments), got %d", item.RetryCount)
+	}
+}
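
For readers without the rest of the file: the test reaches into queue.lock and queue.itemIndex, so AddOrUpdate is keyed by file ID. A minimal sketch consistent with what the test exercises - every identifier below that is not visible in the test (DeletionRetryItem, LastError, the sync.Mutex type of lock) is an assumption, and the real queue also maintains a retry-time heap per TestDeletionRetryQueue_HeapOrdering:

package filer

import "sync"

// DeletionRetryItem is a sketch of the per-file retry record; only RetryCount
// is visible in the test above, the other fields are illustrative.
type DeletionRetryItem struct {
	FileId     string
	RetryCount int
	LastError  string
}

// DeletionRetryQueue sketch: itemIndex keys pending retries by file ID, so a
// duplicate AddOrUpdate updates the existing record instead of enqueueing a
// second one.
type DeletionRetryQueue struct {
	lock      sync.Mutex
	itemIndex map[string]*DeletionRetryItem
}

func NewDeletionRetryQueue() *DeletionRetryQueue {
	return &DeletionRetryQueue{itemIndex: make(map[string]*DeletionRetryItem)}
}

// AddOrUpdate inserts a new item with RetryCount 1, or bumps the count of an
// existing one - the behavior the test documents.
func (q *DeletionRetryQueue) AddOrUpdate(fileId, errMsg string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if item, found := q.itemIndex[fileId]; found {
		item.RetryCount++
		item.LastError = errMsg
		return
	}
	q.itemIndex[fileId] = &DeletionRetryItem{FileId: fileId, RetryCount: 1, LastError: errMsg}
}

// Size returns the number of distinct file IDs pending retry.
func (q *DeletionRetryQueue) Size() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	return len(q.itemIndex)
}

This is why the batch-level dedup in processDeletionBatch matters: with duplicates present, a single failed delete would inflate RetryCount past the number of actual attempts.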