
check in-flight items also; dedup code

pull/7402/head
chrislu committed 1 month ago · commit 2b5444528d

1 changed file: weed/filer/filer_deletion.go (222 changed lines)

@@ -62,7 +62,8 @@ type DeletionRetryItem struct {
     RetryCount  int
     NextRetryAt time.Time
     LastError   string
-    heapIndex   int // index in the heap (for heap.Interface)
+    heapIndex   int  // index in the heap (for heap.Interface)
+    inFlight    bool // true when item is being processed, prevents duplicate additions
 }
 
 // retryHeap implements heap.Interface for DeletionRetryItem
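
The heapIndex field exists so the heap methods can track where each item sits as the heap reorders itself. For readers unfamiliar with container/heap, the following is a minimal, self-contained sketch of that pattern; it is illustrative only (simplified names, not the SeaweedFS retryHeap):

package main

import (
    "container/heap"
    "fmt"
    "time"
)

// item mirrors the idea of DeletionRetryItem: ordered by nextRetryAt,
// with heapIndex maintained by the heap methods themselves.
type item struct {
    fileId      string
    nextRetryAt time.Time
    heapIndex   int
}

type exampleHeap []*item

func (h exampleHeap) Len() int           { return len(h) }
func (h exampleHeap) Less(i, j int) bool { return h[i].nextRetryAt.Before(h[j].nextRetryAt) }
func (h exampleHeap) Swap(i, j int) {
    h[i], h[j] = h[j], h[i]
    h[i].heapIndex = i // keep indices in sync so heap.Fix/heap.Remove stay cheap
    h[j].heapIndex = j
}
func (h *exampleHeap) Push(x any) {
    it := x.(*item)
    it.heapIndex = len(*h)
    *h = append(*h, it)
}
func (h *exampleHeap) Pop() any {
    old := *h
    n := len(old)
    it := old[n-1]
    it.heapIndex = -1 // no longer in the heap
    *h = old[:n-1]
    return it
}

func main() {
    h := &exampleHeap{}
    now := time.Now()
    heap.Push(h, &item{fileId: "b", nextRetryAt: now.Add(2 * time.Second)})
    heap.Push(h, &item{fileId: "a", nextRetryAt: now.Add(1 * time.Second)})
    earliest := heap.Pop(h).(*item)
    fmt.Println(earliest.fileId, earliest.heapIndex) // "a" -1: earliest nextRetryAt comes out first
}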
@@ -162,13 +163,17 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
     q.lock.Lock()
     defer q.lock.Unlock()
 
-    // Check if item already exists
+    // Check if item already exists (including in-flight items)
     if item, exists := q.itemIndex[fileId]; exists {
-        // Item is already in the queue. Just update the error.
+        // Item is already in the queue or being processed. Just update the error.
         // The existing retry schedule should proceed.
         // RetryCount is only incremented in RequeueForRetry when an actual retry is performed.
         item.LastError = errorMsg
-        glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
+        if item.inFlight {
+            glog.V(2).Infof("retry for %s in-flight: attempt %d, will preserve retry state", fileId, item.RetryCount)
+        } else {
+            glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
+        }
         return
     }
@@ -179,6 +184,7 @@ func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
         RetryCount:  1,
         NextRetryAt: time.Now().Add(delay),
         LastError:   errorMsg,
+        inFlight:    false,
     }
     heap.Push(&q.heap, item)
     q.itemIndex[fileId] = item
@@ -199,14 +205,14 @@ func (q *DeletionRetryQueue) RequeueForRetry(item *DeletionRetryItem, errorMsg s
     // Calculate next retry time with exponential backoff
     delay := calculateBackoff(item.RetryCount)
     item.NextRetryAt = time.Now().Add(delay)
+    item.inFlight = false // Clear in-flight flag
 
     glog.V(2).Infof("requeued retry for %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay)
-    // Re-add to heap and index
+    // Re-add to heap (item still in itemIndex)
     heap.Push(&q.heap, item)
-    q.itemIndex[item.FileId] = item
 }
 
-// GetReadyItems returns items that are ready to be retried and removes them from the queue
+// GetReadyItems returns items that are ready to be retried and marks them as in-flight
 // Time complexity: O(K log N) where K is the number of ready items
 // Items are processed in order of NextRetryAt (earliest first)
 func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
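
RequeueForRetry computes the next attempt time via calculateBackoff, which is outside this diff. For orientation, a typical exponential-backoff-with-cap helper looks like the sketch below; the base delay and cap are illustrative placeholders, not the constants used in filer_deletion.go:

package main

import (
    "fmt"
    "time"
)

// calculateBackoffSketch doubles the delay per attempt and caps it.
// Base delay and cap are made-up values for illustration only.
func calculateBackoffSketch(retryCount int) time.Duration {
    const (
        baseDelay = 5 * time.Minute
        maxDelay  = 6 * time.Hour
    )
    delay := baseDelay
    for i := 1; i < retryCount; i++ {
        delay *= 2
        if delay >= maxDelay {
            return maxDelay
        }
    }
    return delay
}

func main() {
    for attempt := 1; attempt <= 8; attempt++ {
        fmt.Printf("attempt %d -> next retry in %v\n", attempt, calculateBackoffSketch(attempt))
    }
}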
@@ -225,14 +231,15 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
             break
         }
 
-        // Remove from heap and index
+        // Remove from heap but keep in itemIndex with inFlight flag
         heap.Pop(&q.heap)
-        delete(q.itemIndex, item.FileId)
 
         if item.RetryCount <= MaxRetryAttempts {
+            item.inFlight = true // Mark as being processed
             readyItems = append(readyItems, item)
         } else {
-            // Max attempts reached, log and discard
+            // Max attempts reached, log and discard completely
+            delete(q.itemIndex, item.FileId)
             glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
         }
     }
@@ -240,6 +247,16 @@ func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
     return readyItems
 }
 
+// Remove removes an item from the queue (called when deletion succeeds or fails permanently)
+// Time complexity: O(1)
+func (q *DeletionRetryQueue) Remove(item *DeletionRetryItem) {
+    q.lock.Lock()
+    defer q.lock.Unlock()
+
+    // Item was already removed from heap by GetReadyItems, just remove from index
+    delete(q.itemIndex, item.FileId)
+}
+
 // Size returns the current size of the retry queue
 func (q *DeletionRetryQueue) Size() int {
     q.lock.Lock()
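
Taken together, GetReadyItems, RequeueForRetry and the new Remove form a small lifecycle: an in-flight item is off the heap but still in itemIndex, so a failure reported concurrently only refreshes the existing entry instead of enqueuing a duplicate. The following is a condensed, map-only sketch of that contract, with simplified names and a made-up file ID (it deliberately omits the heap):

package main

import (
    "fmt"
    "sync"
    "time"
)

type retryItem struct {
    FileId      string
    RetryCount  int
    NextRetryAt time.Time
    LastError   string
    inFlight    bool
}

// retryQueue keeps every known item in an index; the real code additionally
// orders pending items in a heap by NextRetryAt.
type retryQueue struct {
    mu    sync.Mutex
    index map[string]*retryItem
}

// AddOrUpdate records a new failure, or only refreshes the error on an
// existing entry, including one that is currently in flight.
func (q *retryQueue) AddOrUpdate(fileId, errMsg string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if it, ok := q.index[fileId]; ok {
        it.LastError = errMsg // keep the existing schedule and retry count
        return
    }
    q.index[fileId] = &retryItem{
        FileId:      fileId,
        RetryCount:  1,
        NextRetryAt: time.Now().Add(time.Minute),
        LastError:   errMsg,
    }
}

// take marks an item as in flight (the analogue of GetReadyItems).
func (q *retryQueue) take(fileId string) *retryItem {
    q.mu.Lock()
    defer q.mu.Unlock()
    it := q.index[fileId]
    it.inFlight = true
    return it
}

// Requeue clears the in-flight flag after a failed attempt (like RequeueForRetry).
func (q *retryQueue) Requeue(it *retryItem, errMsg string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    it.RetryCount++
    it.LastError = errMsg
    it.NextRetryAt = time.Now().Add(time.Duration(it.RetryCount) * time.Minute)
    it.inFlight = false
}

// Remove drops the item once deletion succeeded or failed permanently.
func (q *retryQueue) Remove(it *retryItem) {
    q.mu.Lock()
    defer q.mu.Unlock()
    delete(q.index, it.FileId)
}

func main() {
    q := &retryQueue{index: map[string]*retryItem{}}
    q.AddOrUpdate("1,0123456789", "connection reset") // hypothetical file id
    it := q.take("1,0123456789")
    q.AddOrUpdate("1,0123456789", "timeout") // arrives mid-retry: no duplicate entry
    fmt.Println(len(q.index), it.LastError)  // 1 timeout
    q.Remove(it)
    fmt.Println(len(q.index)) // 0
}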
@@ -335,60 +352,23 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
     var errorDetails []string
     for _, fileId := range uniqueFileIdsSlice {
-        fileIdResults, found := resultsByFileId[fileId]
-        if !found {
-            // This can happen if lookup fails for the volume.
-            // To be safe, we treat it as a retryable error.
-            retryableErrorCount++
-            f.DeletionRetryQueue.AddOrUpdate(fileId, "no deletion result from volume server")
-            if len(errorDetails) < MaxLoggedErrorDetails {
-                errorDetails = append(errorDetails, fileId+": no deletion result (will retry)")
-            }
-            continue
-        }
-
-        var firstRetryableError, firstPermanentError string
-        allSuccessOrNotFound := true
-        for _, res := range fileIdResults {
-            if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
-                allSuccessOrNotFound = false
-                if isRetryableError(res.Error) {
-                    if firstRetryableError == "" {
-                        firstRetryableError = res.Error
-                    }
-                } else {
-                    if firstPermanentError == "" {
-                        firstPermanentError = res.Error
-                    }
-                }
-            }
-        }
-
-        // Determine overall outcome: permanent errors take precedence, then retryable errors
-        if firstPermanentError != "" {
-            permanentErrorCount++
-            if len(errorDetails) < MaxLoggedErrorDetails {
-                errorDetails = append(errorDetails, fileId+": "+firstPermanentError+" (permanent)")
-            }
-        } else if firstRetryableError != "" {
-            retryableErrorCount++
-            f.DeletionRetryQueue.AddOrUpdate(fileId, firstRetryableError)
-            if len(errorDetails) < MaxLoggedErrorDetails {
-                errorDetails = append(errorDetails, fileId+": "+firstRetryableError+" (will retry)")
-            }
-        } else if allSuccessOrNotFound {
-            isPureSuccess := true
-            for _, res := range fileIdResults {
-                if res.Error != "" {
-                    isPureSuccess = false
-                    break
-                }
-            }
-            if isPureSuccess {
-                successCount++
-            } else {
-                notFoundCount++
-            }
-        }
+        outcome := classifyDeletionOutcome(fileId, resultsByFileId)
+        switch outcome.status {
+        case "success":
+            successCount++
+        case "not_found":
+            notFoundCount++
+        case "retryable", "no_result":
+            retryableErrorCount++
+            f.DeletionRetryQueue.AddOrUpdate(fileId, outcome.errorMsg)
+            if len(errorDetails) < MaxLoggedErrorDetails {
+                errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (will retry)")
+            }
+        case "permanent":
+            permanentErrorCount++
+            if len(errorDetails) < MaxLoggedErrorDetails {
+                errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (permanent)")
+            }
+        }
     }
@@ -416,6 +396,64 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([
     }
 }
 
+// deletionOutcome represents the result of classifying deletion results for a file
+type deletionOutcome struct {
+    status   string // "success", "not_found", "retryable", "permanent", "no_result"
+    errorMsg string
+}
+
+// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome
+func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome {
+    fileIdResults, found := resultsByFileId[fileId]
+    if !found {
+        return deletionOutcome{
+            status:   "no_result",
+            errorMsg: "no deletion result from volume server",
+        }
+    }
+
+    var firstRetryableError, firstPermanentError string
+    allSuccessOrNotFound := true
+    for _, res := range fileIdResults {
+        if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
+            allSuccessOrNotFound = false
+            if isRetryableError(res.Error) {
+                if firstRetryableError == "" {
+                    firstRetryableError = res.Error
+                }
+            } else {
+                if firstPermanentError == "" {
+                    firstPermanentError = res.Error
+                }
+            }
+        }
+    }
+
+    // Determine overall outcome: permanent errors take precedence, then retryable errors
+    if firstPermanentError != "" {
+        return deletionOutcome{status: "permanent", errorMsg: firstPermanentError}
+    } else if firstRetryableError != "" {
+        return deletionOutcome{status: "retryable", errorMsg: firstRetryableError}
+    } else if allSuccessOrNotFound {
+        // Check if it's pure success or "not found"
+        isPureSuccess := true
+        for _, res := range fileIdResults {
+            if res.Error != "" {
+                isPureSuccess = false
+                break
+            }
+        }
+        if isPureSuccess {
+            return deletionOutcome{status: "success", errorMsg: ""}
+        }
+        return deletionOutcome{status: "not_found", errorMsg: ""}
+    }
+
+    // Shouldn't reach here, but return retryable as safe default
+    return deletionOutcome{status: "retryable", errorMsg: "unknown error"}
+}
+
 // isRetryableError determines if an error is retryable based on its message.
 //
 // Current implementation uses string matching which is brittle and may break
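
The precedence baked into classifyDeletionOutcome is: permanent beats retryable, retryable beats not-found, and a file counts as a plain success only when every replica reported no error. A small standalone illustration of that same decision order is below; it uses a local result type instead of volume_server_pb.DeleteResult, and the error strings and the retryable predicate are made up for the example:

package main

import "fmt"

type result struct{ Error string }

// classify reproduces the decision order on a simplified result type:
// permanent > retryable > not_found > success, with "no_result" for a missing entry.
func classify(results []result, retryable func(string) bool) string {
    if len(results) == 0 {
        return "no_result"
    }
    var sawRetryable, sawPermanent, sawNotFound bool
    for _, r := range results {
        switch {
        case r.Error == "":
            // success for this replica
        case r.Error == "not found":
            sawNotFound = true
        case retryable(r.Error):
            sawRetryable = true
        default:
            sawPermanent = true
        }
    }
    switch {
    case sawPermanent:
        return "permanent"
    case sawRetryable:
        return "retryable"
    case sawNotFound:
        return "not_found"
    default:
        return "success"
    }
}

func main() {
    isRetryable := func(e string) bool { return e == "connection refused" } // stand-in predicate
    fmt.Println(classify([]result{{""}, {""}}, isRetryable))                             // success
    fmt.Println(classify([]result{{""}, {"not found"}}, isRetryable))                    // not_found
    fmt.Println(classify([]result{{"connection refused"}, {""}}, isRetryable))           // retryable
    fmt.Println(classify([]result{{"connection refused"}, {"disk error"}}, isRetryable)) // permanent
    fmt.Println(classify(nil, isRetryable))                                              // no_result
}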
@@ -498,58 +536,26 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu
     // Process results - iterate over readyItems to ensure all items are accounted for
     var successCount, notFoundCount, retryCount, permanentErrorCount int
     for _, item := range readyItems {
-        fileIdResults, found := resultsByFileId[item.FileId]
-        if !found {
-            // No result returned for this file ID - could indicate a bug or edge case.
-            // Re-queue to avoid silent data loss.
-            glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
-            f.DeletionRetryQueue.RequeueForRetry(item, "no deletion result from volume server")
-            retryCount++
-            continue
-        }
-
-        // Aggregate results for all replicas
-        var firstRetryableError string
-        var firstPermanentError string
-        allSuccessOrNotFound := true
-        for _, res := range fileIdResults {
-            if res.Error != "" && res.Error != "not found" && !strings.Contains(res.Error, storage.ErrorDeleted.Error()) {
-                allSuccessOrNotFound = false
-                if isRetryableError(res.Error) {
-                    if firstRetryableError == "" {
-                        firstRetryableError = res.Error
-                    }
-                } else {
-                    if firstPermanentError == "" {
-                        firstPermanentError = res.Error
-                    }
-                }
-            }
-        }
-
-        // Determine overall outcome: permanent errors take precedence, then retryable errors
-        if firstPermanentError != "" {
-            permanentErrorCount++
-            glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, firstPermanentError)
-        } else if firstRetryableError != "" {
-            retryCount++
-            f.DeletionRetryQueue.RequeueForRetry(item, firstRetryableError)
-        } else if allSuccessOrNotFound {
-            // Check if it's pure success or "not found"
-            isPureSuccess := true
-            for _, res := range fileIdResults {
-                if res.Error != "" {
-                    isPureSuccess = false
-                    break
-                }
-            }
-            if isPureSuccess {
-                successCount++
-                glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
-            } else {
-                notFoundCount++
-            }
-        }
+        outcome := classifyDeletionOutcome(item.FileId, resultsByFileId)
+        switch outcome.status {
+        case "success":
+            successCount++
+            f.DeletionRetryQueue.Remove(item) // Remove from queue (success)
+            glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
+        case "not_found":
+            notFoundCount++
+            f.DeletionRetryQueue.Remove(item) // Remove from queue (already deleted)
+        case "retryable", "no_result":
+            retryCount++
+            if outcome.status == "no_result" {
+                glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
+            }
+            f.DeletionRetryQueue.RequeueForRetry(item, outcome.errorMsg)
+        case "permanent":
+            permanentErrorCount++
+            f.DeletionRetryQueue.Remove(item) // Remove from queue (permanent failure)
+            glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, outcome.errorMsg)
+        }
     }
