From b282e34dc2933c71929b1d9297c44b1598344a1f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 20 Nov 2018 20:56:28 -0800 Subject: [PATCH] async file chunk deletion --- weed/filer2/filer.go | 65 +++++------------------ weed/filer2/filer_deletion.go | 88 ++++++++++++++++++++++++++++++++ weed/operation/delete_content.go | 12 ++++- weed/wdclient/vid_map.go | 9 ++++ 4 files changed, 120 insertions(+), 54 deletions(-) create mode 100644 weed/filer2/filer_deletion.go diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 0c0a3e1fa..659054d86 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -10,24 +10,27 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/karlseguin/ccache" - "github.com/chrislusf/seaweedfs/weed/storage" ) type Filer struct { - store FilerStore - directoryCache *ccache.Cache - MasterClient *wdclient.MasterClient + store FilerStore + directoryCache *ccache.Cache + MasterClient *wdclient.MasterClient + fileIdDeletionChan chan string } func NewFiler(masters []string) *Filer { - return &Filer{ - directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters), + f := &Filer{ + directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters), + fileIdDeletionChan: make(chan string, 4096), } + + go f.loopProcessingDeletion() + + return f } func (f *Filer) SetStore(store FilerStore) { @@ -229,47 +232,3 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute) } - -func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { - var fileIds []string - for _, chunk := range chunks { - fileIds = append(fileIds, chunk.FileId) - } - operation.DeleteFiles(f.GetMaster(), fileIds) -} - -func (f *Filer) DeleteFileByFileId(fileId string) { - volumeServer, err := f.MasterClient.LookupVolumeServer(fileId) - if err != nil { - glog.V(0).Infof("can not find file %s: %v", fileId, err) - } - if _, err := operation.DeleteFilesAtOneVolumeServer(volumeServer, []string{fileId}); err != nil && err != storage.NotFound { - glog.V(0).Infof("deleting file %s: %v", fileId, err) - } -} - -func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { - - if oldEntry == nil { - return - } - if newEntry == nil { - f.DeleteChunks(oldEntry.Chunks) - } - - var toDelete []*filer_pb.FileChunk - - for _, oldChunk := range oldEntry.Chunks { - found := false - for _, newChunk := range newEntry.Chunks { - if oldChunk.FileId == newChunk.FileId { - found = true - break - } - } - if !found { - toDelete = append(toDelete, oldChunk) - } - } - f.DeleteChunks(toDelete) -} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go new file mode 100644 index 000000000..4561ecf54 --- /dev/null +++ b/weed/filer2/filer_deletion.go @@ -0,0 +1,88 @@ +package filer2 + +import ( + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func (f *Filer) loopProcessingDeletion() { + + ticker := time.NewTicker(5 * time.Second) + + lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { + m := make(map[string]operation.LookupResult) + for _, vid := range vids { + locs := f.MasterClient.GetVidLocations(vid) + var locations []operation.Location + for _, loc := range locs { + locations = append(locations, operation.Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + }) + } + m[vid] = operation.LookupResult{ + VolumeId: vid, + Locations: locations, + } + } + return m, nil + } + + var fileIds []string + for { + select { + case fid := <-f.fileIdDeletionChan: + fileIds = append(fileIds, fid) + if len(fileIds) >= 4096 { + glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) + operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + fileIds = fileIds[:0] + } + case <-ticker.C: + if len(fileIds) > 0 { + glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) + operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + fileIds = fileIds[:0] + } + } + } +} + +func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { + for _, chunk := range chunks { + f.fileIdDeletionChan <- chunk.FileId + } +} + +func (f *Filer) DeleteFileByFileId(fileId string) { + f.fileIdDeletionChan <- fileId +} + +func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { + + if oldEntry == nil { + return + } + if newEntry == nil { + f.DeleteChunks(oldEntry.Chunks) + } + + var toDelete []*filer_pb.FileChunk + + for _, oldChunk := range oldEntry.Chunks { + found := false + for _, newChunk := range newEntry.Chunks { + if oldChunk.FileId == newChunk.FileId { + found = true + break + } + } + if !found { + toDelete = append(toDelete, oldChunk) + } + } + f.DeleteChunks(toDelete) +} diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 806bfbe7b..fcb4f718a 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -29,6 +29,16 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { // DeleteFiles batch deletes a list of fileIds func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { + lookupFunc := func(vids []string) (map[string]LookupResult, error) { + return LookupVolumeIds(master, vids) + } + + return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + +} + +func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { + var ret []*volume_server_pb.DeleteResult vid_to_fileIds := make(map[string][]string) @@ -50,7 +60,7 @@ func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteRes vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId) } - lookupResults, err := LookupVolumeIds(master, vids) + lookupResults, err := lookupFunc(vids) if err != nil { return ret, err } diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 02c3efd17..aef29f56f 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -66,6 +66,15 @@ func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err er return serverUrl, nil } +func (vc *vidMap) GetVidLocations(vid string) (locations []Location) { + id, err := strconv.Atoi(vid) + if err != nil { + glog.V(1).Infof("Unknown volume id %s", vid) + return nil + } + return vc.GetLocations(uint32(id)) +} + func (vc *vidMap) GetLocations(vid uint32) (locations []Location) { vc.RLock() defer vc.RUnlock()