From bbb6ebc3c024e8f2992ba8ad4eb3426364e70566 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Dec 2019 00:23:05 -0800 Subject: [PATCH] filer: DeleteFolderChildren for deleting large folders --- weed/command/scaffold.go | 9 -- .../filer2/abstract_sql/abstract_sql_store.go | 30 +++- weed/filer2/cassandra/cassandra_store.go | 11 ++ weed/filer2/etcd/etcd_store.go | 10 ++ weed/filer2/filer.go | 62 -------- weed/filer2/filer_client_util.go | 49 +++--- weed/filer2/filer_delete_entry.go | 102 ++++++++++++ weed/filer2/filer_deletion.go | 7 +- weed/filer2/filerstore.go | 11 ++ weed/filer2/leveldb/leveldb_store.go | 35 +++- weed/filer2/leveldb2/leveldb2_store.go | 35 +++- weed/filer2/memdb/memdb_store.go | 132 ---------------- weed/filer2/memdb/memdb_store_test.go | 149 ------------------ weed/filer2/mysql/mysql_store.go | 1 + weed/filer2/postgres/postgres_store.go | 1 + weed/filer2/redis/universal_redis_store.go | 18 +++ weed/filer2/tikv/tikv_store.go | 32 ++++ weed/s3api/s3api_server.go | 6 - weed/server/filer_grpc_server.go | 6 +- weed/server/filer_server.go | 1 - weed/server/filer_server_handlers_write.go | 2 +- .../filer_server_handlers_write_autochunk.go | 2 +- 22 files changed, 310 insertions(+), 401 deletions(-) create mode 100644 weed/filer2/filer_delete_entry.go delete mode 100644 weed/filer2/memdb/memdb_store.go delete mode 100644 weed/filer2/memdb/memdb_store_test.go diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index ebb72dc44..6ad8effc2 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -59,15 +59,6 @@ const ( # $HOME/.seaweedfs/filer.toml # /etc/seaweedfs/filer.toml -[memory] -# local in memory, mostly for testing purpose -enabled = false - -[leveldb] -# local on disk, mostly for simple single-machine setup, fairly scalable -enabled = false -dir = "." # directory to store level db files - [leveldb2] # local on disk, mostly for simple single-machine setup, fairly scalable # faster than previous leveldb, recommended. diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 3e8554957..d512467c7 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -10,13 +10,14 @@ import ( ) type AbstractSqlStore struct { - DB *sql.DB - SqlInsert string - SqlUpdate string - SqlFind string - SqlDelete string - SqlListExclusive string - SqlListInclusive string + DB *sql.DB + SqlInsert string + SqlUpdate string + SqlFind string + SqlDelete string + SqlDeleteFolderChildren string + SqlListExclusive string + SqlListInclusive string } type TxOrDB interface { @@ -132,6 +133,21 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2. return nil } +func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { + + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, hashToLong(string(fullpath)), fullpath) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + } + + return nil +} + func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { sqlText := store.SqlListExclusive diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index 466be5bf3..dcaab8bc4 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -112,6 +112,17 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu return nil } +func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { + + if err := store.session.Query( + "DELETE FROM filemeta WHERE directory=?", + fullpath).Exec(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go index 1b0f928d0..2eb9e3e86 100644 --- a/weed/filer2/etcd/etcd_store.go +++ b/weed/filer2/etcd/etcd_store.go @@ -123,6 +123,16 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return nil } +func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { + return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err) + } + + return nil +} + func (store *EtcdStore) ListDirectoryEntries( ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int, ) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index c5d10196f..b724e20fd 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -3,7 +3,6 @@ package filer2 import ( "context" "fmt" - "math" "os" "path/filepath" "strings" @@ -207,67 +206,6 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er return f.store.FindEntry(ctx, p) } -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { - entry, err := f.FindEntry(ctx, p) - if err != nil { - return err - } - - if entry.IsDirectory() { - limit := int(1) - if isRecursive { - limit = math.MaxInt32 - } - lastFileName := "" - includeLastFile := false - for limit > 0 { - entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, PaginationSize) - if err != nil { - glog.Errorf("list folder %s: %v", p, err) - return fmt.Errorf("list folder %s: %v", p, err) - } - - if len(entries) == 0 { - break - } - - if isRecursive { - for _, sub := range entries { - lastFileName = sub.Name() - err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks) - if err != nil && !ignoreRecursiveError { - return err - } - limit-- - if limit <= 0 { - break - } - } - } - - if len(entries) < PaginationSize { - break - } - } - - f.cacheDelDirectory(string(p)) - - } - - if shouldDeleteChunks { - f.DeleteChunks(p, entry.Chunks) - } - - if p == "/" { - return nil - } - glog.V(3).Infof("deleting entry %v", p) - - f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) - - return f.store.DeleteEntry(ctx, p) -} - func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go index 6d656fa11..1a10f7c20 100644 --- a/weed/filer2/filer_client_util.go +++ b/weed/filer2/filer_client_util.go @@ -3,6 +3,8 @@ package filer2 import ( "context" "fmt" + "io" + "math" "strings" "sync" @@ -124,35 +126,42 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) return } -func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) { +func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { lastEntryName := "" - for { - - request := &filer_pb.ListEntriesRequest{ - Directory: fullDirPath, - StartFromFileName: lastEntryName, - Limit: PaginationSize, - } + request := &filer_pb.ListEntriesRequest{ + Directory: fullDirPath, + Prefix: prefix, + StartFromFileName: lastEntryName, + Limit: math.MaxUint32, + } - glog.V(3).Infof("read directory: %v", request) - resp, err := client.ListEntries(ctx, request) - if err != nil { - return fmt.Errorf("list %s: %v", fullDirPath, err) - } + glog.V(3).Infof("read directory: %v", request) + stream, err := client.ListEntries(ctx, request) + if err != nil { + return fmt.Errorf("list %s: %v", fullDirPath, err) + } - for _, entry := range resp.Entries { - fn(entry) - lastEntryName = entry.Name + var prevEntry *filer_pb.Entry + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + if prevEntry != nil { + fn(prevEntry, true) + } + break + } else { + return recvErr + } } - - if len(resp.Entries) < PaginationSize { - break + if prevEntry != nil { + fn(prevEntry, false) } - + prevEntry = resp.Entry } return nil diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go new file mode 100644 index 000000000..75a09e7ef --- /dev/null +++ b/weed/filer2/filer_delete_entry.go @@ -0,0 +1,102 @@ +package filer2 + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { + if p == "/" { + return nil + } + + entry, findErr := f.FindEntry(ctx, p) + if findErr != nil { + return findErr + } + + var chunks []*filer_pb.FileChunk + chunks = append(chunks, entry.Chunks...) + if entry.IsDirectory() { + // delete the folder children, not including the folder itself + var dirChunks []*filer_pb.FileChunk + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks) + if err != nil { + return fmt.Errorf("delete directory %s: %v", p, err) + } + chunks = append(chunks, dirChunks...) + f.cacheDelDirectory(string(p)) + } + // delete the file or folder + err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks) + if err != nil { + return fmt.Errorf("delete file %s: %v", p, err) + } + + if shouldDeleteChunks { + go f.DeleteChunks(chunks) + } + + return nil +} + +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) { + + lastFileName := "" + includeLastFile := false + for { + entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize) + if err != nil { + glog.Errorf("list folder %s: %v", entry.FullPath, err) + return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) + } + if lastFileName == "" && !isRecursive && len(entries) > 0 { + // only for first iteration in the loop + return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + } + + for _, sub := range entries { + lastFileName = sub.Name() + var dirChunks []*filer_pb.FileChunk + if sub.IsDirectory() { + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks) + } + if err != nil && !ignoreRecursiveError { + return nil, err + } + if shouldDeleteChunks { + chunks = append(chunks, dirChunks...) + } + } + + if len(entries) < PaginationSize { + break + } + } + + f.cacheDelDirectory(string(entry.FullPath)) + + glog.V(3).Infof("deleting directory %v", entry.FullPath) + + if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { + return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) + } + f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) + + return chunks, nil +} + +func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) { + + glog.V(3).Infof("deleting entry %v", entry.FullPath) + + if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { + return fmt.Errorf("filer store delete: %v", storeDeletionErr) + } + f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) + + return nil +} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 25e27e504..9937685f7 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -51,9 +51,8 @@ func (f *Filer) loopProcessingDeletion() { } } -func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) { +func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String()) f.fileIdDeletionChan <- chunk.GetFileIdString() } } @@ -70,7 +69,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { return } if newEntry == nil { - f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks) + f.DeleteChunks(oldEntry.Chunks) } var toDelete []*filer_pb.FileChunk @@ -84,5 +83,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(oldEntry.FullPath, toDelete) + f.DeleteChunks(toDelete) } diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 8caa44ee2..0bb0bd611 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -20,6 +20,7 @@ type FilerStore interface { // err == filer2.ErrNotFound if not found FindEntry(context.Context, FullPath) (entry *Entry, err error) DeleteEntry(context.Context, FullPath) (err error) + DeleteFolderChildren(context.Context, FullPath) (err error) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) BeginTransaction(ctx context.Context) (context.Context, error) @@ -97,6 +98,16 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err return fsw.actualStore.DeleteEntry(ctx, fp) } +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) + }() + + return fsw.actualStore.DeleteFolderChildren(ctx, fp) +} + func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc() start := time.Now() diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index d00eba859..4952b3b3a 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -5,12 +5,13 @@ import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + weed_util "github.com/chrislusf/seaweedfs/weed/util" ) const ( @@ -123,6 +124,34 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full return nil } +func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + + batch := new(leveldb.Batch) + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + batch.Delete([]byte(genKey(string(fullpath), fileName))) + } + iter.Release() + + err = store.db.Write(batch, nil) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go index 4b47d2eb3..8a16822ab 100644 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -8,12 +8,13 @@ import ( "io" "os" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + weed_util "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -134,6 +135,34 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful return nil } +func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) + + batch := new(leveldb.Batch) + + iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + batch.Delete(append(directoryPrefix, []byte(fileName)...)) + } + iter.Release() + + err = store.dbs[partitionId].Write(batch, nil) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go deleted file mode 100644 index 9c10a5472..000000000 --- a/weed/filer2/memdb/memdb_store.go +++ /dev/null @@ -1,132 +0,0 @@ -package memdb - -import ( - "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/google/btree" - "strings" - "sync" -) - -func init() { - filer2.Stores = append(filer2.Stores, &MemDbStore{}) -} - -type MemDbStore struct { - tree *btree.BTree - treeLock sync.Mutex -} - -type entryItem struct { - *filer2.Entry -} - -func (a entryItem) Less(b btree.Item) bool { - return strings.Compare(string(a.FullPath), string(b.(entryItem).FullPath)) < 0 -} - -func (store *MemDbStore) GetName() string { - return "memory" -} - -func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) { - store.tree = btree.New(8) - return nil -} - -func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) { - return ctx, nil -} -func (store *MemDbStore) CommitTransaction(ctx context.Context) error { - return nil -} -func (store *MemDbStore) RollbackTransaction(ctx context.Context) error { - return nil -} - -func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { - // println("inserting", entry.FullPath) - store.treeLock.Lock() - store.tree.ReplaceOrInsert(entryItem{entry}) - store.treeLock.Unlock() - return nil -} - -func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - if _, err = store.FindEntry(ctx, entry.FullPath); err != nil { - return fmt.Errorf("no such file %s : %v", entry.FullPath, err) - } - store.treeLock.Lock() - store.tree.ReplaceOrInsert(entryItem{entry}) - store.treeLock.Unlock() - return nil -} - -func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { - item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}}) - if item == nil { - return nil, filer2.ErrNotFound - } - entry = item.(entryItem).Entry - return entry, nil -} - -func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { - store.treeLock.Lock() - store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}}) - store.treeLock.Unlock() - return nil -} - -func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { - - startFrom := string(fullpath) - if startFileName != "" { - startFrom = startFrom + "/" + startFileName - } - - store.tree.AscendGreaterOrEqual(entryItem{&filer2.Entry{FullPath: filer2.FullPath(startFrom)}}, - func(item btree.Item) bool { - if limit <= 0 { - return false - } - entry := item.(entryItem).Entry - // println("checking", entry.FullPath) - - if entry.FullPath == fullpath { - // skipping the current directory - // println("skipping the folder", entry.FullPath) - return true - } - - dir, name := entry.FullPath.DirAndName() - if name == startFileName { - if inclusive { - limit-- - entries = append(entries, entry) - } - return true - } - - // only iterate the same prefix - if !strings.HasPrefix(string(entry.FullPath), string(fullpath)) { - // println("breaking from", entry.FullPath) - return false - } - - if dir != string(fullpath) { - // this could be items in deeper directories - // println("skipping deeper folder", entry.FullPath) - return true - } - // now process the directory items - // println("adding entry", entry.FullPath) - limit-- - entries = append(entries, entry) - return true - }, - ) - return entries, nil -} diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go deleted file mode 100644 index 3fd806aeb..000000000 --- a/weed/filer2/memdb/memdb_store_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package memdb - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer2" - "testing" -) - -func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - store := &MemDbStore{} - store.Initialize(nil) - filer.SetStore(store) - filer.DisableDirectoryCache() - - ctx := context.Background() - - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") - - entry1 := &filer2.Entry{ - FullPath: fullpath, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - if err := filer.CreateEntry(ctx, entry1); err != nil { - t.Errorf("create entry %v: %v", entry1.FullPath, err) - return - } - - entry, err := filer.FindEntry(ctx, fullpath) - - if err != nil { - t.Errorf("find entry: %v", err) - return - } - - if entry.FullPath != entry1.FullPath { - t.Errorf("find wrong entry: %v", entry.FullPath) - return - } - -} - -func TestCreateFileAndList(t *testing.T) { - filer := filer2.NewFiler(nil, nil) - store := &MemDbStore{} - store.Initialize(nil) - filer.SetStore(store) - filer.DisableDirectoryCache() - - ctx := context.Background() - - entry1 := &filer2.Entry{ - FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"), - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - entry2 := &filer2.Entry{ - FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"), - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - - filer.CreateEntry(ctx, entry1) - filer.CreateEntry(ctx, entry2) - - // checking the 2 files - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100) - - if err != nil { - t.Errorf("list entries: %v", err) - return - } - - if len(entries) != 2 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - if entries[0].FullPath != entry1.FullPath { - t.Errorf("find wrong entry 1: %v", entries[0].FullPath) - return - } - - if entries[1].FullPath != entry2.FullPath { - t.Errorf("find wrong entry 2: %v", entries[1].FullPath) - return - } - - // checking the offset - entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // checking root directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // add file3 - file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg") - entry3 := &filer2.Entry{ - FullPath: file3Path, - Attr: filer2.Attr{ - Mode: 0440, - Uid: 1234, - Gid: 5678, - }, - } - filer.CreateEntry(ctx, entry3) - - // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) - if len(entries) != 2 { - t.Errorf("list entries count: %v", len(entries)) - return - } - - // delete file and count - filer.DeleteEntryMetaAndData(ctx, file3Path, false, false, false) - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) - if len(entries) != 1 { - t.Errorf("list entries count: %v", len(entries)) - return - } - -} diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go index 3eca80ff7..d1b06ece5 100644 --- a/weed/filer2/mysql/mysql_store.go +++ b/weed/filer2/mysql/mysql_store.go @@ -46,6 +46,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" + store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?" diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go index ffd3d1e01..3ec000fe0 100644 --- a/weed/filer2/postgres/postgres_store.go +++ b/weed/filer2/postgres/postgres_store.go @@ -45,6 +45,7 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4" store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" + store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4" diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index ce41d4d70..62257e91e 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -99,6 +99,24 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file return nil } +func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + + members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() + if err != nil { + return fmt.Errorf("delete folder %s : %v", fullpath, err) + } + + for _, fileName := range members { + path := filer2.NewFullPath(string(fullpath), fileName) + _, err = store.Client.Del(string(path)).Result() + if err != nil { + return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + } + } + + return nil +} + func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go index 0e6b93c86..4eb8cb90d 100644 --- a/weed/filer2/tikv/tikv_store.go +++ b/weed/filer2/tikv/tikv_store.go @@ -141,6 +141,38 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return nil } +func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { + + directoryPrefix := genDirectoryKeyPrefix(fullpath, "") + + tx := store.getTx(ctx) + + iter, err := tx.Iter(directoryPrefix, nil) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err) + } + defer iter.Close() + for iter.Valid() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + iter.Next() + continue + } + + if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + iter.Next() + } + + return nil +} + func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 24458592d..edf634444 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -1,12 +1,6 @@ package s3api import ( - _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" - _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" - _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" - _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" "github.com/gorilla/mux" "google.golang.org/grpc" "net/http" diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index acfd2260d..ad51d49cf 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -140,7 +140,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr }) if err == nil { - fs.filer.DeleteChunks(fullpath, garbages) + fs.filer.DeleteChunks(garbages) } return &filer_pb.CreateEntryResponse{}, err @@ -189,8 +189,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(entry.FullPath, unusedChunks) - fs.filer.DeleteChunks(entry.FullPath, garbages) + fs.filer.DeleteChunks(unusedChunks) + fs.filer.DeleteChunks(garbages) } fs.filer.NotifyUpdateEvent(entry, newEntry, true) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2cf26b1bb..41ba81366 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -18,7 +18,6 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index c919efbc9..fb6855a99 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -194,7 +194,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w } // glog.V(4).Infof("saving %s => %+v", path, entry) if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { - fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + fs.filer.DeleteChunks(entry.Chunks) glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) writeJsonError(w, r, http.StatusInternalServerError, dbErr) err = dbErr diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 492b55943..7c872c2b4 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -177,7 +177,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r Chunks: fileChunks, } if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { - fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)