Browse Source

proper deletion ordering

delete central file store first, then delete local cache
random_access_file
Chris Lu 4 years ago
parent
commit
5b43bddf20
  1. 1
      weed/filer2/filer_delete_entry.go
  2. 20
      weed/filesys/dir.go

1
weed/filer2/filer_delete_entry.go

@ -65,6 +65,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
} }
if lastFileName == "" && !isRecursive && len(entries) > 0 { if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop // only for first iteration in the loop
glog.Errorf("deleting a folder %s has children: %+v", entry.FullPath, entries)
return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
} }

20
weed/filesys/dir.go

@ -218,7 +218,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
glog.V(5).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath()) dirPath := util.FullPath(dir.FullPath())
@ -316,10 +316,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
dir.wfs.deleteFileChunks(entry.Chunks) dir.wfs.deleteFileChunks(entry.Chunks)
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
glog.V(3).Infof("remove file: %v", req) glog.V(3).Infof("remove file: %v", req)
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false) err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false)
if err != nil { if err != nil {
@ -327,27 +323,29 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
return fuse.ENOENT return fuse.ENOENT
} }
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
return nil return nil
} }
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
t := util.NewFullPath(dir.FullPath(), req.Name)
dir.wfs.fsNodeCache.DeleteFsNode(t)
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
glog.V(3).Infof("remove directory entry: %v", req) glog.V(3).Infof("remove directory entry: %v", req)
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false)
if err != nil { if err != nil {
glog.V(3).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
if strings.Contains(err.Error(), "non-empty"){ if strings.Contains(err.Error(), "non-empty"){
return fuse.EEXIST return fuse.EEXIST
} }
return fuse.ENOENT return fuse.ENOENT
} }
t := util.NewFullPath(dir.FullPath(), req.Name)
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
dir.wfs.fsNodeCache.DeleteFsNode(t)
return nil return nil
} }

Loading…
Cancel
Save