package filer

import (
	"container/heap"
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/util"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

const (
	// Maximum number of retry attempts for failed deletions
	MaxRetryAttempts = 10
	// Initial retry delay (will be doubled with each attempt)
	InitialRetryDelay = 5 * time.Minute
	// Maximum retry delay
	MaxRetryDelay = 6 * time.Hour
	// Interval for checking retry queue for ready items
	DeletionRetryPollInterval = 1 * time.Minute
	// Maximum number of items to process per retry iteration
	DeletionRetryBatchSize = 1000
	// Maximum number of error details to include in log messages
	MaxLoggedErrorDetails = 10
	// Interval for polling the deletion queue for new items
	// Using a prime number to de-synchronize with other periodic tasks
	DeletionPollInterval = 1123 * time.Millisecond
	// Maximum number of file IDs to delete per batch (roughly 20 bytes per file ID)
	DeletionBatchSize = 100000
)

// retryablePatterns contains error message patterns that indicate temporary/transient conditions
// that should be retried. These patterns are based on actual error messages from the deletion pipeline.
var retryablePatterns = []string{
	"is read only",               // Volume temporarily read-only (tiering, maintenance)
	"error reading from server",  // Network I/O errors
	"connection reset by peer",   // Network connection issues
	"closed network connection",  // Network connection closed unexpectedly
	"connection refused",         // Server temporarily unavailable
	"timeout",                    // Operation timeout (network or server)
	"deadline exceeded",          // Context deadline exceeded
	"context canceled",           // Context cancellation (may be transient)
	"lookup error",               // Volume lookup failures
	"lookup failed",              // Volume server discovery issues
	"too many requests",          // Rate limiting / backpressure
	"service unavailable",        // HTTP 503 errors
	"temporarily unavailable",    // Temporary service issues
	"try again",                  // Explicit retry suggestion
	"i/o timeout",                // Network I/O timeout
	"broken pipe",                // Connection broken during operation
}

// DeletionRetryItem represents a file deletion that failed and needs to be retried
type DeletionRetryItem struct {
	FileId      string
	RetryCount  int
	NextRetryAt time.Time
	LastError   string
	heapIndex   int  // index in the heap (for heap.Interface)
	inFlight    bool // true when item is being processed, prevents duplicate additions
}

// retryHeap implements heap.Interface for DeletionRetryItem
// Items are ordered by NextRetryAt (earliest first)
type retryHeap []*DeletionRetryItem

// Compile-time assertion that retryHeap implements heap.Interface
var _ heap.Interface = (*retryHeap)(nil)

func (h retryHeap) Len() int { return len(h) }

func (h retryHeap) Less(i, j int) bool {
	return h[i].NextRetryAt.Before(h[j].NextRetryAt)
}

func (h retryHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].heapIndex = i
	h[j].heapIndex = j
}

func (h *retryHeap) Push(x any) {
	item := x.(*DeletionRetryItem)
	item.heapIndex = len(*h)
	*h = append(*h, item)
}

func (h *retryHeap) Pop() any {
	old := *h
	n := len(old)
	item := old[n-1]
	old[n-1] = nil      // avoid memory leak
	item.heapIndex = -1 // mark as removed
	*h = old[0 : n-1]
	return item
}

// DeletionRetryQueue manages the queue of failed deletions that need to be retried.
// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items.
//
// LIMITATION: Current implementation stores the retry queue in memory only.
// On filer restart, all pending retries are lost. With MaxRetryDelay up to 6 hours,
// process restarts during this window will cause retry state loss.
//
// TODO: Consider persisting the retry queue to durable storage for production resilience:
// - Option 1: Leverage existing Filer store (KV operations)
// - Option 2: Periodic snapshots to disk with recovery on startup
// - Option 3: Write-ahead log for retry queue mutations
// - Trade-offs: Performance vs durability, complexity vs reliability
//
// For now, accepting in-memory storage as a pragmatic initial implementation.
// Lost retries will be eventually consistent as files remain in the deletion queue.
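//
// A sketch of the typical item lifecycle, using the methods defined below
// (variable names are illustrative only):
//
//	queue.AddOrUpdate(fileId, errMsg)       // first failure: scheduled after InitialRetryDelay
//	items := queue.GetReadyItems(batchSize) // due items are popped and marked in-flight
//	queue.RequeueForRetry(item, errMsg)     // still failing: re-queued with exponential backoff
//	queue.Remove(item)                      // success or permanent failure: dropped from tracking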
type DeletionRetryQueue struct {
	heap      retryHeap
	itemIndex map[string]*DeletionRetryItem // for O(1) lookup by FileId
	lock      sync.Mutex
}

// NewDeletionRetryQueue creates a new retry queue
func NewDeletionRetryQueue() *DeletionRetryQueue {
	q := &DeletionRetryQueue{
		heap:      make(retryHeap, 0),
		itemIndex: make(map[string]*DeletionRetryItem),
	}
	heap.Init(&q.heap)
	return q
}

// calculateBackoff calculates the exponential backoff delay for a given retry count.
// Uses exponential backoff formula: InitialRetryDelay * 2^(retryCount-1)
// The first retry (retryCount=1) uses InitialRetryDelay, second uses 2x, third uses 4x, etc.
// Includes overflow protection and caps at MaxRetryDelay.
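//
// For example, with InitialRetryDelay of 5 minutes and MaxRetryDelay of 6 hours, successive
// retries are delayed 5m, 10m, 20m, 40m, 1h20m, 2h40m, 5h20m, and then 6h (capped) thereafter.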
func calculateBackoff(retryCount int) time.Duration {
	// The first retry is attempt 1, but shift should start at 0
	if retryCount <= 1 {
		return InitialRetryDelay
	}

	shiftAmount := uint(retryCount - 1)

	// time.Duration is an int64. A left shift of 63 or more will result in a
	// negative number or zero. The multiplication can also overflow much earlier
	// (around a shift of 25 for a 5-minute initial delay).
	// The `delay <= 0` check below correctly catches all these overflow cases.
	delay := InitialRetryDelay << shiftAmount

	if delay <= 0 || delay > MaxRetryDelay {
		return MaxRetryDelay
	}

	return delay
}

// AddOrUpdate adds a new failed deletion or updates an existing one
// Time complexity: O(log N) for insertion/update
func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Check if item already exists (including in-flight items)
	if item, exists := q.itemIndex[fileId]; exists {
		// Item is already in the queue or being processed. Just update the error.
		// The existing retry schedule should proceed.
		// RetryCount is only incremented in RequeueForRetry when an actual retry is performed.
		item.LastError = errorMsg
		if item.inFlight {
			glog.V(2).Infof("retry for %s in-flight: attempt %d, will preserve retry state", fileId, item.RetryCount)
		} else {
			glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
		}
		return
	}

	// Add new item
	delay := InitialRetryDelay
	item := &DeletionRetryItem{
		FileId:      fileId,
		RetryCount:  1,
		NextRetryAt: time.Now().Add(delay),
		LastError:   errorMsg,
		inFlight:    false,
	}
	heap.Push(&q.heap, item)
	q.itemIndex[fileId] = item
	glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
}

// RequeueForRetry re-adds a previously failed item back to the queue with incremented retry count.
// This method MUST be used when re-queuing items from processRetryBatch to preserve retry state.
// Time complexity: O(log N) for insertion
func (q *DeletionRetryQueue) RequeueForRetry(item *DeletionRetryItem, errorMsg string) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Increment retry count
	item.RetryCount++
	item.LastError = errorMsg

	// Calculate next retry time with exponential backoff
	delay := calculateBackoff(item.RetryCount)
	item.NextRetryAt = time.Now().Add(delay)
	item.inFlight = false // Clear in-flight flag
	glog.V(2).Infof("requeued retry for %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay)

	// Re-add to heap (item still in itemIndex)
	heap.Push(&q.heap, item)
}

// GetReadyItems returns items that are ready to be retried and marks them as in-flight
// Time complexity: O(K log N) where K is the number of ready items
// Items are processed in order of NextRetryAt (earliest first)
func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
	q.lock.Lock()
	defer q.lock.Unlock()

	now := time.Now()
	var readyItems []*DeletionRetryItem

	// Peek at items from the top of the heap (earliest NextRetryAt)
	for len(q.heap) > 0 && len(readyItems) < maxItems {
		item := q.heap[0]

		// If the earliest item is not ready yet, no other items are ready either
		if item.NextRetryAt.After(now) {
			break
		}

		// Remove from heap but keep in itemIndex with inFlight flag
		heap.Pop(&q.heap)

		if item.RetryCount <= MaxRetryAttempts {
			item.inFlight = true // Mark as being processed
			readyItems = append(readyItems, item)
		} else {
			// Max attempts reached, log and discard completely
			delete(q.itemIndex, item.FileId)
			glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
		}
	}

	return readyItems
}

// Remove removes an item from the queue (called when deletion succeeds or fails permanently)
// Time complexity: O(1)
func (q *DeletionRetryQueue) Remove(item *DeletionRetryItem) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Item was already removed from heap by GetReadyItems, just remove from index
	delete(q.itemIndex, item.FileId)
}

// Size returns the current size of the retry queue
func (q *DeletionRetryQueue) Size() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	return len(q.heap)
}

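// LookupByMasterClientFn returns a volume location lookup function backed by the master client;
// the deletion pipeline below uses it to resolve volume ids to volume server locations.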
func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) {
	return func(vids []string) (map[string]*operation.LookupResult, error) {
		m := make(map[string]*operation.LookupResult)
		for _, vid := range vids {
			locs, _ := masterClient.GetVidLocations(vid)
			var locations []operation.Location
			for _, loc := range locs {
				locations = append(locations, operation.Location{
					Url:       loc.Url,
					PublicUrl: loc.PublicUrl,
					GrpcPort:  loc.GrpcPort,
				})
			}
			m[vid] = &operation.LookupResult{
				VolumeOrFileId: vid,
				Locations:      locations,
			}
		}
		return m, nil
	}
}

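// loopProcessingDeletion drains fileIdDeletionQueue on every DeletionPollInterval tick,
// deleting file ids in batches of up to DeletionBatchSize, and starts the retry processor
// in a separate goroutine. It runs until f.deletionQuit signals shutdown.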
func (f *Filer) loopProcessingDeletion() {

	lookupFunc := LookupByMasterClientFn(f.MasterClient)

	// Start retry processor in a separate goroutine
	go f.loopProcessingDeletionRetry(lookupFunc)

	ticker := time.NewTicker(DeletionPollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-f.deletionQuit:
			glog.V(0).Infof("deletion processor shutting down")
			return
		case <-ticker.C:
			f.fileIdDeletionQueue.Consume(func(fileIds []string) {
				for i := 0; i < len(fileIds); i += DeletionBatchSize {
					end := i + DeletionBatchSize
					if end > len(fileIds) {
						end = len(fileIds)
					}
					toDeleteFileIds := fileIds[i:end]
					f.processDeletionBatch(toDeleteFileIds, lookupFunc)
				}
			})
		}
	}
}

// processDeletionBatch handles deletion of a batch of file IDs and processes results.
// It classifies errors into retryable and permanent categories, adds retryable failures
// to the retry queue, and logs appropriate messages.
func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
	// Deduplicate file IDs to prevent incorrect retry count increments for the same file ID within a single batch.
	uniqueFileIdsSlice := make([]string, 0, len(toDeleteFileIds))
	processed := make(map[string]struct{}, len(toDeleteFileIds))
	for _, fileId := range toDeleteFileIds {
		if _, found := processed[fileId]; !found {
			processed[fileId] = struct{}{}
			uniqueFileIdsSlice = append(uniqueFileIdsSlice, fileId)
		}
	}

	if len(uniqueFileIdsSlice) == 0 {
		return
	}

	// Delete files and classify outcomes
	outcomes := deleteFilesAndClassify(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc)

	// Process outcomes
	var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
	var errorDetails []string

	for _, fileId := range uniqueFileIdsSlice {
		outcome := outcomes[fileId]

		switch outcome.status {
		case deletionOutcomeSuccess:
			successCount++
		case deletionOutcomeNotFound:
			notFoundCount++
		case deletionOutcomeRetryable, deletionOutcomeNoResult:
			retryableErrorCount++
			f.DeletionRetryQueue.AddOrUpdate(fileId, outcome.errorMsg)
			if len(errorDetails) < MaxLoggedErrorDetails {
				errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (will retry)")
			}
		case deletionOutcomePermanent:
			permanentErrorCount++
			if len(errorDetails) < MaxLoggedErrorDetails {
				errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (permanent)")
			}
		}
	}

	if successCount > 0 || notFoundCount > 0 {
		glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
	}

	totalErrors := retryableErrorCount + permanentErrorCount
	if totalErrors > 0 {
		logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
			totalErrors, len(uniqueFileIdsSlice), retryableErrorCount, permanentErrorCount)
		if len(errorDetails) > 0 {
			if totalErrors > MaxLoggedErrorDetails {
				logMessage += fmt.Sprintf(" (showing first %d)", len(errorDetails))
			}
			glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
		} else {
			glog.V(0).Info(logMessage)
		}
	}

	if f.DeletionRetryQueue.Size() > 0 {
		glog.V(2).Infof("retry queue size: %d", f.DeletionRetryQueue.Size())
	}
}

const (
	deletionOutcomeSuccess   = "success"
	deletionOutcomeNotFound  = "not_found"
	deletionOutcomeRetryable = "retryable"
	deletionOutcomePermanent = "permanent"
	deletionOutcomeNoResult  = "no_result"
)

// deletionOutcome represents the result of classifying deletion results for a file
type deletionOutcome struct {
	status   string // One of the deletionOutcome* constants
	errorMsg string
}

// deleteFilesAndClassify performs deletion and classifies outcomes for a list of file IDs
func deleteFilesAndClassify(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) map[string]deletionOutcome {
	// Perform deletion
	results := operation.DeleteFileIdsWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)

	// Group results by file ID to handle multiple results for replicated volumes
	resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult)
	for _, result := range results {
		resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result)
	}

	// Classify outcome for each file
	outcomes := make(map[string]deletionOutcome, len(fileIds))
	for _, fileId := range fileIds {
		outcomes[fileId] = classifyDeletionOutcome(fileId, resultsByFileId)
	}

	return outcomes
}

// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome
// Uses a single pass through results with early return for permanent errors (highest priority)
// Priority: Permanent > Retryable > Success > Not Found
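//
// For example, with a replicated volume one replica may delete the file successfully while
// another reports "connection refused"; the combined outcome is retryable, so the file id is
// scheduled for another attempt rather than dropped.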
func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome {
	fileIdResults, found := resultsByFileId[fileId]
	if !found || len(fileIdResults) == 0 {
		return deletionOutcome{
			status:   deletionOutcomeNoResult,
			errorMsg: "no deletion result from volume server",
		}
	}

	var firstRetryableError string
	hasSuccess := false

	for _, res := range fileIdResults {
		if res.Error == "" {
			hasSuccess = true
			continue
		}
		if strings.Contains(res.Error, storage.ErrorDeleted.Error()) || res.Error == "not found" {
			continue
		}

		if isRetryableError(res.Error) {
			if firstRetryableError == "" {
				firstRetryableError = res.Error
			}
		} else {
			// Permanent error takes highest precedence - return immediately
			return deletionOutcome{status: deletionOutcomePermanent, errorMsg: res.Error}
		}
	}

	if firstRetryableError != "" {
		return deletionOutcome{status: deletionOutcomeRetryable, errorMsg: firstRetryableError}
	}

	if hasSuccess {
		return deletionOutcome{status: deletionOutcomeSuccess, errorMsg: ""}
	}

	// If we are here, all results were "not found"
	return deletionOutcome{status: deletionOutcomeNotFound, errorMsg: ""}
}

// isRetryableError determines if an error is retryable based on its message.
//
// Current implementation uses string matching which is brittle and may break
// if error messages change in dependencies. This is acceptable for the initial
// implementation but should be improved in the future.
//
// TODO: Consider these improvements for more robust error handling:
// - Pass DeleteResult instead of just error string to access Status codes
// - Use HTTP status codes (503 Service Unavailable, 429 Too Many Requests, etc.)
// - Implement structured error types that can be checked with errors.Is/errors.As
// - Extract and check gRPC status codes for better classification
// - Add error wrapping in the deletion pipeline to preserve error context
//
// For now, we use conservative string matching for known transient error patterns.
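//
// Illustrative behavior, assuming error strings of this shape reach the function: a message
// containing "connection refused" or "deadline exceeded" matches retryablePatterns and is
// treated as retryable, while an unrecognized message is treated as permanent.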
func isRetryableError(errorMsg string) bool {
	// Empty errors are not retryable
	if errorMsg == "" {
		return false
	}

	errorLower := strings.ToLower(errorMsg)
	for _, pattern := range retryablePatterns {
		if strings.Contains(errorLower, pattern) {
			return true
		}
	}
	return false
}

// loopProcessingDeletionRetry processes the retry queue for failed deletions
func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {

	ticker := time.NewTicker(DeletionRetryPollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-f.deletionQuit:
			glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", f.DeletionRetryQueue.Size())
			return
		case <-ticker.C:
			// Process all ready items in batches until queue is empty
			totalProcessed := 0
			for {
				readyItems := f.DeletionRetryQueue.GetReadyItems(DeletionRetryBatchSize)
				if len(readyItems) == 0 {
					break
				}

				f.processRetryBatch(readyItems, lookupFunc)
				totalProcessed += len(readyItems)
			}

			if totalProcessed > 0 {
				glog.V(1).Infof("retried deletion of %d files", totalProcessed)
			}
		}
	}
}

// processRetryBatch attempts to retry deletion of files and processes results.
// Successfully deleted items are removed from tracking, retryable failures are
// re-queued with updated retry counts, and permanent errors are logged and discarded.
func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
	// Extract file IDs from retry items
	fileIds := make([]string, 0, len(readyItems))
	for _, item := range readyItems {
		fileIds = append(fileIds, item.FileId)
	}

	// Delete files and classify outcomes
	outcomes := deleteFilesAndClassify(f.GrpcDialOption, fileIds, lookupFunc)

	// Process outcomes - iterate over readyItems to ensure all items are accounted for
	var successCount, notFoundCount, retryCount, permanentErrorCount int
	for _, item := range readyItems {
		outcome := outcomes[item.FileId]

		switch outcome.status {
		case deletionOutcomeSuccess:
			successCount++
			f.DeletionRetryQueue.Remove(item) // Remove from queue (success)
			glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
		case deletionOutcomeNotFound:
			notFoundCount++
			f.DeletionRetryQueue.Remove(item) // Remove from queue (already deleted)
		case deletionOutcomeRetryable, deletionOutcomeNoResult:
			retryCount++
			if outcome.status == deletionOutcomeNoResult {
				glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
			}
			f.DeletionRetryQueue.RequeueForRetry(item, outcome.errorMsg)
		case deletionOutcomePermanent:
			permanentErrorCount++
			f.DeletionRetryQueue.Remove(item) // Remove from queue (permanent failure)
			glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, outcome.errorMsg)
		}
	}

	if successCount > 0 || notFoundCount > 0 {
		glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
	}
	if retryCount > 0 {
		glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
	}
	if permanentErrorCount > 0 {
		glog.Warningf("retry: %d files failed with permanent errors", permanentErrorCount)
	}
}

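// DeleteUncommittedChunks queues chunks from an uncommitted write for deletion,
// without consulting per-path storage rules.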
func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
	f.doDeleteChunks(ctx, chunks)
}

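// DeleteChunks queues an entry's chunks for deletion, unless the matching storage rule
// has chunk deletion disabled for the entry's path.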
func (f *Filer) DeleteChunks(ctx context.Context, fullpath util.FullPath, chunks []*filer_pb.FileChunk) {
	rule := f.FilerConf.MatchStorageRule(string(fullpath))
	if rule.DisableChunkDeletion {
		return
	}
	f.doDeleteChunks(ctx, chunks)
}

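// doDeleteChunks enqueues every chunk for deletion; chunk manifests are resolved so that
// the referenced data chunks are enqueued as well as the manifest chunk itself.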
func (f *Filer) doDeleteChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
	for _, chunk := range chunks {
		if !chunk.IsChunkManifest {
			f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
			continue
		}
		dataChunks, manifestResolveErr := ResolveOneChunkManifest(ctx, f.MasterClient.LookupFileId, chunk)
		if manifestResolveErr != nil {
			glog.V(0).InfofCtx(ctx, "failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
		}
		for _, dChunk := range dataChunks {
			f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
		}
		f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
	}
}

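// DeleteChunksNotRecursive enqueues chunks for deletion as-is, without resolving chunk manifests.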
func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
	for _, chunk := range chunks {
		f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
	}
}

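// deleteChunksIfNotNew deletes the chunks of oldEntry that are no longer referenced by newEntry.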
func (f *Filer) deleteChunksIfNotNew(ctx context.Context, oldEntry, newEntry *Entry) {
	var oldChunks, newChunks []*filer_pb.FileChunk
	if oldEntry != nil {
		oldChunks = oldEntry.GetChunks()
	}
	if newEntry != nil {
		newChunks = newEntry.GetChunks()
	}

	toDelete, err := MinusChunks(ctx, f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
	if err != nil {
		glog.ErrorfCtx(ctx, "failed to resolve old entry chunks when deleting old entry chunks. new: %s, old: %s", newChunks, oldChunks)
		return
	}
	f.DeleteChunksNotRecursive(toDelete)
}