Browse Source

refactoring

pull/4462/merge
chrislu 5 months ago
parent
commit
a8fa78b892
  1. 2
      weed/filer/filer_delete_entry.go
  2. 44
      weed/filer/filer_deletion.go

2
weed/filer/filer_delete_entry.go

@ -50,7 +50,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
} }
if shouldDeleteChunks && !isDeleteCollection { if shouldDeleteChunks && !isDeleteCollection {
f.DirectDeleteChunks(entry.GetChunks())
f.DeleteChunks(p, entry.GetChunks())
} }
if isDeleteCollection { if isDeleteCollection {

44
weed/filer/filer_deletion.go

@ -70,50 +70,6 @@ func (f *Filer) loopProcessingDeletion() {
} }
} }
func (f *Filer) doDeleteFileIds(fileIds []string) {
lookupFunc := LookupByMasterClientFn(f.MasterClient)
DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
for len(fileIds) > 0 {
var toDeleteFileIds []string
if len(fileIds) > DeletionBatchSize {
toDeleteFileIds = fileIds[:DeletionBatchSize]
fileIds = fileIds[DeletionBatchSize:]
} else {
toDeleteFileIds = fileIds
fileIds = fileIds[:0]
}
deletionCount := len(toDeleteFileIds)
_, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
if err != nil {
if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
}
}
}
}
func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) {
var fileIdsToDelete []string
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
continue
}
dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
}
for _, dChunk := range dataChunks {
fileIdsToDelete = append(fileIdsToDelete, dChunk.GetFileIdString())
}
fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
}
f.doDeleteFileIds(fileIdsToDelete)
}
func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) { func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) {
f.doDeleteChunks(chunks) f.doDeleteChunks(chunks)
} }

Loading…
Cancel
Save