@ -1,6 +1,7 @@
package filer
package filer
import (
import (
"container/heap"
"context"
"context"
"fmt"
"fmt"
"strings"
"strings"
@ -35,28 +36,67 @@ type DeletionRetryItem struct {
RetryCount int
RetryCount int
NextRetryAt time . Time
NextRetryAt time . Time
LastError string
LastError string
heapIndex int // index in the heap (for heap.Interface)
}
// retryHeap is a min-heap of pending deletion retries that satisfies
// heap.Interface. The item with the earliest NextRetryAt sits at index 0.
// Every method keeps each item's heapIndex equal to its slot in the slice,
// so that index can be handed to heap.Fix after NextRetryAt changes.
type retryHeap []*DeletionRetryItem

// Len reports how many items are currently held in the heap.
func (rh retryHeap) Len() int {
	return len(rh)
}

// Less orders items by NextRetryAt, earliest first.
func (rh retryHeap) Less(i, j int) bool {
	return rh[j].NextRetryAt.After(rh[i].NextRetryAt)
}

// Swap exchanges slots i and j and records the new slot on both items.
func (rh retryHeap) Swap(i, j int) {
	a, b := rh[i], rh[j]
	rh[i], rh[j] = b, a
	b.heapIndex = i
	a.heapIndex = j
}

// Push appends x, which must be a *DeletionRetryItem, to the end of the
// slice and stores its slot in heapIndex. Call it through heap.Push so the
// heap ordering is restored afterwards.
func (rh *retryHeap) Push(x interface{}) {
	entry := x.(*DeletionRetryItem)
	entry.heapIndex = len(*rh)
	*rh = append(*rh, entry)
}

// Pop removes and returns the last element of the slice. Call it through
// heap.Pop, which first moves the minimum item into that last position.
func (rh *retryHeap) Pop() interface{} {
	items := *rh
	last := len(items) - 1
	entry := items[last]
	items[last] = nil // drop the reference so the backing array does not pin the item
	*rh = items[:last]
	entry.heapIndex = -1 // -1 marks the item as no longer being in the heap
	return entry
}
}
// DeletionRetryQueue manages the queue of failed deletions that need to be retried
// DeletionRetryQueue manages the queue of failed deletions that need to be retried
// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items
type DeletionRetryQueue struct {
type DeletionRetryQueue struct {
items map [ string ] * DeletionRetryItem
itemsLock sync . RWMutex
heap retryHeap
itemIndex map [ string ] * DeletionRetryItem // for O(1) lookup by FileId
lock sync . Mutex
}
}
// NewDeletionRetryQueue creates a new retry queue
// NewDeletionRetryQueue creates a new retry queue
func NewDeletionRetryQueue ( ) * DeletionRetryQueue {
func NewDeletionRetryQueue ( ) * DeletionRetryQueue {
return & DeletionRetryQueue {
items : make ( map [ string ] * DeletionRetryItem ) ,
q := & DeletionRetryQueue {
heap : make ( retryHeap , 0 ) ,
itemIndex : make ( map [ string ] * DeletionRetryItem ) ,
}
}
heap . Init ( & q . heap )
return q
}
}
// AddOrUpdate adds a new failed deletion or updates an existing one
// AddOrUpdate adds a new failed deletion or updates an existing one
// Time complexity: O(log N) for insertion/update
func ( q * DeletionRetryQueue ) AddOrUpdate ( fileId string , errorMsg string ) {
func ( q * DeletionRetryQueue ) AddOrUpdate ( fileId string , errorMsg string ) {
q . itemsLock . Lock ( )
defer q . itemsLock . Unlock ( )
q . l ock. Lock ( )
defer q . l ock. Unlock ( )
// Check if item already exists
// Check if item already exists
if item , exists := q . items [ fileId ] ; exists {
if item , exists := q . itemIndex [ fileId ] ; exists {
item . RetryCount ++
item . RetryCount ++
item . LastError = errorMsg
item . LastError = errorMsg
// Calculate next retry time with exponential backoff
// Calculate next retry time with exponential backoff
@ -65,39 +105,53 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
delay = MaxRetryDelay
delay = MaxRetryDelay
}
}
item . NextRetryAt = time . Now ( ) . Add ( delay )
item . NextRetryAt = time . Now ( ) . Add ( delay )
// Re-heapify since NextRetryAt changed
heap . Fix ( & q . heap , item . heapIndex )
glog . V ( 2 ) . Infof ( "updated retry for %s: attempt %d, next retry in %v" , fileId , item . RetryCount , delay )
glog . V ( 2 ) . Infof ( "updated retry for %s: attempt %d, next retry in %v" , fileId , item . RetryCount , delay )
return
return
}
}
// Add new item
// Add new item
delay := InitialRetryDelay
delay := InitialRetryDelay
q . items [ fileId ] = & DeletionRetryItem {
item : = & DeletionRetryItem {
FileId : fileId ,
FileId : fileId ,
RetryCount : 1 ,
RetryCount : 1 ,
NextRetryAt : time . Now ( ) . Add ( delay ) ,
NextRetryAt : time . Now ( ) . Add ( delay ) ,
LastError : errorMsg ,
LastError : errorMsg ,
}
}
heap . Push ( & q . heap , item )
q . itemIndex [ fileId ] = item
glog . V ( 2 ) . Infof ( "added retry for %s: next retry in %v" , fileId , delay )
glog . V ( 2 ) . Infof ( "added retry for %s: next retry in %v" , fileId , delay )
}
}
// GetReadyItems returns items that are ready to be retried and removes them from the queue
// GetReadyItems returns items that are ready to be retried and removes them from the queue
// Time complexity: O(K log N) where K is the number of ready items
// Items are processed in order of NextRetryAt (earliest first)
func ( q * DeletionRetryQueue ) GetReadyItems ( maxItems int ) [ ] * DeletionRetryItem {
func ( q * DeletionRetryQueue ) GetReadyItems ( maxItems int ) [ ] * DeletionRetryItem {
q . itemsLock . Lock ( )
defer q . itemsL ock. Unlock ( )
q . l ock. Lock ( )
defer q . l ock. Unlock ( )
now := time . Now ( )
now := time . Now ( )
var readyItems [ ] * DeletionRetryItem
var readyItems [ ] * DeletionRetryItem
for fileId , item := range q . items {
if len ( readyItems ) < maxItems && item . NextRetryAt . Before ( now ) {
// Peek at items from the top of the heap (earliest NextRetryAt)
for len ( q . heap ) > 0 && len ( readyItems ) < maxItems {
item := q . heap [ 0 ]
// If the earliest item is not ready yet, no other items are ready either
if item . NextRetryAt . After ( now ) {
break
}
// Remove from heap and index
heap . Pop ( & q . heap )
delete ( q . itemIndex , item . FileId )
if item . RetryCount < MaxRetryAttempts {
if item . RetryCount < MaxRetryAttempts {
readyItems = append ( readyItems , item )
readyItems = append ( readyItems , item )
delete ( q . items , fileId )
} else {
} else {
// Max attempts reached, log and discard
// Max attempts reached, log and discard
glog . Warningf ( "max retry attempts (%d) reached for %s, last error: %s" , MaxRetryAttempts , item . FileId , item . LastError )
glog . Warningf ( "max retry attempts (%d) reached for %s, last error: %s" , MaxRetryAttempts , item . FileId , item . LastError )
delete ( q . items , fileId )
}
}
}
}
}
@ -106,9 +160,9 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
// Size returns the current size of the retry queue
// Size returns the current size of the retry queue
func ( q * DeletionRetryQueue ) Size ( ) int {
func ( q * DeletionRetryQueue ) Size ( ) int {
q . itemsL ock. R Lock( )
defer q . itemsL ock. R Unlock( )
return len ( q . items )
q . l ock. Lock ( )
defer q . l ock. Unlock ( )
return len ( q . heap )
}
}
func LookupByMasterClientFn ( masterClient * wdclient . MasterClient ) func ( vids [ ] string ) ( map [ string ] * operation . LookupResult , error ) {
func LookupByMasterClientFn ( masterClient * wdclient . MasterClient ) func ( vids [ ] string ) ( map [ string ] * operation . LookupResult , error ) {