From 69b16334c228f98ce53f97b72358cda798278c2a Mon Sep 17 00:00:00 2001
From: Dimonyga
Date: Wed, 29 Oct 2025 14:06:29 +0200
Subject: [PATCH] Filer: Add persistence docs and comprehensive unit tests
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Documentation improvements:

1. Document the in-memory queue limitation
   - Acknowledge that the retry queue is volatile (lost on restart)
   - Document trade-offs and future persistence options
   - Provide a clear path for production hardening
   - Note eventual consistency through the main deletion queue

Unit test coverage:

1. TestDeletionRetryQueue_AddAndRetrieve
   - Basic add/retrieve operations
   - Verify items are not ready before the initial delay has elapsed

2. TestDeletionRetryQueue_ExponentialBackoff
   - Verify the exponential backoff progression (5m→10m→20m→40m→80m)
   - Validate delay calculations with a timing tolerance

3. TestDeletionRetryQueue_OverflowProtection
   - Test high retry counts (60+) that could otherwise overflow the delay
   - Verify capping at MaxRetryDelay

4. TestDeletionRetryQueue_MaxAttemptsReached
   - Verify items are discarded after MaxRetryAttempts
   - Confirm proper queue cleanup

5. TestIsRetryableError
   - Comprehensive error pattern coverage
   - Test all retryable error types (timeout, connection, lookup, etc.)
   - Verify non-retryable errors are correctly identified

6. TestDeletionRetryQueue_HeapOrdering
   - Verify the min-heap property is maintained
   - Test that items are processed in NextRetryAt order
   - Validate heap.Init() integration

All tests pass. Addresses PR feedback on testing requirements.
---
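For the persistence TODO below, Option 1 (filer store KV operations) could look
roughly like this sketch. The KvStore interface, key prefix, and JSON encoding
are all hypothetical stand-ins, not the actual SeaweedFS filer store API; the
real KV interface and key scheme would need to be confirmed before implementing.

package retrysketch

import (
	"context"
	"encoding/json"
	"time"
)

// KvStore is a hypothetical stand-in for the filer store's KV operations
// mentioned in the TODO (Option 1).
type KvStore interface {
	KvPut(ctx context.Context, key []byte, value []byte) error
	KvDelete(ctx context.Context, key []byte) error
}

// DeletionRetryItem mirrors the fields the tests in this patch use.
type DeletionRetryItem struct {
	FileId      string
	RetryCount  int
	NextRetryAt time.Time
	LastError   string
}

// retryKeyPrefix is an assumed namespace, not taken from the source.
const retryKeyPrefix = "deletion_retry/"

// persistRetryItem writes one queue entry so the retry queue could be
// rebuilt on filer restart.
func persistRetryItem(ctx context.Context, store KvStore, item *DeletionRetryItem) error {
	value, err := json.Marshal(item)
	if err != nil {
		return err
	}
	return store.KvPut(ctx, []byte(retryKeyPrefix+item.FileId), value)
}

// dropRetryItem removes the entry once the deletion finally succeeds or
// MaxRetryAttempts is exceeded.
func dropRetryItem(ctx context.Context, store KvStore, item *DeletionRetryItem) error {
	return store.KvDelete(ctx, []byte(retryKeyPrefix+item.FileId))
}

Writing through to the store on every AddOrUpdate/RequeueForRetry trades one KV
round-trip per mutation for durability, which is the performance-vs-durability
trade-off the TODO calls out.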
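As a reference for the backoff and overflow tests, here is a minimal standalone
model of the delay progression they assert. The constant values (5m initial,
6h cap) follow the comments in this patch, and the shift guard is an assumed
structure, not a copy of the production code in filer_deletion.go.

package main

import (
	"fmt"
	"time"
)

// Assumed values, mirroring the comments in this patch.
const (
	initialRetryDelay = 5 * time.Minute
	maxRetryDelay     = 6 * time.Hour
)

// expectedRetryDelay models the progression the tests assert:
// 5m, 10m, 20m, 40m, 80m, ... capped at maxRetryDelay. The shift guard
// keeps very high retry counts (e.g. 60) from overflowing time.Duration,
// which is what TestDeletionRetryQueue_OverflowProtection exercises.
func expectedRetryDelay(retryCount int) time.Duration {
	if retryCount > 20 { // 5m << 20 already far exceeds the 6h cap
		return maxRetryDelay
	}
	delay := initialRetryDelay * time.Duration(1<<uint(retryCount))
	if delay > maxRetryDelay {
		delay = maxRetryDelay
	}
	return delay
}

func main() {
	for i := 0; i < 7; i++ {
		fmt.Printf("retry %d: %v\n", i+1, expectedRetryDelay(i))
	}
}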
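The heap-ordering test can also be modeled standalone. The real retryHeap's
Less is not shown in this patch; the only assumption here is that it orders
items by NextRetryAt, earliest first, so heap.Pop drains in retry-time order.

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type item struct {
	fileId      string
	nextRetryAt time.Time
}

// retryModel is a minimal min-heap keyed on nextRetryAt, assumed to
// match the ordering TestDeletionRetryQueue_HeapOrdering asserts.
type retryModel []*item

func (h retryModel) Len() int           { return len(h) }
func (h retryModel) Less(i, j int) bool { return h[i].nextRetryAt.Before(h[j].nextRetryAt) }
func (h retryModel) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *retryModel) Push(x any)        { *h = append(*h, x.(*item)) }
func (h *retryModel) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

func main() {
	now := time.Now()
	// Same out-of-order insertion as the test: file3, file1, file2.
	h := retryModel{
		{"file3", now.Add(30 * time.Second)},
		{"file1", now.Add(10 * time.Second)},
		{"file2", now.Add(20 * time.Second)},
	}
	heap.Init(&h)
	for h.Len() > 0 {
		fmt.Println(heap.Pop(&h).(*item).fileId) // file1, file2, file3
	}
}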
 weed/filer/filer_deletion.go      |  17 ++-
 weed/filer/filer_deletion_test.go | 216 ++++++++++++++++++++++++++++++
 2 files changed, 231 insertions(+), 2 deletions(-)
 create mode 100644 weed/filer/filer_deletion_test.go

diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 5c64af83e..31552da16 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -71,8 +71,21 @@ func (h *retryHeap) Pop() any {
 	return item
 }
 
-// DeletionRetryQueue manages the queue of failed deletions that need to be retried
-// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items
+// DeletionRetryQueue manages the queue of failed deletions that need to be retried.
+// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items.
+//
+// LIMITATION: The retry queue is held in memory only; on filer restart, all
+// pending retries are lost. With MaxRetryDelay up to 6 hours, a process
+// restart during that window discards retry state.
+//
+// TODO: Consider persisting the retry queue to durable storage for production resilience:
+//   - Option 1: Leverage the existing Filer store (KV operations)
+//   - Option 2: Periodic snapshots to disk with recovery on startup
+//   - Option 3: Write-ahead log for retry queue mutations
+//   - Trade-offs: performance vs durability, complexity vs reliability
+//
+// In-memory storage is accepted for now as a pragmatic initial implementation;
+// lost retries remain eventually consistent because files stay in the main deletion queue.
 type DeletionRetryQueue struct {
 	heap      retryHeap
 	itemIndex map[string]*DeletionRetryItem // for O(1) lookup by FileId
diff --git a/weed/filer/filer_deletion_test.go b/weed/filer/filer_deletion_test.go
new file mode 100644
index 000000000..306232872
--- /dev/null
+++ b/weed/filer/filer_deletion_test.go
@@ -0,0 +1,216 @@
+package filer
+
+import (
+	"container/heap"
+	"testing"
+	"time"
+)
+
+func TestDeletionRetryQueue_AddAndRetrieve(t *testing.T) {
+	queue := NewDeletionRetryQueue()
+
+	// Add items
+	queue.AddOrUpdate("file1", "is read only")
+	queue.AddOrUpdate("file2", "connection reset")
+
+	if queue.Size() != 2 {
+		t.Errorf("Expected queue size 2, got %d", queue.Size())
+	}
+
+	// Items not ready yet (initial delay is 5 minutes)
+	readyItems := queue.GetReadyItems(10)
+	if len(readyItems) != 0 {
+		t.Errorf("Expected 0 ready items, got %d", len(readyItems))
+	}
+
+	// Size should remain unchanged
+	if queue.Size() != 2 {
+		t.Errorf("Expected queue size 2 after checking ready items, got %d", queue.Size())
+	}
+}
+
+func TestDeletionRetryQueue_ExponentialBackoff(t *testing.T) {
+	queue := NewDeletionRetryQueue()
+
+	// Create an item
+	item := &DeletionRetryItem{
+		FileId:      "test-file",
+		RetryCount:  0,
+		NextRetryAt: time.Now(),
+		LastError:   "test error",
+	}
+
+	// Requeue multiple times to test backoff
+	delays := []time.Duration{}
+
+	for i := 0; i < 5; i++ {
+		beforeTime := time.Now()
+		queue.RequeueForRetry(item, "error")
+
+		// Calculate expected delay for this retry count
+		expectedDelay := InitialRetryDelay * time.Duration(1<<uint(i))
+		if expectedDelay > MaxRetryDelay {
+			expectedDelay = MaxRetryDelay
+		}
+
+		// Verify NextRetryAt is approximately correct
+		actualDelay := item.NextRetryAt.Sub(beforeTime)
+		delays = append(delays, actualDelay)
+
+		// Allow small timing variance
+		timeDiff := actualDelay - expectedDelay
+		if timeDiff < 0 {
+			timeDiff = -timeDiff
+		}
+		if timeDiff > 100*time.Millisecond {
+			t.Errorf("Retry %d: expected delay ~%v, got %v (diff: %v)", i+1, expectedDelay, actualDelay, timeDiff)
+		}
+
+		// Verify retry count incremented
+		if item.RetryCount != i+1 {
+			t.Errorf("Expected RetryCount %d, got %d", i+1, item.RetryCount)
+		}
+
+		// Remove from queue for next iteration
+		queue.lock.Lock()
+		delete(queue.itemIndex, item.FileId)
+		queue.heap = retryHeap{}
+		queue.lock.Unlock()
+	}
+
+	t.Logf("Exponential backoff delays: %v", delays)
+}
+
+func TestDeletionRetryQueue_OverflowProtection(t *testing.T) {
+	queue := NewDeletionRetryQueue()
+
+	// Create an item with very high retry count
+	item := &DeletionRetryItem{
+		FileId:      "test-file",
+		RetryCount:  60, // High count that would cause overflow without protection
+		NextRetryAt: time.Now(),
+		LastError:   "test error",
+	}
+
+	// Should not panic and should cap at MaxRetryDelay
+	queue.RequeueForRetry(item, "error")
+
+	delay := time.Until(item.NextRetryAt)
+	if delay > MaxRetryDelay+time.Second {
+		t.Errorf("Delay exceeded MaxRetryDelay: %v > %v", delay, MaxRetryDelay)
+	}
+}
+
+func TestDeletionRetryQueue_MaxAttemptsReached(t *testing.T) {
+	queue := NewDeletionRetryQueue()
+
+	// Add item and set retry count near max
+	queue.AddOrUpdate("file1", "error")
+
+	// Manually set high retry count
+	queue.lock.Lock()
+	if item, exists := queue.itemIndex["file1"]; exists {
+		item.RetryCount = MaxRetryAttempts
+		item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now
+		queue.lock.Unlock()
+
+		// Try to get ready items - should be discarded
+		readyItems := queue.GetReadyItems(10)
+		if len(readyItems) != 0 {
+			t.Errorf("Expected 0 items (max attempts reached), got %d", len(readyItems))
+		}
+
+		// Should be removed from queue
+		if queue.Size() != 0 {
+			t.Errorf("Expected queue size 0 after max attempts, got %d", queue.Size())
+		}
+	} else {
+		queue.lock.Unlock()
+		t.Fatal("Item not found in queue")
+	}
+}
+
+func TestIsRetryableError(t *testing.T) {
+	testCases := []struct {
+		errMsg      string
+		retryable   bool
+		description string
+	}{
+		{"volume 123 is read only", true, "read-only volume"},
+		{"connection reset by peer", true, "connection reset"},
+		{"timeout exceeded", true, "timeout"},
+		{"deadline exceeded", true, "deadline exceeded"},
+		{"context canceled", true, "context canceled"},
+		{"lookup error: volume not found", true, "lookup error"},
+		{"connection refused", true, "connection refused"},
+		{"too many requests", true, "rate limiting"},
+		{"service unavailable", true, "service unavailable"},
+		{"i/o timeout", true, "I/O timeout"},
+		{"broken pipe", true, "broken pipe"},
+		{"not found", false, "not found (not retryable)"},
+		{"invalid file id", false, "invalid input (not retryable)"},
+		{"", false, "empty error"},
+	}
+
+	for _, tc := range testCases {
+		result := isRetryableError(tc.errMsg)
+		if result != tc.retryable {
+			t.Errorf("%s: expected retryable=%v, got %v for error: %q",
+				tc.description, tc.retryable, result, tc.errMsg)
+		}
+	}
+}
+
+func TestDeletionRetryQueue_HeapOrdering(t *testing.T) {
+	queue := NewDeletionRetryQueue()
+
+	now := time.Now()
+
+	// Add items with different retry times (out of order)
+	items := []*DeletionRetryItem{
+		{FileId: "file3", RetryCount: 1, NextRetryAt: now.Add(30 * time.Second), LastError: "error3"},
+		{FileId: "file1", RetryCount: 1, NextRetryAt: now.Add(10 * time.Second), LastError: "error1"},
+		{FileId: "file2", RetryCount: 1, NextRetryAt: now.Add(20 * time.Second), LastError: "error2"},
+	}
+
+	// Add items directly (simulating internal state)
+	for _, item := range items {
+		queue.lock.Lock()
+		queue.itemIndex[item.FileId] = item
+		queue.heap = append(queue.heap, item)
+		queue.lock.Unlock()
+	}
+
+	// Use container/heap.Init to establish heap property
+	queue.lock.Lock()
+	heap.Init(&queue.heap)
+	queue.lock.Unlock()
+
+	// Verify heap maintains min-heap property (earliest time at top)
+	queue.lock.Lock()
+	if queue.heap[0].FileId != "file1" {
+		t.Errorf("Expected file1 at heap top (earliest time), got %s", queue.heap[0].FileId)
+	}
+	queue.lock.Unlock()
+
+	// Shift all items into the past so they are ready, preserving relative order
+	queue.lock.Lock()
+	for _, item := range queue.itemIndex {
+		item.NextRetryAt = item.NextRetryAt.Add(-time.Minute)
+	}
+	queue.lock.Unlock()
+
+	// GetReadyItems should return in NextRetryAt order
+	readyItems := queue.GetReadyItems(10)
+	expectedOrder := []string{"file1", "file2", "file3"}
+
+	if len(readyItems) != 3 {
+		t.Fatalf("Expected 3 ready items, got %d", len(readyItems))
+	}
+
+	for i, item := range readyItems {
+		if item.FileId != expectedOrder[i] {
+			t.Errorf("Item %d: expected %s, got %s", i, expectedOrder[i], item.FileId)
+		}
+	}
+}