From 6aa1a56ec87ca63f7a072eb6c9599e06fbb5ca9a Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Tue, 15 Jun 2021 18:12:39 +0500 Subject: [PATCH] avoid crashes Galera Cluster https://github.com/chrislusf/seaweedfs/issues/2125 --- weed/filer/abstract_sql/abstract_sql_store.go | 28 +++++++++++-------- weed/filer/cassandra/cassandra_store.go | 2 +- weed/filer/elastic/v7/elastic_store.go | 2 +- weed/filer/etcd/etcd_store.go | 2 +- weed/filer/filer.go | 1 + weed/filer/filer_delete_entry.go | 2 +- weed/filer/filerstore.go | 2 +- weed/filer/filerstore_translate_path.go | 4 +-- weed/filer/filerstore_wrapper.go | 4 +-- weed/filer/hbase/hbase_store.go | 2 +- weed/filer/leveldb/leveldb_store.go | 2 +- weed/filer/leveldb2/leveldb2_store.go | 2 +- weed/filer/leveldb3/leveldb3_store.go | 2 +- weed/filer/mongodb/mongodb_store.go | 2 +- weed/filer/mysql/mysql_sql_gen.go | 2 +- weed/filer/redis/universal_redis_store.go | 2 +- weed/filer/redis2/universal_redis_store.go | 2 +- weed/filer/rocksdb/rocksdb_store.go | 2 +- 18 files changed, 36 insertions(+), 29 deletions(-) diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index bb8ced81a..26dff7fe7 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "strings" "sync" + "time" ) type SqlGenerator interface { @@ -261,7 +262,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu return nil } -func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { +func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error { db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) if err != nil { @@ -279,18 +280,23 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat } } - glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) - - res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath)) - if err != nil { - return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) - } + for { + glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) + res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath), limit) + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) + } - _, err = res.RowsAffected() - if err != nil { - return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + rowCount, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + } + if rowCount < limit { + break + } + // to give the Galera Cluster a chance to breath + time.Sleep(time.Second) } - return nil } diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index fd2ce91a6..0398f5117 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -154,7 +154,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full return nil } -func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { +func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error { if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { return nil // filer.ErrUnsupportedSuperLargeDirectoryListing } diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index a16e5ebca..986c55b38 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -186,7 +186,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e return fmt.Errorf("delete entry %v.", err) } -func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { +func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool { if err := store.DeleteEntry(ctx, entry.FullPath); err != nil { glog.Errorf("elastic delete %s: %v.", entry.FullPath, err) diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 71ed738f9..96322081a 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -130,7 +130,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.Full return nil } -func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { +func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index effdc0e4e..1bcf57fe7 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -19,6 +19,7 @@ import ( const ( LogFlushInterval = time.Minute PaginationSize = 1024 + DeleteMaxRows = 10000 FilerStoreId = "filer.store.id" ) diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 35187d034..be21801dc 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -115,7 +115,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) - if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { + if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath, DeleteMaxRows); storeDeletionErr != nil { return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) } diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 38927d6fb..63e2e7817 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -25,7 +25,7 @@ type FilerStore interface { // err == filer_pb.ErrNotFound if not found FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) - DeleteFolderChildren(context.Context, util.FullPath) (err error) + DeleteFolderChildren(context.Context, util.FullPath, int64) (err error) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go index 00bf82ed4..cb9fabfc0 100644 --- a/weed/filer/filerstore_translate_path.go +++ b/weed/filer/filerstore_translate_path.go @@ -100,10 +100,10 @@ func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEn return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath) } -func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { +func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath, limit int64) (err error) { newFullPath := t.translatePath(fp) - return t.actualStore.DeleteFolderChildren(ctx, newFullPath) + return t.actualStore.DeleteFolderChildren(ctx, newFullPath, limit) } func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 2470f340c..997d70a80 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -213,7 +213,7 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry return actualStore.DeleteEntry(ctx, existingEntry.FullPath) } -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath, limit int64) (err error) { actualStore := fsw.getActualStore(fp + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() start := time.Now() @@ -222,7 +222,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util. }() glog.V(4).Infof("DeleteFolderChildren %s", fp) - return actualStore.DeleteFolderChildren(ctx, fp) + return actualStore.DeleteFolderChildren(ctx, fp, limit) } func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index e0d878ca7..43c14cc15 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -109,7 +109,7 @@ func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (e return store.doDelete(ctx, store.cfMetaDir, []byte(path)) } -func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { +func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath, limit int64) (err error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} expectedPrefix := []byte(path.Child("")) diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index ce454f36a..50367c87b 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -136,7 +136,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F return nil } -func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { +func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { batch := new(leveldb.Batch) diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 124d61c1c..59e831598 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -144,7 +144,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util. return nil } -func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { +func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) batch := new(leveldb.Batch) diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index d1cdfbbf6..3db7722b7 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -245,7 +245,7 @@ func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util. return nil } -func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { +func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { db, bucket, shortPath, err := store.findDB(fullpath, true) if err != nil { diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 1ef5056f4..5861d86b0 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -167,7 +167,7 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa return nil } -func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { +func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error { where := bson.M{"directory": fullpath} _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where) diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go index 93d3e3f9e..477baf66b 100644 --- a/weed/filer/mysql/mysql_sql_gen.go +++ b/weed/filer/mysql/mysql_sql_gen.go @@ -38,7 +38,7 @@ func (gen *SqlGenMysql) GetSqlDelete(tableName string) string { } func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(tableName string) string { - return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", tableName) + return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=? LIMIT ?", tableName) } func (gen *SqlGenMysql) GetSqlListExclusive(tableName string) string { diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 30d11a7f4..fb49740cd 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -107,7 +107,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util return nil } -func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) (err error) { members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result() if err != nil { diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index aab3d1f4a..6bb56f5f8 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -127,7 +127,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti return nil } -func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { +func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) (err error) { if store.isSuperLargeDirectory(string(fullpath)) { return nil diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index 379a18c62..face5963e 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -148,7 +148,7 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F return nil } -func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { +func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath, limit int64) (err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") batch := gorocksdb.NewWriteBatch()