|
|
|
@ -14,6 +14,7 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/operation" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/wdclient" |
|
|
|
) |
|
|
|
|
|
|
|
@ -416,19 +417,31 @@ func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[strin |
|
|
|
func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error), retryQueue *DeletionRetryQueue) { |
|
|
|
// Extract file IDs from retry items
|
|
|
|
fileIds := make([]string, 0, len(readyItems)) |
|
|
|
itemsByFileId := make(map[string]*DeletionRetryItem, len(readyItems)) |
|
|
|
for _, item := range readyItems { |
|
|
|
fileIds = append(fileIds, item.FileId) |
|
|
|
itemsByFileId[item.FileId] = item |
|
|
|
} |
|
|
|
|
|
|
|
// Attempt deletion
|
|
|
|
results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) |
|
|
|
|
|
|
|
// Process results
|
|
|
|
var successCount, notFoundCount, retryCount int |
|
|
|
// Create a map for quick lookup of results
|
|
|
|
resultsByFileId := make(map[string]*volume_server_pb.DeleteResult, len(results)) |
|
|
|
for _, result := range results { |
|
|
|
item := itemsByFileId[result.FileId] |
|
|
|
resultsByFileId[result.FileId] = result |
|
|
|
} |
|
|
|
|
|
|
|
// Process results - iterate over readyItems to ensure all items are accounted for
|
|
|
|
var successCount, notFoundCount, retryCount, permanentErrorCount int |
|
|
|
for _, item := range readyItems { |
|
|
|
result, found := resultsByFileId[item.FileId] |
|
|
|
if !found { |
|
|
|
// No result returned for this file ID - could indicate a bug or edge case.
|
|
|
|
// Re-queue to avoid silent data loss.
|
|
|
|
glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId) |
|
|
|
retryQueue.RequeueForRetry(item, "no deletion result from volume server") |
|
|
|
retryCount++ |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if result.Error == "" { |
|
|
|
successCount++ |
|
|
|
@ -442,6 +455,7 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu |
|
|
|
retryQueue.RequeueForRetry(item, result.Error) |
|
|
|
} else { |
|
|
|
// Permanent error on retry - give up
|
|
|
|
permanentErrorCount++ |
|
|
|
glog.Warningf("permanent error on retry for %s after %d attempts: %s", result.FileId, item.RetryCount, result.Error) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -452,6 +466,9 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu |
|
|
|
if retryCount > 0 { |
|
|
|
glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount) |
|
|
|
} |
|
|
|
if permanentErrorCount > 0 { |
|
|
|
glog.Warningf("retry: %d files failed with permanent errors", permanentErrorCount) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) { |
|
|
|
|