Browse Source

filer and mount deletion resolves manifest chunks also

pull/1444/head
Chris Lu 4 years ago
parent
commit
f2a8574448
  1. 37
      weed/filer2/filechunk_manifest.go
  2. 18
      weed/filer2/filer_deletion.go
  3. 12
      weed/filesys/wfs_deletion.go

37
weed/filer2/filechunk_manifest.go

@ -28,7 +28,7 @@ func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) { func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
for _, c := range chunks { for _, c := range chunks {
if !c.IsChunkManifest {
if c.IsChunkManifest {
manifestChunks = append(manifestChunks, c) manifestChunks = append(manifestChunks, c)
} else { } else {
nonManifestChunks = append(nonManifestChunks, c) nonManifestChunks = append(nonManifestChunks, c)
@ -37,7 +37,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return return
} }
func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) {
func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this // TODO maybe parallel this
for _, chunk := range chunks { for _, chunk := range chunks {
if !chunk.IsChunkManifest { if !chunk.IsChunkManifest {
@ -45,19 +45,14 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
continue continue
} }
// IsChunkManifest
data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
if err != nil { if err != nil {
return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
}
m := &filer_pb.FileChunkManifest{}
if err := proto.Unmarshal(data, m); err != nil {
return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
return chunks, nil, err
} }
manifestChunks = append(manifestChunks, chunk) manifestChunks = append(manifestChunks, chunk)
// recursive // recursive
filer_pb.AfterEntryDeserialization(m.Chunks)
dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
if subErr != nil { if subErr != nil {
return chunks, nil, subErr return chunks, nil, subErr
} }
@ -67,6 +62,26 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
return return
} }
func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest {
return
}
// IsChunkManifest
data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
}
m := &filer_pb.FileChunkManifest{}
if err := proto.Unmarshal(data, m); err != nil {
return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
}
// recursive
filer_pb.AfterEntryDeserialization(m.Chunks)
return m.Chunks, nil
}
// TODO fetch from cache for weed mount? // TODO fetch from cache for weed mount?
func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
urlString, err := lookupFileIdFn(fileId) urlString, err := lookupFileIdFn(fileId)

18
weed/filer2/filer_deletion.go

@ -70,16 +70,20 @@ func (f *Filer) loopProcessingDeletion() {
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks { for _, chunk := range chunks {
f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
if !chunk.IsChunkManifest {
f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
continue
}
dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
}
for _, dChunk := range dataChunks {
f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
}
} }
} }
// DeleteFileByFileId direct delete by file id.
// Only used when the fileId is not being managed by snapshots.
func (f *Filer) DeleteFileByFileId(fileId string) {
f.fileIdDeletionQueue.EnQueue(fileId)
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil { if oldEntry == nil {

12
weed/filesys/wfs_deletion.go

@ -18,7 +18,17 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
var fileIds []string var fileIds []string
for _, chunk := range chunks { for _, chunk := range chunks {
fileIds = append(fileIds, chunk.GetFileIdString())
if !chunk.IsChunkManifest {
fileIds = append(fileIds, chunk.GetFileIdString())
continue
}
dataChunks, manifestResolveErr := filer2.ResolveOneChunkManifest(filer2.LookupFn(wfs), chunk)
if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
}
for _, dChunk := range dataChunks {
fileIds = append(fileIds, dChunk.GetFileIdString())
}
} }
wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

Loading…
Cancel
Save