From 6f30a78a6cbb3eb9bae6e97379345776dab011ca Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 21 Jul 2018 17:40:00 -0700 Subject: [PATCH] update also delete old chunks if different from latest chunks --- weed/filer2/filer.go | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 2ecf3976e..2deb8ffd5 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -4,12 +4,13 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/karlseguin/ccache" "os" "path/filepath" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/operation" ) type Filer struct { @@ -112,7 +113,7 @@ func (f *Filer) CreateEntry(entry *Entry) error { return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) } - f.deleteChunks(oldEntry) + f.deleteChunksIfNotNew(oldEntry, entry) return nil } @@ -149,7 +150,7 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet } if shouldDeleteChunks { - f.deleteChunks(entry) + f.deleteChunks(entry.Chunks) } return f.store.DeleteEntry(p) @@ -157,7 +158,7 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { - p = p[0: len(p)-1] + p = p[0 : len(p)-1] } return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit) } @@ -195,14 +196,36 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute) } -func (f *Filer) deleteChunks(entry *Entry) { +func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) { + for _, chunk := range chunks { + if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil { + glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err) + } + } +} + +func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { - if entry == nil { + if oldEntry == nil { return } - for _, chunk := range entry.Chunks { - if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil { - glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err) + if newEntry == nil { + f.deleteChunks(oldEntry.Chunks) + } + + var toDelete []*filer_pb.FileChunk + + for _, oldChunk := range oldEntry.Chunks { + found := false + for _, newChunk := range newEntry.Chunks { + if oldChunk.FileId == newChunk.FileId { + found = true + break + } + } + if !found { + toDelete = append(toDelete, oldChunk) } } + f.deleteChunks(toDelete) }