From 720510f6bb6aa8e8b477d714e24f9bb06c4bb1c6 Mon Sep 17 00:00:00 2001 From: Dimonyga Date: Wed, 29 Oct 2025 13:41:16 +0200 Subject: [PATCH] Filer: Optimize retry queue with min-heap data structure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace map-based retry queue with a min-heap for better scalability and deterministic ordering. Performance improvements: - GetReadyItems: O(N) → O(K log N) where K is items retrieved - AddOrUpdate: O(1) → O(log N) (acceptable trade-off) - Early exit when checking ready items (heap top is earliest) - No full iteration over all items while holding lock Benefits: - Deterministic processing order (earliest NextRetryAt first) - Better scalability for large retry queues (thousands of items) - Reduced lock contention duration - Memory efficient (no separate slice reconstruction) Implementation: - Min-heap ordered by NextRetryAt using container/heap - Dual index: heap for ordering + map for O(1) FileId lookups - heap.Fix() used when updating existing items - Comprehensive complexity documentation in comments This addresses the performance bottleneck identified in GetReadyItems where iterating over the entire map with a write lock could block other goroutines in high-failure scenarios. --- weed/filer/filer_deletion.go | 100 +++++++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 23 deletions(-) diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index 83f84c54b..b1cc6f2be 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -1,6 +1,7 @@ package filer import ( + "container/heap" "context" "fmt" "strings" @@ -35,28 +36,67 @@ type DeletionRetryItem struct { RetryCount int NextRetryAt time.Time LastError string + heapIndex int // index in the heap (for heap.Interface) +} + +// retryHeap implements heap.Interface for DeletionRetryItem +// Items are ordered by NextRetryAt (earliest first) +type retryHeap []*DeletionRetryItem + +func (h retryHeap) Len() int { return len(h) } + +func (h retryHeap) Less(i, j int) bool { + return h[i].NextRetryAt.Before(h[j].NextRetryAt) +} + +func (h retryHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].heapIndex = i + h[j].heapIndex = j +} + +func (h *retryHeap) Push(x interface{}) { + item := x.(*DeletionRetryItem) + item.heapIndex = len(*h) + *h = append(*h, item) +} + +func (h *retryHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.heapIndex = -1 // mark as removed + *h = old[0 : n-1] + return item } // DeletionRetryQueue manages the queue of failed deletions that need to be retried +// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items type DeletionRetryQueue struct { - items map[string]*DeletionRetryItem - itemsLock sync.RWMutex + heap retryHeap + itemIndex map[string]*DeletionRetryItem // for O(1) lookup by FileId + lock sync.Mutex } // NewDeletionRetryQueue creates a new retry queue func NewDeletionRetryQueue() *DeletionRetryQueue { - return &DeletionRetryQueue{ - items: make(map[string]*DeletionRetryItem), + q := &DeletionRetryQueue{ + heap: make(retryHeap, 0), + itemIndex: make(map[string]*DeletionRetryItem), } + heap.Init(&q.heap) + return q } // AddOrUpdate adds a new failed deletion or updates an existing one +// Time complexity: O(log N) for insertion/update func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) { - q.itemsLock.Lock() - defer q.itemsLock.Unlock() + q.lock.Lock() + defer q.lock.Unlock() // Check if item already exists - if item, exists := q.items[fileId]; exists { + if item, exists := q.itemIndex[fileId]; exists { item.RetryCount++ item.LastError = errorMsg // Calculate next retry time with exponential backoff @@ -65,39 +105,53 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) { delay = MaxRetryDelay } item.NextRetryAt = time.Now().Add(delay) + // Re-heapify since NextRetryAt changed + heap.Fix(&q.heap, item.heapIndex) glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay) return } // Add new item delay := InitialRetryDelay - q.items[fileId] = &DeletionRetryItem{ + item := &DeletionRetryItem{ FileId: fileId, RetryCount: 1, NextRetryAt: time.Now().Add(delay), LastError: errorMsg, } + heap.Push(&q.heap, item) + q.itemIndex[fileId] = item glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay) } // GetReadyItems returns items that are ready to be retried and removes them from the queue +// Time complexity: O(K log N) where K is the number of ready items +// Items are processed in order of NextRetryAt (earliest first) func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem { - q.itemsLock.Lock() - defer q.itemsLock.Unlock() + q.lock.Lock() + defer q.lock.Unlock() now := time.Now() var readyItems []*DeletionRetryItem - for fileId, item := range q.items { - if len(readyItems) < maxItems && item.NextRetryAt.Before(now) { - if item.RetryCount < MaxRetryAttempts { - readyItems = append(readyItems, item) - delete(q.items, fileId) - } else { - // Max attempts reached, log and discard - glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError) - delete(q.items, fileId) - } + // Peek at items from the top of the heap (earliest NextRetryAt) + for len(q.heap) > 0 && len(readyItems) < maxItems { + item := q.heap[0] + + // If the earliest item is not ready yet, no other items are ready either + if item.NextRetryAt.After(now) { + break + } + + // Remove from heap and index + heap.Pop(&q.heap) + delete(q.itemIndex, item.FileId) + + if item.RetryCount < MaxRetryAttempts { + readyItems = append(readyItems, item) + } else { + // Max attempts reached, log and discard + glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError) } } @@ -106,9 +160,9 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem { // Size returns the current size of the retry queue func (q *DeletionRetryQueue) Size() int { - q.itemsLock.RLock() - defer q.itemsLock.RUnlock() - return len(q.items) + q.lock.Lock() + defer q.lock.Unlock() + return len(q.heap) } func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) {