
address comments

pull/7402/head
chrislu 1 month ago
parent commit 00db26af68
  1. weed/filer/filer.go (2 changed lines)
  2. weed/filer/filer_deletion.go (91 changed lines)
  3. weed/filer/filer_deletion_test.go (53 changed lines)

weed/filer/filer.go (2 changed lines)

@@ -55,6 +55,7 @@ type Filer struct {
Dlm *lock_manager.DistributedLockManager
MaxFilenameLength uint32
deletionQuit chan struct{}
+ DeletionRetryQueue *DeletionRetryQueue
}
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -68,6 +69,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
Dlm: lock_manager.NewDistributedLockManager(filerHost),
MaxFilenameLength: maxFilenameLength,
deletionQuit: make(chan struct{}),
+ DeletionRetryQueue: NewDeletionRetryQueue(),
}
if f.UniqueFilerId < 0 {
f.UniqueFilerId = -f.UniqueFilerId
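
The filer.go change only adds the field and initializes it in NewFiler; the queue itself is driven by the deletion loops below. For orientation, here is a minimal sketch of exercising that queue directly. The SeaweedFS module import path is assumed, and the expectation that a freshly added item is not returned until its backoff delay elapses is inferred from the retry logic below, not stated in this commit.

package main

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/filer"
)

func main() {
    // Same constructor that NewFiler now calls for the DeletionRetryQueue field.
    q := filer.NewDeletionRetryQueue()

    // Record a failed deletion; the queue schedules the next attempt with backoff.
    q.AddOrUpdate("3,01637037d6", "connection refused")
    fmt.Println("queued items:", q.Size())

    // GetReadyItems only returns entries whose retry time has arrived, so
    // immediately after AddOrUpdate this is expected to be empty.
    ready := q.GetReadyItems(100)
    fmt.Println("ready for retry now:", len(ready))
}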

weed/filer/filer_deletion.go (91 changed lines)

@@ -231,7 +231,7 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
heap.Pop(&q.heap)
delete(q.itemIndex, item.FileId)
- if item.RetryCount < MaxRetryAttempts {
+ if item.RetryCount <= MaxRetryAttempts {
readyItems = append(readyItems, item)
} else {
// Max attempts reached, log and discard
@@ -277,11 +277,8 @@ func (f *Filer) loopProcessingDeletion() {
DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
- // Create retry queue
- retryQueue := NewDeletionRetryQueue()
// Start retry processor in a separate goroutine
- go f.loopProcessingDeletionRetry(lookupFunc, retryQueue)
+ go f.loopProcessingDeletionRetry(lookupFunc)
ticker := time.NewTicker(DeletionPollInterval)
defer ticker.Stop()
@@ -302,7 +299,7 @@
toDeleteFileIds = fileIds
fileIds = fileIds[:0]
}
- f.processDeletionBatch(toDeleteFileIds, lookupFunc, retryQueue)
+ f.processDeletionBatch(toDeleteFileIds, lookupFunc)
}
})
}
@@ -312,7 +309,7 @@ func (f *Filer) loopProcessingDeletion() {
// processDeletionBatch handles deletion of a batch of file IDs and processes results.
// It classifies errors into retryable and permanent categories, adds retryable failures
// to the retry queue, and logs appropriate messages.
- func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
// Process individual results for better error tracking
@@ -328,7 +325,7 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
} else if isRetryableError(result.Error) {
// Retryable error - add to retry queue
retryableErrorCount++
- retryQueue.AddOrUpdate(result.FileId, result.Error)
+ f.DeletionRetryQueue.AddOrUpdate(result.FileId, result.Error)
if len(errorDetails) < MaxLoggedErrorDetails {
errorDetails = append(errorDetails, result.FileId+": "+result.Error+" (will retry)")
}
@@ -355,8 +352,8 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
}
- if retryQueue.Size() > 0 {
- glog.V(2).Infof("retry queue size: %d", retryQueue.Size())
+ if f.DeletionRetryQueue.Size() > 0 {
+ glog.V(2).Infof("retry queue size: %d", f.DeletionRetryQueue.Size())
}
}
@@ -390,7 +387,7 @@ func isRetryableError(errorMsg string) bool {
}
// loopProcessingDeletionRetry processes the retry queue for failed deletions
- func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
ticker := time.NewTicker(DeletionRetryPollInterval)
defer ticker.Stop()
@@ -398,18 +395,18 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
for {
select {
case <-f.deletionQuit:
glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", retryQueue.Size())
glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", f.DeletionRetryQueue.Size())
return
case <-ticker.C:
// Get items that are ready to retry
- readyItems := retryQueue.GetReadyItems(DeletionRetryBatchSize)
+ readyItems := f.DeletionRetryQueue.GetReadyItems(DeletionRetryBatchSize)
if len(readyItems) == 0 {
continue
}
glog.V(1).Infof("retrying deletion of %d files", len(readyItems))
- f.processRetryBatch(readyItems, lookupFunc, retryQueue)
+ f.processRetryBatch(readyItems, lookupFunc)
}
}
}
@@ -417,7 +414,7 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin
// processRetryBatch attempts to retry deletion of files and processes results.
// Successfully deleted items are removed from tracking, retryable failures are
// re-queued with updated retry counts, and permanent errors are logged and discarded.
- func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) {
+ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
// Extract file IDs from retry items
fileIds := make([]string, 0, len(readyItems))
for _, item := range readyItems {
@@ -427,39 +424,67 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu
// Attempt deletion
results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- // Create a map for quick lookup of results
- resultsByFileId := make(map[string]*volume_server_pb.DeleteResult, len(results))
+ // Group results by file ID to handle multiple results for replicated volumes
+ resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult)
for _, result := range results {
- resultsByFileId[result.FileId] = result
+ resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result)
}
// Process results - iterate over readyItems to ensure all items are accounted for
var successCount, notFoundCount, retryCount, permanentErrorCount int
for _, item := range readyItems {
- result, found := resultsByFileId[item.FileId]
+ fileIdResults, found := resultsByFileId[item.FileId]
if !found {
// No result returned for this file ID - could indicate a bug or edge case.
// Re-queue to avoid silent data loss.
glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
- retryQueue.RequeueForRetry(item, "no deletion result from volume server")
+ f.DeletionRetryQueue.RequeueForRetry(item, "no deletion result from volume server")
retryCount++
continue
}
- if result.Error == "" {
- successCount++
- glog.V(2).Infof("retry successful for %s after %d attempts", result.FileId, item.RetryCount)
- } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
- // Already deleted - success
- notFoundCount++
- } else if isRetryableError(result.Error) {
- // Still failing, re-queue with preserved retry count
- retryCount++
- retryQueue.RequeueForRetry(item, result.Error)
- } else {
- // Permanent error on retry - give up
+ // Aggregate results for all replicas
+ var firstRetryableError string
+ var firstPermanentError string
+ allSuccessOrNotFound := true
+ for _, res := range fileIdResults {
+ if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
+ allSuccessOrNotFound = false
+ if isRetryableError(res.Error) {
+ if firstRetryableError == "" {
+ firstRetryableError = res.Error
+ }
+ } else {
+ if firstPermanentError == "" {
+ firstPermanentError = res.Error
+ }
+ }
+ }
+ }
+ // Determine overall outcome: permanent errors take precedence, then retryable errors
+ if firstPermanentError != "" {
permanentErrorCount++
glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error)
glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, firstPermanentError)
+ } else if firstRetryableError != "" {
+ retryCount++
+ f.DeletionRetryQueue.RequeueForRetry(item, firstRetryableError)
+ } else if allSuccessOrNotFound {
+ // Check if it's pure success or "not found"
+ isPureSuccess := true
+ for _, res := range fileIdResults {
+ if res.Error != "" {
+ isPureSuccess = false
+ break
+ }
+ }
+ if isPureSuccess {
+ successCount++
+ glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
+ } else {
+ notFoundCount++
+ }
}
}
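
The substance of the processRetryBatch change is that a file ID on a replicated volume can come back with one DeleteResult per replica, so the per-replica outcomes are aggregated with a fixed precedence: any permanent error discards the item, otherwise any retryable error requeues it, otherwise an already-deleted/"not found" result is counted separately from a clean success. Below is a simplified, self-contained sketch of that precedence rule; the result struct and the isRetryable helper are stand-ins for volume_server_pb.DeleteResult and isRetryableError, not the actual SeaweedFS implementations.

package main

import (
    "fmt"
    "strings"
)

// deleteResult is a stand-in for volume_server_pb.DeleteResult.
type deleteResult struct {
    FileId string
    Error  string
}

// isRetryable is a stand-in for the filer's isRetryableError heuristic.
func isRetryable(errMsg string) bool {
    return strings.Contains(errMsg, "timeout") || strings.Contains(errMsg, "connection refused")
}

// classify aggregates the per-replica results for one file ID, mirroring the
// precedence in processRetryBatch: permanent > retryable > not-found > success.
func classify(results []deleteResult) string {
    var firstRetryable, firstPermanent string
    sawNotFound := false
    for _, r := range results {
        switch {
        case r.Error == "":
            // clean success on this replica
        case r.Error == "not found":
            sawNotFound = true
        case isRetryable(r.Error):
            if firstRetryable == "" {
                firstRetryable = r.Error
            }
        default:
            if firstPermanent == "" {
                firstPermanent = r.Error
            }
        }
    }
    switch {
    case firstPermanent != "":
        return "permanent error, discard: " + firstPermanent
    case firstRetryable != "":
        return "requeue: " + firstRetryable
    case sawNotFound:
        return "already deleted"
    default:
        return "success"
    }
}

func main() {
    // One replica succeeded, the other timed out: the file ID is requeued.
    fmt.Println(classify([]deleteResult{
        {FileId: "3,01637037d6"},
        {FileId: "3,01637037d6", Error: "timeout"},
    }))
}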

weed/filer/filer_deletion_test.go (53 changed lines)

@@ -104,29 +104,50 @@ func TestDeletionRetryQueue_OverflowProtection(t *testing.T) {
func TestDeletionRetryQueue_MaxAttemptsReached(t *testing.T) {
queue := NewDeletionRetryQueue()
- // Add item and set retry count near max
+ // Add item
queue.AddOrUpdate("file1", "error")
- // Manually set high retry count
+ // Manually set retry count to max
queue.lock.Lock()
- if item, exists := queue.itemIndex["file1"]; exists {
- item.RetryCount = MaxRetryAttempts
- item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now
+ item, exists := queue.itemIndex["file1"]
+ if !exists {
+ queue.lock.Unlock()
+ t.Fatal("Item not found in queue")
+ }
+ item.RetryCount = MaxRetryAttempts
+ item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now
heap.Fix(&queue.heap, item.heapIndex)
queue.lock.Unlock()
- // Try to get ready items - should be discarded
- readyItems := queue.GetReadyItems(10)
- if len(readyItems) != 0 {
- t.Errorf("Expected 0 items (max attempts reached), got %d", len(readyItems))
- }
+ // Try to get ready items - should be returned for the last retry (attempt #10)
+ readyItems := queue.GetReadyItems(10)
+ if len(readyItems) != 1 {
+ t.Fatalf("Expected 1 item for last retry, got %d", len(readyItems))
+ }
- // Should be removed from queue
- if queue.Size() != 0 {
- t.Errorf("Expected queue size 0 after max attempts, got %d", queue.Size())
- }
- } else {
+ // Requeue it, which will increment its retry count beyond the max
+ queue.RequeueForRetry(readyItems[0], "final error")
+ // Manually make it ready again
+ queue.lock.Lock()
+ item, exists = queue.itemIndex["file1"]
+ if !exists {
queue.lock.Unlock()
- t.Fatal("Item not found in queue")
+ t.Fatal("Item not found in queue after requeue")
}
+ item.NextRetryAt = time.Now().Add(-1 * time.Second)
+ heap.Fix(&queue.heap, item.heapIndex)
+ queue.lock.Unlock()
+ // Now it should be discarded (retry count is 11, exceeds max of 10)
+ readyItems = queue.GetReadyItems(10)
+ if len(readyItems) != 0 {
+ t.Errorf("Expected 0 items (max attempts exceeded), got %d", len(readyItems))
+ }
+ // Should be removed from queue
+ if queue.Size() != 0 {
+ t.Errorf("Expected queue size 0 after max attempts exceeded, got %d", queue.Size())
+ }
}
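
Taken together, the updated test pins down the boundary introduced by the change from < to <= above: an item whose RetryCount equals MaxRetryAttempts is still handed out for one final attempt, and only after RequeueForRetry pushes the count past the maximum is it dropped. A minimal standalone sketch of that boundary follows; the value 10 is assumed from the test's own "exceeds max of 10" comment, not read from the real constant.

package main

import "fmt"

func main() {
    const maxRetryAttempts = 10 // assumed from the test comment above

    // Mirrors the eligibility check in GetReadyItems after this commit:
    // RetryCount <= MaxRetryAttempts keeps the item, anything larger is discarded.
    eligible := func(retryCount int) bool { return retryCount <= maxRetryAttempts }

    for _, rc := range []int{9, 10, 11} {
        fmt.Printf("RetryCount=%d -> retried: %v\n", rc, eligible(rc))
    }
}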
