From 89eb05b50f10b6ca74a374e5435df2f72019f635 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Mar 2020 01:02:01 -0700 Subject: [PATCH] filer: support TTL for all filer stores --- weed/command/filer_copy.go | 17 ++++++++--- weed/filer2/filer.go | 28 +++++++++++++++++-- weed/filer2/filer_buckets.go | 2 +- weed/filer2/filer_delete_entry.go | 2 +- weed/filer2/leveldb/leveldb_store_test.go | 6 ++-- weed/filer2/leveldb2/leveldb2_store_test.go | 6 ++-- weed/server/filer_grpc_server.go | 4 +-- weed/server/filer_grpc_server_rename.go | 2 +- weed/server/filer_server_handlers_read_dir.go | 4 +-- weed/storage/volume.go | 4 +-- 10 files changed, 53 insertions(+), 22 deletions(-) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 1162bb204..0aee8cd80 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -20,6 +20,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -41,6 +42,7 @@ type CopyOptions struct { grpcDialOption grpc.DialOption masters []string cipher bool + ttlSec int32 } func init() { @@ -124,6 +126,13 @@ func runCopy(cmd *Command, args []string) bool { copy.masters = masters copy.cipher = cipher + ttl, err := needle.ReadTTL(*copy.ttl) + if err != nil { + fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err) + return false + } + copy.ttlSec = int32(ttl.Minutes()) * 60 + if *cmdCopy.IsDebug { util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") } @@ -286,7 +295,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err Count: 1, Replication: *worker.options.replication, Collection: *worker.options.collection, - TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), + TtlSec: worker.options.ttlSec, ParentPath: task.destinationUrlPath, } @@ -342,7 +351,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err Mime: mimeType, Replication: *worker.options.replication, Collection: *worker.options.collection, - TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, @@ -388,7 +397,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, Count: 1, Replication: *worker.options.replication, Collection: *worker.options.collection, - TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), + TtlSec: worker.options.ttlSec, ParentPath: task.destinationUrlPath, } @@ -469,7 +478,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, Mime: mimeType, Replication: replication, Collection: collection, - TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 0b6a5c96e..c3048b45d 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -223,14 +223,36 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er }, }, nil } - return f.store.FindEntry(ctx, p) + entry, err = f.store.FindEntry(ctx, p) + if entry != nil && entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + return nil, filer_pb.ErrNotFound + } + } + return + } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, err error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } - return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) + if listErr != nil { + return listedEntries, expiredCount, err + } + for _, entry := range listedEntries { + if entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.store.DeleteEntry(ctx, p.Child(entry.Name())) + expiredCount++ + continue + } + } + entries = append(entries, entry) + } + return } func (f *Filer) cacheDelDirectory(dirpath string) { diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go index 601b7dbf3..cb65fea14 100644 --- a/weed/filer2/filer_buckets.go +++ b/weed/filer2/filer_buckets.go @@ -28,7 +28,7 @@ func (f *Filer) LoadBuckets(dirBucketsPath string) { limit := math.MaxInt32 - entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit) + entries, _, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit) if err != nil { glog.V(1).Infof("no buckets found: %v", err) diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go index d0792ac66..b7ec805c5 100644 --- a/weed/filer2/filer_delete_entry.go +++ b/weed/filer2/filer_delete_entry.go @@ -57,7 +57,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry lastFileName := "" includeLastFile := false for { - entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize) + entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize) if err != nil { glog.Errorf("list folder %s: %v", entry.FullPath, err) return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 497158420..dcb99a3bd 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -48,14 +48,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index dc94f2ac7..c1f2d6a0c 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -48,14 +48,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index b904c1393..488967ec2 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -53,7 +53,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + entries, expiredCount, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) if err != nil { return err @@ -92,7 +92,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file } } - if len(entries) < paginationLimit { + if len(entries)+expiredCount < paginationLimit { break } diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 0669a26f1..3b2655585 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -68,7 +68,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer includeLastFile := false for { - entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024) + entries, _, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024) if err != nil { return err } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 87e864559..13f60eefe 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -32,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") - entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit) + entries, expiredCount, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) @@ -40,7 +40,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque return } - shouldDisplayLoadMore := len(entries) == limit + shouldDisplayLoadMore := len(entries)+expiredCount == limit if path == "/" { path = "" } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 88a5db4c5..7da83de7a 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -180,9 +180,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool { if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false } - glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) + glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60 - glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) if int64(v.Ttl.Minutes()) < livedMinutes { return true }