Chris Lu
6 years ago
4 changed files with 170 additions and 10 deletions
-
60weed/filesys/dir.go
-
61weed/filesys/filehandle.go
-
13weed/filesys/wfs.go
-
46weed/filesys/wfs_deletion.go
@ -0,0 +1,46 @@ |
|||||
|
package filesys |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"time" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
) |
||||
|
|
||||
|
func (wfs *WFS) loopProcessingDeletion() { |
||||
|
|
||||
|
ticker := time.NewTicker(2 * time.Second) |
||||
|
|
||||
|
wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
||||
|
var fileIds []string |
||||
|
for { |
||||
|
select { |
||||
|
case fids := <-wfs.fileIdsDeletionChan: |
||||
|
fileIds = append(fileIds, fids...) |
||||
|
if len(fileIds) >= 256 { |
||||
|
glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) |
||||
|
deleteFileIds(context.Background(), client, fileIds) |
||||
|
fileIds = fileIds[:0] |
||||
|
} |
||||
|
case <-ticker.C: |
||||
|
if len(fileIds) > 0 { |
||||
|
glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) |
||||
|
deleteFileIds(context.Background(), client, fileIds) |
||||
|
fileIds = fileIds[:0] |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}) |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (wfs *WFS) asyncDeleteFileChunks(chunks []*filer_pb.FileChunk) { |
||||
|
if len(chunks) > 0 { |
||||
|
var fileIds []string |
||||
|
for _, chunk := range chunks { |
||||
|
fileIds = append(fileIds, chunk.FileId) |
||||
|
} |
||||
|
wfs.fileIdsDeletionChan <- fileIds |
||||
|
} |
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue