
address comment

pull/7402/head
chrislu, 1 month ago (parent commit 476db20bfe)
Changed files:
  1. weed/filer/filer_deletion.go (101)
  2. weed/filer/filer_deletion_test.go (26)

weed/filer/filer_deletion.go

@@ -164,13 +164,11 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
     // Check if item already exists
     if item, exists := q.itemIndex[fileId]; exists {
-        item.RetryCount++
+        // Item is already in the queue. Just update the error.
+        // The existing retry schedule should proceed.
+        // RetryCount is only incremented in RequeueForRetry when an actual retry is performed.
         item.LastError = errorMsg
-        delay := calculateBackoff(item.RetryCount)
-        item.NextRetryAt = time.Now().Add(delay)
-        // Re-heapify since NextRetryAt changed
-        heap.Fix(&q.heap, item.heapIndex)
-        glog.V(2).Infof("updated retry for %s: attempt %d, next retry in %v", fileId, item.RetryCount, delay)
+        glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
         return
     }
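The hunk above changes how AddOrUpdate handles a file ID that is already queued: the existing retry schedule is left untouched, and RetryCount only advances in RequeueForRetry when a retry is actually performed. Below is a minimal standalone sketch of why repeated error reports should not bump the count; the backoff policy here (1 minute base, doubling, capped at 1 hour) is an assumption for illustration, not the real calculateBackoff.

package main

import (
    "fmt"
    "time"
)

// backoff is an assumed exponential policy (1m, 2m, 4m, ... capped at 1h);
// the real calculateBackoff in the filer package may use different constants.
func backoff(retryCount int) time.Duration {
    if retryCount < 1 {
        retryCount = 1
    }
    d := time.Minute << uint(retryCount-1)
    if d > time.Hour {
        d = time.Hour
    }
    return d
}

func main() {
    // Counting only actual retries keeps the delay proportional to real attempts.
    for attempt := 1; attempt <= 3; attempt++ {
        fmt.Printf("after retry %d -> next delay %v\n", attempt, backoff(attempt))
    }
    // If every duplicate error report incremented the count instead, three reports
    // of the same fileId in one batch would already schedule the attempt-3 delay
    // before a single retry had run.
}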
@@ -311,43 +309,86 @@ func (f *Filer) loopProcessingDeletion() {
 // to the retry queue, and logs appropriate messages.
 func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
     // Deduplicate file IDs to prevent incorrect retry count increments for the same file ID within a single batch.
-    uniqueFileIds := make([]string, 0, len(toDeleteFileIds))
+    uniqueFileIdsSlice := make([]string, 0, len(toDeleteFileIds))
     processed := make(map[string]struct{}, len(toDeleteFileIds))
     for _, fileId := range toDeleteFileIds {
         if _, found := processed[fileId]; !found {
             processed[fileId] = struct{}{}
-            uniqueFileIds = append(uniqueFileIds, fileId)
+            uniqueFileIdsSlice = append(uniqueFileIdsSlice, fileId)
         }
     }
-    if len(uniqueFileIds) == 0 {
+    if len(uniqueFileIdsSlice) == 0 {
         return
     }
-    results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, uniqueFileIds, lookupFunc)
+    results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc)
-    // Process individual results for better error tracking
+    // Group results by file ID to handle multiple results for replicated volumes
+    resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult)
+    for _, result := range results {
+        resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result)
+    }
+    // Process results
     var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
     var errorDetails []string
-    for _, result := range results {
-        if result.Error == "" {
-            successCount++
-        } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
-            // Already deleted - acceptable
-            notFoundCount++
-        } else if isRetryableError(result.Error) {
-            // Retryable error - add to retry queue
+    for _, fileId := range uniqueFileIdsSlice {
+        fileIdResults, found := resultsByFileId[fileId]
+        if !found {
+            // This can happen if lookup fails for the volume.
+            // To be safe, we treat it as a retryable error.
             retryableErrorCount++
-            f.DeletionRetryQueue.AddOrUpdate(result.FileId, result.Error)
+            f.DeletionRetryQueue.AddOrUpdate(fileId, "no deletion result from volume server")
             if len(errorDetails) < MaxLoggedErrorDetails {
-                errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
+                errorDetails = append(errorDetails, fileId+": no deletion result (will retry)")
             }
-        } else {
-            // Permanent error - log but don't retry
+            continue
+        }
+        var firstRetryableError, firstPermanentError string
+        allSuccessOrNotFound := true
+        for _, res := range fileIdResults {
+            if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
+                allSuccessOrNotFound = false
+                if isRetryableError(res.Error) {
+                    if firstRetryableError == "" {
+                        firstRetryableError = res.Error
+                    }
+                } else {
+                    if firstPermanentError == "" {
+                        firstPermanentError = res.Error
+                    }
+                }
+            }
+        }
+        // Determine overall outcome: permanent errors take precedence, then retryable errors
+        if firstPermanentError != "" {
            permanentErrorCount++
            if len(errorDetails) < MaxLoggedErrorDetails {
-                errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (permanent)")
+                errorDetails = append(errorDetails, fileId+": "+firstPermanentError+" (permanent)")
            }
+        } else if firstRetryableError != "" {
+            retryableErrorCount++
+            f.DeletionRetryQueue.AddOrUpdate(fileId, firstRetryableError)
+            if len(errorDetails) < MaxLoggedErrorDetails {
+                errorDetails = append(errorDetails, fileId+": "+firstRetryableError+" (will retry)")
+            }
+        } else if allSuccessOrNotFound {
+            isPureSuccess := true
+            for _, res := range fileIdResults {
+                if res.Error != "" {
+                    isPureSuccess = false
+                    break
+                }
+            }
+            if isPureSuccess {
+                successCount++
+            } else {
+                notFoundCount++
+            }
+        }
    }
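The rewritten loop iterates over the deduplicated file IDs instead of the raw results, because a replicated volume can return several DeleteResult entries for the same file ID. A file ID with no result at all (for example when the volume lookup fails) is treated as retryable. The standalone sketch below illustrates only the grouping and precedence rule (permanent error wins, then retryable, otherwise deleted); deleteResult and isRetryableError here are simplified local stand-ins, not the filer's actual types, and the file ID and error strings are made-up example values.

package main

import (
    "fmt"
    "strings"
)

// deleteResult is a stand-in for volume_server_pb.DeleteResult with only the
// fields this example needs.
type deleteResult struct {
    FileId string
    Error  string
}

// isRetryableError is a hypothetical stand-in for the filer's classification.
func isRetryableError(err string) bool {
    return strings.Contains(err, "timeout") || strings.Contains(err, "lookup error")
}

// classify aggregates all results for one file ID: any permanent error takes
// precedence, then any retryable error, otherwise the file counts as deleted.
func classify(results []deleteResult) string {
    var firstRetryable, firstPermanent string
    for _, r := range results {
        if r.Error == "" || r.Error == "not found" {
            continue // success or already deleted on this replica
        }
        if isRetryableError(r.Error) {
            if firstRetryable == "" {
                firstRetryable = r.Error
            }
        } else if firstPermanent == "" {
            firstPermanent = r.Error
        }
    }
    switch {
    case firstPermanent != "":
        return "permanent: " + firstPermanent
    case firstRetryable != "":
        return "retry: " + firstRetryable
    default:
        return "deleted"
    }
}

func main() {
    // One file ID, two replica results: success on one replica must not hide
    // the timeout on the other, so the file is requeued for retry.
    replicaResults := []deleteResult{
        {FileId: "3,0142e8f1", Error: ""},        // replica 1: deleted
        {FileId: "3,0142e8f1", Error: "timeout"}, // replica 2: timed out
    }
    fmt.Println(classify(replicaResults)) // prints: retry: timeout
}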
@@ -359,11 +400,15 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
     totalErrors := retryableErrorCount + permanentErrorCount
     if totalErrors > 0 {
         logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
-            totalErrors, len(uniqueFileIds), retryableErrorCount, permanentErrorCount)
-        if totalErrors > MaxLoggedErrorDetails {
-            logMessage += fmt.Sprintf(" (showing first %d)", MaxLoggedErrorDetails)
+            totalErrors, len(uniqueFileIdsSlice), retryableErrorCount, permanentErrorCount)
+        if len(errorDetails) > 0 {
+            if totalErrors > MaxLoggedErrorDetails {
+                logMessage += fmt.Sprintf(" (showing first %d)", len(errorDetails))
             }
+            glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
+        } else {
+            glog.V(0).Info(logMessage)
+        }
-        glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
     }
     if f.DeletionRetryQueue.Size() > 0 {
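The logging hunk now joins the collected details only when there are any, and the "showing first %d" suffix reports how many details were actually kept rather than the cap constant. A small self-contained sketch of the same message construction, using fmt in place of glog, an assumed value for a stand-in of MaxLoggedErrorDetails, and made-up sample failures:

package main

import (
    "fmt"
    "strings"
)

// maxLoggedErrorDetails stands in for the filer's MaxLoggedErrorDetails constant;
// the value here is assumed for the example.
const maxLoggedErrorDetails = 2

func main() {
    // Assumed sample outcome: 3 of 20 unique file IDs failed, but only
    // maxLoggedErrorDetails details were collected.
    totalErrors, totalFiles := 3, 20
    retryable, permanent := 2, 1
    errorDetails := []string{
        "3,0142e8f1: timeout (will retry)",              // hypothetical detail
        "7,01a3b2c4: volume not writable (permanent)",   // hypothetical detail
    }

    logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
        totalErrors, totalFiles, retryable, permanent)
    if len(errorDetails) > 0 {
        if totalErrors > maxLoggedErrorDetails {
            logMessage += fmt.Sprintf(" (showing first %d)", len(errorDetails))
        }
        fmt.Printf("%s: %v\n", logMessage, strings.Join(errorDetails, "; "))
    } else {
        fmt.Println(logMessage)
    }
}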

weed/filer/filer_deletion_test.go

@@ -274,7 +274,17 @@ func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) {
         t.Fatalf("Expected queue size 1 after first add, got %d", queue.Size())
     }
-    // Add same file ID again - should increment retry count (simulates duplicate)
+    // Get initial retry count
+    queue.lock.Lock()
+    item1, exists := queue.itemIndex["file1"]
+    if !exists {
+        queue.lock.Unlock()
+        t.Fatal("Item not found in queue after first add")
+    }
+    initialRetryCount := item1.RetryCount
+    queue.lock.Unlock()
+    // Add same file ID again - should NOT increment retry count (just update error)
     queue.AddOrUpdate("file1", "timeout error again")
     // Verify still only one item exists in queue (not duplicated)
@@ -282,16 +292,18 @@ func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) {
         t.Errorf("Expected queue size 1 after duplicate add, got %d (duplicates detected)", queue.Size())
     }
-    // Verify retry count incremented to 2 by checking internal state
-    // Note: This documents the current behavior - AddOrUpdate increments retry count on duplicate
+    // Verify retry count did NOT increment (AddOrUpdate only updates error, not count)
     queue.lock.Lock()
-    item, exists := queue.itemIndex["file1"]
+    item2, exists := queue.itemIndex["file1"]
     queue.lock.Unlock()
     if !exists {
-        t.Fatal("Item not found in queue")
+        t.Fatal("Item not found in queue after second add")
     }
+    if item2.RetryCount != initialRetryCount {
+        t.Errorf("Expected RetryCount to stay at %d after duplicate add (should not increment), got %d", initialRetryCount, item2.RetryCount)
     }
-    if item.RetryCount != 2 {
-        t.Errorf("Expected RetryCount 2 after duplicate add (each AddOrUpdate increments), got %d", item.RetryCount)
+    if item2.LastError != "timeout error again" {
+        t.Errorf("Expected LastError to be updated to 'timeout error again', got %q", item2.LastError)
+    }
 }