|
|
@ -8,6 +8,8 @@ import ( |
|
|
"sync" |
|
|
"sync" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"google.golang.org/grpc" |
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage" |
|
|
"github.com/seaweedfs/seaweedfs/weed/storage" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
|
|
|
|
|
|
@ -339,20 +341,15 @@ func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([ |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc) |
|
|
|
|
|
|
|
|
|
|
|
// Group results by file ID to handle multiple results for replicated volumes
|
|
|
|
|
|
resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult) |
|
|
|
|
|
for _, result := range results { |
|
|
|
|
|
resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Delete files and classify outcomes
|
|
|
|
|
|
outcomes := deleteFilesAndClassify(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc) |
|
|
|
|
|
|
|
|
// Process results
|
|
|
|
|
|
|
|
|
// Process outcomes
|
|
|
var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int |
|
|
var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int |
|
|
var errorDetails []string |
|
|
var errorDetails []string |
|
|
|
|
|
|
|
|
for _, fileId := range uniqueFileIdsSlice { |
|
|
for _, fileId := range uniqueFileIdsSlice { |
|
|
outcome := classifyDeletionOutcome(fileId, resultsByFileId) |
|
|
|
|
|
|
|
|
outcome := outcomes[fileId] |
|
|
|
|
|
|
|
|
switch outcome.status { |
|
|
switch outcome.status { |
|
|
case deletionOutcomeSuccess: |
|
|
case deletionOutcomeSuccess: |
|
|
@ -410,6 +407,26 @@ type deletionOutcome struct { |
|
|
errorMsg string |
|
|
errorMsg string |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// deleteFilesAndClassify performs deletion and classifies outcomes for a list of file IDs
|
|
|
|
|
|
func deleteFilesAndClassify(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) map[string]deletionOutcome { |
|
|
|
|
|
// Perform deletion
|
|
|
|
|
|
results := operation.DeleteFileIdsWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) |
|
|
|
|
|
|
|
|
|
|
|
// Group results by file ID to handle multiple results for replicated volumes
|
|
|
|
|
|
resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult) |
|
|
|
|
|
for _, result := range results { |
|
|
|
|
|
resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Classify outcome for each file
|
|
|
|
|
|
outcomes := make(map[string]deletionOutcome, len(fileIds)) |
|
|
|
|
|
for _, fileId := range fileIds { |
|
|
|
|
|
outcomes[fileId] = classifyDeletionOutcome(fileId, resultsByFileId) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return outcomes |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome
|
|
|
// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome
|
|
|
func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome { |
|
|
func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome { |
|
|
fileIdResults, found := resultsByFileId[fileId] |
|
|
fileIdResults, found := resultsByFileId[fileId] |
|
|
@ -532,19 +549,13 @@ func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc fu |
|
|
fileIds = append(fileIds, item.FileId) |
|
|
fileIds = append(fileIds, item.FileId) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Attempt deletion
|
|
|
|
|
|
results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) |
|
|
|
|
|
|
|
|
|
|
|
// Group results by file ID to handle multiple results for replicated volumes
|
|
|
|
|
|
resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult) |
|
|
|
|
|
for _, result := range results { |
|
|
|
|
|
resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Delete files and classify outcomes
|
|
|
|
|
|
outcomes := deleteFilesAndClassify(f.GrpcDialOption, fileIds, lookupFunc) |
|
|
|
|
|
|
|
|
// Process results - iterate over readyItems to ensure all items are accounted for
|
|
|
|
|
|
|
|
|
// Process outcomes - iterate over readyItems to ensure all items are accounted for
|
|
|
var successCount, notFoundCount, retryCount, permanentErrorCount int |
|
|
var successCount, notFoundCount, retryCount, permanentErrorCount int |
|
|
for _, item := range readyItems { |
|
|
for _, item := range readyItems { |
|
|
outcome := classifyDeletionOutcome(item.FileId, resultsByFileId) |
|
|
|
|
|
|
|
|
outcome := outcomes[item.FileId] |
|
|
|
|
|
|
|
|
switch outcome.status { |
|
|
switch outcome.status { |
|
|
case deletionOutcomeSuccess: |
|
|
case deletionOutcomeSuccess: |
|
|
|