Browse Source

async file chunk deletion

pull/778/head
Chris Lu 6 years ago
parent
commit
b282e34dc2
  1. 55
      weed/filer2/filer.go
  2. 88
      weed/filer2/filer_deletion.go
  3. 12
      weed/operation/delete_content.go
  4. 9
      weed/wdclient/vid_map.go

55
weed/filer2/filer.go

@ -10,24 +10,27 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/karlseguin/ccache"
"github.com/chrislusf/seaweedfs/weed/storage"
)
type Filer struct {
store FilerStore
directoryCache *ccache.Cache
MasterClient *wdclient.MasterClient
fileIdDeletionChan chan string
}
func NewFiler(masters []string) *Filer {
return &Filer{
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
fileIdDeletionChan: make(chan string, 4096),
}
go f.loopProcessingDeletion()
return f
}
func (f *Filer) SetStore(store FilerStore) {
@ -229,47 +232,3 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
}
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
var fileIds []string
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
operation.DeleteFiles(f.GetMaster(), fileIds)
}
func (f *Filer) DeleteFileByFileId(fileId string) {
volumeServer, err := f.MasterClient.LookupVolumeServer(fileId)
if err != nil {
glog.V(0).Infof("can not find file %s: %v", fileId, err)
}
if _, err := operation.DeleteFilesAtOneVolumeServer(volumeServer, []string{fileId}); err != nil && err != storage.NotFound {
glog.V(0).Infof("deleting file %s: %v", fileId, err)
}
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
return
}
if newEntry == nil {
f.DeleteChunks(oldEntry.Chunks)
}
var toDelete []*filer_pb.FileChunk
for _, oldChunk := range oldEntry.Chunks {
found := false
for _, newChunk := range newEntry.Chunks {
if oldChunk.FileId == newChunk.FileId {
found = true
break
}
}
if !found {
toDelete = append(toDelete, oldChunk)
}
}
f.DeleteChunks(toDelete)
}

88
weed/filer2/filer_deletion.go

@ -0,0 +1,88 @@
package filer2
import (
"time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/glog"
)
func (f *Filer) loopProcessingDeletion() {
ticker := time.NewTicker(5 * time.Second)
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
locs := f.MasterClient.GetVidLocations(vid)
var locations []operation.Location
for _, loc := range locs {
locations = append(locations, operation.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
})
}
m[vid] = operation.LookupResult{
VolumeId: vid,
Locations: locations,
}
}
return m, nil
}
var fileIds []string
for {
select {
case fid := <-f.fileIdDeletionChan:
fileIds = append(fileIds, fid)
if len(fileIds) >= 4096 {
glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
fileIds = fileIds[:0]
}
case <-ticker.C:
if len(fileIds) > 0 {
glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
fileIds = fileIds[:0]
}
}
}
}
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
f.fileIdDeletionChan <- chunk.FileId
}
}
func (f *Filer) DeleteFileByFileId(fileId string) {
f.fileIdDeletionChan <- fileId
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
return
}
if newEntry == nil {
f.DeleteChunks(oldEntry.Chunks)
}
var toDelete []*filer_pb.FileChunk
for _, oldChunk := range oldEntry.Chunks {
found := false
for _, newChunk := range newEntry.Chunks {
if oldChunk.FileId == newChunk.FileId {
found = true
break
}
}
if !found {
toDelete = append(toDelete, oldChunk)
}
}
f.DeleteChunks(toDelete)
}

12
weed/operation/delete_content.go

@ -29,6 +29,16 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
// DeleteFiles batch deletes a list of fileIds
func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
return LookupVolumeIds(master, vids)
}
return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
}
func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
vid_to_fileIds := make(map[string][]string)
@ -50,7 +60,7 @@ func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteRes
vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId)
}
lookupResults, err := LookupVolumeIds(master, vids)
lookupResults, err := lookupFunc(vids)
if err != nil {
return ret, err
}

9
weed/wdclient/vid_map.go

@ -66,6 +66,15 @@ func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err er
return serverUrl, nil
}
func (vc *vidMap) GetVidLocations(vid string) (locations []Location) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
return nil
}
return vc.GetLocations(uint32(id))
}
func (vc *vidMap) GetLocations(vid uint32) (locations []Location) {
vc.RLock()
defer vc.RUnlock()

Loading…
Cancel
Save