@@ -35,14 +35,14 @@ type DeletionRetryItem struct {
 // DeletionRetryQueue manages the queue of failed deletions that need to be retried
 type DeletionRetryQueue struct {
-    items     []*DeletionRetryItem
+    items     map[string]*DeletionRetryItem
     itemsLock sync.RWMutex
 }
 
 // NewDeletionRetryQueue creates a new retry queue
 func NewDeletionRetryQueue() *DeletionRetryQueue {
     return &DeletionRetryQueue{
-        items: make([]*DeletionRetryItem, 0),
+        items: make(map[string]*DeletionRetryItem),
     }
 }
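The slice-to-map switch above is the core of the change: keying pending retries by file id turns the duplicate check in `AddOrUpdate` into a single map lookup instead of a linear scan. A minimal, self-contained sketch of the idea (illustrative names and values, not the PR's actual types):

```go
package main

import (
	"fmt"
	"time"
)

type retryItem struct {
	FileId      string
	RetryCount  int
	NextRetryAt time.Time
	LastError   string
}

func main() {
	items := map[string]*retryItem{}

	addOrUpdate := func(fileId, errMsg string) {
		if item, exists := items[fileId]; exists { // O(1) lookup instead of scanning a slice
			item.RetryCount++
			item.LastError = errMsg
			return
		}
		items[fileId] = &retryItem{FileId: fileId, RetryCount: 1, NextRetryAt: time.Now(), LastError: errMsg}
	}

	addOrUpdate("3,0123456789", "volume not reachable")
	addOrUpdate("3,0123456789", "volume not reachable") // same id: updated in place, not appended
	fmt.Println(items["3,0123456789"].RetryCount)       // 2
}
```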
@@ -52,8 +52,7 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
     defer q.itemsLock.Unlock()
 
     // Check if item already exists
-    for _, item := range q.items {
-        if item.FileId == fileId {
+    if item, exists := q.items[fileId]; exists {
         item.RetryCount++
         item.LastError = errorMsg
         // Calculate next retry time with exponential backoff
@@ -65,17 +64,15 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
         glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
         return
     }
-    }
 
     // Add new item
     delay := InitialRetryDelay
-    newItem := &DeletionRetryItem{
+    q.items[fileId] = &DeletionRetryItem{
         FileId:      fileId,
         RetryCount:  1,
         NextRetryAt: time.Now().Add(delay),
         LastError:   errorMsg,
     }
-    q.items = append(q.items, newItem)
     glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
 }
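The backoff computation itself sits in context lines this hunk does not show; only the comment and the resulting `delay` are visible. A standalone sketch of the usual shape, doubling from an initial delay up to a cap (both constants here are assumed for the sketch, not taken from the PR):

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values for illustration only; the PR's constants are not shown in this hunk.
const (
	initialRetryDelay = 5 * time.Minute
	maxRetryDelay     = 6 * time.Hour // hypothetical cap
)

// backoffDelay doubles the delay for each prior attempt and caps it.
func backoffDelay(retryCount int) time.Duration {
	delay := initialRetryDelay
	for i := 1; i < retryCount; i++ {
		delay *= 2
		if delay > maxRetryDelay {
			return maxRetryDelay
		}
	}
	return delay
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d -> next retry in %v\n", attempt, backoffDelay(attempt))
	}
}
```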
@@ -86,23 +83,20 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
     now := time.Now()
     var readyItems []*DeletionRetryItem
-    var remainingItems []*DeletionRetryItem
-    for _, item := range q.items {
+    for fileId, item := range q.items {
         if len(readyItems) < maxItems && item.NextRetryAt.Before(now) {
             if item.RetryCount < MaxRetryAttempts {
                 readyItems = append(readyItems, item)
+                delete(q.items, fileId)
             } else {
                 // Max attempts reached, log and discard
                 glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
+                delete(q.items, fileId)
             }
-        } else {
-            // Keep items that are not ready yet or if the batch is full
-            remainingItems = append(remainingItems, item)
         }
     }
-    q.items = remainingItems
 
     return readyItems
 }
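`GetReadyItems` now removes entries while ranging over the map, which is well defined in Go: deleting the key currently being visited during a `range` over a map is explicitly allowed. A tiny standalone check of the pattern:

```go
package main

import "fmt"

func main() {
	pending := map[string]int{"a": 1, "b": 5, "c": 2}
	var ready []string
	for id, retries := range pending {
		if retries < 3 { // stand-in for "ready and under the attempt limit"
			ready = append(ready, id)
			delete(pending, id) // deleting the current key during range is safe
		}
	}
	fmt.Println(len(ready), len(pending)) // 2 1
}
```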
@@ -147,9 +141,15 @@ func (f *Filer) loopProcessingDeletion() {
     // Start retry processor in a separate goroutine
     go f.loopProcessingDeletionRetry(lookupFunc, retryQueue)
 
-    var deletionCount int
+    ticker := time.NewTicker(1123 * time.Millisecond)
+    defer ticker.Stop()
+
     for {
-        deletionCount = 0
+        select {
+        case <-f.deletionQuit:
+            glog.V(0).Infof("deletion processor shutting down")
+            return
+        case <-ticker.C:
             f.fileIdDeletionQueue.Consume(func(fileIds []string) {
                 for len(fileIds) > 0 {
                     var toDeleteFileIds []string
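The loop switches from counting deletions and sleeping when idle to a ticker plus a quit channel, so the goroutine can be stopped cleanly. A standalone sketch of that shutdown pattern (`f.deletionQuit` in the PR is assumed to be a `chan struct{}` closed on shutdown):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	quit := make(chan struct{}) // stands in for f.deletionQuit
	done := make(chan struct{})

	go func() {
		defer close(done)
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-quit:
				fmt.Println("processor shutting down")
				return
			case <-ticker.C:
				fmt.Println("one deletion pass")
			}
		}
	}()

	time.Sleep(350 * time.Millisecond)
	close(quit) // signal shutdown the way closing f.deletionQuit would
	<-done
}
```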
@@ -160,7 +160,6 @@ func (f *Filer) loopProcessingDeletion() {
                     toDeleteFileIds = fileIds
                     fileIds = fileIds[:0]
                 }
-                deletionCount = len(toDeleteFileIds)
                 results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
                 // Process individual results for better error tracking
@@ -177,13 +176,13 @@ func (f *Filer) loopProcessingDeletion() {
                         // Retryable error - add to retry queue
                         retryableErrorCount++
                         retryQueue.AddOrUpdate(result.FileId, result.Error)
-                        if retryableErrorCount <= 10 {
+                        if len(errorDetails) < 10 {
                             errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
                         }
                     } else {
                         // Permanent error - log but don't retry
                         permanentErrorCount++
-                        if permanentErrorCount <= 10 {
+                        if len(errorDetails) < 10 {
                             errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
                         }
                     }
@@ -208,9 +207,6 @@ func (f *Filer) loopProcessingDeletion() {
                 }
             }
         })
-        if deletionCount == 0 {
-            time.Sleep(1123 * time.Millisecond)
-        }
+        }
     }
 }
@@ -240,7 +236,12 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
     ticker := time.NewTicker(1 * time.Minute)
     defer ticker.Stop()
 
-    for range ticker.C {
+    for {
+        select {
+        case <-f.deletionQuit:
+            glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", retryQueue.Size())
+            return
+        case <-ticker.C:
             // Get items that are ready to retry
             readyItems := retryQueue.GetReadyItems(1000)
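The shutdown log calls `retryQueue.Size()`, which is not defined in these hunks; presumably it is a read-locked count over the map, along the lines of the following (an assumption, not copied from the PR; it relies on the `DeletionRetryQueue` fields shown in the first hunk):

```go
// Size returns the number of items currently waiting in the retry queue.
// Assumed shape only; the actual method is not part of the hunks shown here.
func (q *DeletionRetryQueue) Size() int {
	q.itemsLock.RLock()
	defer q.itemsLock.RUnlock()
	return len(q.items)
}
```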
@@ -289,6 +290,7 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
             glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
         }
+        }
     }
 }
 
 func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {