diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go index e8cf564e3..ba4625bab 100644 --- a/weed/filer2/filechunk_manifest.go +++ b/weed/filer2/filechunk_manifest.go @@ -28,7 +28,7 @@ func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) { for _, c := range chunks { - if !c.IsChunkManifest { + if c.IsChunkManifest { manifestChunks = append(manifestChunks, c) } else { nonManifestChunks = append(nonManifestChunks, c) @@ -37,7 +37,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa return } -func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) { +func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { // TODO maybe parallel this for _, chunk := range chunks { if !chunk.IsChunkManifest { @@ -45,19 +45,14 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil continue } - // IsChunkManifest - data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed) + resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) if err != nil { - return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err) - } - m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { - return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err) + return chunks, nil, err } + manifestChunks = append(manifestChunks, chunk) // recursive - filer_pb.AfterEntryDeserialization(m.Chunks) - dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks) + dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks) if subErr != nil { return chunks, nil, subErr } @@ -67,6 +62,26 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil return } +func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { + if !chunk.IsChunkManifest { + return + } + + // IsChunkManifest + data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + if err != nil { + return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) + } + m := &filer_pb.FileChunkManifest{} + if err := proto.Unmarshal(data, m); err != nil { + return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) + } + + // recursive + filer_pb.AfterEntryDeserialization(m.Chunks) + return m.Chunks, nil +} + // TODO fetch from cache for weed mount? func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { urlString, err := lookupFileIdFn(fileId) diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 2ff9dac63..e3eb0e61f 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -70,16 +70,20 @@ func (f *Filer) loopProcessingDeletion() { func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + if !chunk.IsChunkManifest { + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString()) + } } } -// DeleteFileByFileId direct delete by file id. -// Only used when the fileId is not being managed by snapshots. -func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionQueue.EnQueue(fileId) -} - func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { if oldEntry == nil { diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 203ebdad1..210ae1ba4 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -18,7 +18,17 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { var fileIds []string for _, chunk := range chunks { - fileIds = append(fileIds, chunk.GetFileIdString()) + if !chunk.IsChunkManifest { + fileIds = append(fileIds, chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := filer2.ResolveOneChunkManifest(filer2.LookupFn(wfs), chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + fileIds = append(fileIds, dChunk.GetFileIdString()) + } } wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {