From 9d5c1a3cbf6f561280a3021685fe75376a27bb1b Mon Sep 17 00:00:00 2001 From: Adam Lamar Date: Mon, 7 Nov 2022 22:32:37 -0700 Subject: [PATCH] postgres3 filer --- docker/compose/local-s3tests-compose.yml | 10 + docker/filer.toml | 4 +- docker/filer_leveldb3.toml | 3 + docker/filer_postgres2.toml | 21 + docker/filer_postgres3.toml | 12 + weed/filer/postgres3/README.md | 33 ++ weed/filer/postgres3/postgres3_kvstore.go | 83 +++ weed/filer/postgres3/postgres3_store.go | 512 +++++++++++++++++++ weed/filer/postgres3/postgres3_store_test.go | 41 ++ weed/server/filer_server.go | 1 + weed/server/filer_server_handlers_tagging.go | 2 +- 11 files changed, 719 insertions(+), 3 deletions(-) create mode 100644 docker/filer_leveldb3.toml create mode 100644 docker/filer_postgres2.toml create mode 100644 docker/filer_postgres3.toml create mode 100644 weed/filer/postgres3/README.md create mode 100644 weed/filer/postgres3/postgres3_kvstore.go create mode 100644 weed/filer/postgres3/postgres3_store.go create mode 100644 weed/filer/postgres3/postgres3_store_test.go diff --git a/docker/compose/local-s3tests-compose.yml b/docker/compose/local-s3tests-compose.yml index f1961700c..b77c3685d 100644 --- a/docker/compose/local-s3tests-compose.yml +++ b/docker/compose/local-s3tests-compose.yml @@ -1,6 +1,16 @@ version: '3.9' services: + postgres: + image: postgres:13 + environment: + - POSTGRES_DB=seaweedfs + - POSTGRES_USER=seaweedfs + - POSTGRES_PASSWORD=seaweedfs + ports: + - "5432:5432" + #command: ["postgres", "-c", "log_statement=all"] + command: ["postgres"] master: image: chrislusf/seaweedfs:local ports: diff --git a/docker/filer.toml b/docker/filer.toml index a11e5de2b..216ea8d2f 100644 --- a/docker/filer.toml +++ b/docker/filer.toml @@ -1,3 +1,3 @@ -[leveldb2] +[leveldb3] enabled = true -dir = "/data/filerldb2" +dir = "/data/filer_leveldb3" diff --git a/docker/filer_leveldb3.toml b/docker/filer_leveldb3.toml new file mode 100644 index 000000000..216ea8d2f --- /dev/null +++ b/docker/filer_leveldb3.toml @@ -0,0 +1,3 @@ +[leveldb3] +enabled = true +dir = "/data/filer_leveldb3" diff --git a/docker/filer_postgres2.toml b/docker/filer_postgres2.toml new file mode 100644 index 000000000..a1ed240c8 --- /dev/null +++ b/docker/filer_postgres2.toml @@ -0,0 +1,21 @@ +[postgres2] +enabled = true +port = 5432 +createTable = """ + CREATE TABLE IF NOT EXISTS "%s" ( + dirhash BIGINT, + name VARCHAR(65535), + directory VARCHAR(65535), + meta bytea, + PRIMARY KEY (dirhash, name) + ); +""" +hostname = "postgres" +username = "seaweedfs" +database = "seaweedfs" # create or use an existing database +password = "seaweedfs" +schema = "" +sslmode = "disable" +connection_max_idle = 5 +connection_max_open = 10 +connection_max_lifetime_seconds = 0 diff --git a/docker/filer_postgres3.toml b/docker/filer_postgres3.toml new file mode 100644 index 000000000..fe3e95ac8 --- /dev/null +++ b/docker/filer_postgres3.toml @@ -0,0 +1,12 @@ +[postgres3] +enabled = true +port = 5432 +hostname = "postgres" +username = "seaweedfs" +database = "seaweedfs" # create or use an existing database +password = "seaweedfs" +schema = "" +sslmode = "disable" +connection_max_idle = 5 +connection_max_open = 10 +connection_max_lifetime_seconds = 0 diff --git a/weed/filer/postgres3/README.md b/weed/filer/postgres3/README.md new file mode 100644 index 000000000..67c88e6b8 --- /dev/null +++ b/weed/filer/postgres3/README.md @@ -0,0 +1,33 @@ +# postgres3 + +The `postgres3` filer implementation uses postgres-specific features and data structures to improve upon previous SQL implementations based on the `abstract_sql` module. + +Of note, the `postgres2` filer implementation may leak directory hierarchy metadata when frequent inserts and deletes are +performed using the S3 API. If an application workload pattern creates directories, populates them temporarily, and then +deletes all objects in the directory, the `postgres2` filer implementation will continue to indefinitely maintain +information about the orphaned directories. While these directories will not be shown in S3 API list requests, the metadata +remains and places the burden of an unbounded number of unused rows on postgres. + +Seaweedfs provides the `-s3.allowEmptyFolder=false` CLI argument to automatically clean up orphaned directory entries, but +this process necessarily races under high load and can cause unpredictable filer and postgres behavior. + +To solve this problem, `postgres3` does the following: + +1. One row in postgres _fully_ represents one object and its metadata +2. Insert, update, get, and delete operate on a single row +3. An array is stored of possible prefixes for each key +4. List requests leverage the prefixes to dynamically assemble directory entries using a complex `SELECT` statement + +In order to efficiently query directory entries during list requests, `postgres3` uses special features of +postgres: + +* An int64 array field called `prefixes` with a hash of each prefix found for a specific key +* GIN indexing that provides fast set membership information on array fields +* Special functions `split_part` (text parsing) and `cardinality` (length of the array field) + +`postgres3` uses automatic upsert capability with `ON CONFLICT ... UPDATE` so that insert and update are the same +race-free operation. + +In the filer metadata tables, all objects start with `/`, causing prefix calculation to include the empty string (`""`). +For space and index optimization, `postgres3` does not store the root prefix in the `prefixes` array, and instead +relies on the condition `cardinality(prefixes) < 1` to discover objects at the root directory. \ No newline at end of file diff --git a/weed/filer/postgres3/postgres3_kvstore.go b/weed/filer/postgres3/postgres3_kvstore.go new file mode 100644 index 000000000..7709873b6 --- /dev/null +++ b/weed/filer/postgres3/postgres3_kvstore.go @@ -0,0 +1,83 @@ +package postgres3 + +/* + * Copyright 2022 Splunk Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "context" + "database/sql" + "fmt" + "path" + + "github.com/lib/pq" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" +) + +func (store *Postgres3Store) KvPut(ctx context.Context, key []byte, value []byte) error { + db, _, _, err := store.getTxOrDB(ctx, "", false) + if err != nil { + return fmt.Errorf("findDB: %v", err) + } + + prefixes := calculatePrefixes(string(key)) + hashedPrefixes := hashPrefixArray(prefixes) + _, err = db.ExecContext(ctx, fmt.Sprintf(insertEntryPattern, abstract_sql.DEFAULT_TABLE), key, path.Base(string(key)), pq.Array(hashedPrefixes), value) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + return nil +} + +func (store *Postgres3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + db, _, _, err := store.getTxOrDB(ctx, "", false) + if err != nil { + return nil, fmt.Errorf("findDB: %v", err) + } + + row := db.QueryRowContext(ctx, fmt.Sprintf(findEntryPattern, abstract_sql.DEFAULT_TABLE), key) + + err = row.Scan(&value) + + if err == sql.ErrNoRows { + return nil, filer.ErrKvNotFound + } + + if err != nil { + return nil, fmt.Errorf("kv get: %v", err) + } + + return +} + +func (store *Postgres3Store) KvDelete(ctx context.Context, key []byte) error { + db, _, _, err := store.getTxOrDB(ctx, "", false) + if err != nil { + return fmt.Errorf("findDB: %v", err) + } + + res, err := db.ExecContext(ctx, fmt.Sprintf(deleteEntryPattern, abstract_sql.DEFAULT_TABLE), key) + if err != nil { + return fmt.Errorf("kv delete: %s", err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("kv delete no rows affected: %s", err) + } + + return nil +} diff --git a/weed/filer/postgres3/postgres3_store.go b/weed/filer/postgres3/postgres3_store.go new file mode 100644 index 000000000..df73f724f --- /dev/null +++ b/weed/filer/postgres3/postgres3_store.go @@ -0,0 +1,512 @@ +package postgres3 + +/* + * Copyright 2022 Splunk Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "context" + "database/sql" + "fmt" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/lib/pq" + _ "github.com/lib/pq" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +const ( + CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30" + + createTablePattern = `CREATE TABLE IF NOT EXISTS "%s" ( + key varchar(65535) PRIMARY KEY, + name varchar(65535), + prefixes bigint[], + meta bytea + )` + createTableIndexPattern = `CREATE INDEX on "%s" USING gin (prefixes);` + deleteTablePattern = `DROP TABLE "%s";` + insertEntryPattern = `INSERT INTO "%s" (key, name, prefixes, meta) VALUES ($1, $2, $3, $4) + ON CONFLICT (key) + DO + UPDATE SET meta = EXCLUDED.meta;` + findEntryPattern = `SELECT meta FROM "%s" WHERE key = $1` + deleteEntryPattern = `DELETE FROM "%s" WHERE key = $1` + listEntryQueryPattern = `SELECT key, name, isdir, meta FROM + ( + SELECT key, name, false as isdir, meta FROM "%s" + WHERE prefixes @> $1 AND cardinality(prefixes) < $2 - 1 + AND name __COMPARISON__ $3 AND name LIKE $4 ORDER BY key ASC LIMIT $6 + ) s1 + UNION + ( + SELECT dir, dir, true isdir, NULL::bytea meta FROM + ( + SELECT DISTINCT split_part(key, '/', $2) AS dir FROM "%s" + WHERE prefixes @> $1 AND cardinality(prefixes) > $5 - 1 ORDER BY dir ASC + ) t1 + WHERE t1.dir > $3 AND t1.dir LIKE $4 ORDER BY dir ASC + ) + ORDER BY name ASC LIMIT $6` + deleteFolderChildrenPattern = `DELETE FROM "%s" WHERE prefixes @> $1 and key like $2` +) + +var ( + listEntryExclusivePattern string + listEntryInclusivePattern string +) + +var _ filer.BucketAware = (*Postgres3Store)(nil) + +func init() { + filer.Stores = append(filer.Stores, &Postgres3Store{}) + + listEntryExclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">") + listEntryInclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">=") +} + +type Postgres3Store struct { + DB *sql.DB + SupportBucketTable bool + dbs map[string]bool + dbsLock sync.Mutex +} + +func (store *Postgres3Store) GetName() string { + return "postgres3" +} + +func (store *Postgres3Store) Initialize(configuration util.Configuration, prefix string) error { + return store.initialize( + configuration.GetString(prefix+"createTable"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetString(prefix+"schema"), + configuration.GetString(prefix+"sslmode"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), + configuration.GetInt(prefix+"connection_max_lifetime_seconds"), + ) +} + +func (store *Postgres3Store) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { + store.SupportBucketTable = true + sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) + if user != "" { + sqlUrl += " user=" + user + } + adaptedSqlUrl := sqlUrl + if password != "" { + sqlUrl += " password=" + password + adaptedSqlUrl += " password=ADAPTED" + } + if database != "" { + sqlUrl += " dbname=" + database + adaptedSqlUrl += " dbname=" + database + } + if schema != "" { + sqlUrl += " search_path=" + schema + adaptedSqlUrl += " search_path=" + schema + } + var dbErr error + store.DB, dbErr = sql.Open("postgres", sqlUrl) + if dbErr != nil { + store.DB.Close() + store.DB = nil + return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err) + } + + store.DB.SetMaxIdleConns(maxIdle) + store.DB.SetMaxOpenConns(maxOpen) + store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second) + + if err = store.DB.Ping(); err != nil { + return fmt.Errorf("connect to %s error:%v", sqlUrl, err) + } + + if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { + return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) + } + + return nil +} + +func (store *Postgres3Store) CanDropWholeBucket() bool { + return store.SupportBucketTable +} + +func (store *Postgres3Store) OnBucketCreation(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + store.CreateTable(context.Background(), bucket) + + if store.dbs == nil { + return + } + store.dbs[bucket] = true +} + +func (store *Postgres3Store) OnBucketDeletion(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + store.deleteTable(context.Background(), bucket) + + if store.dbs == nil { + return + } + delete(store.dbs, bucket) +} + +func (store *Postgres3Store) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB abstract_sql.TxOrDB, bucket string, shortPath util.FullPath, err error) { + + shortPath = fullpath + bucket = abstract_sql.DEFAULT_TABLE + + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + txOrDB = tx + } else { + txOrDB = store.DB + } + + if !store.SupportBucketTable { + return + } + + if !strings.HasPrefix(string(fullpath), "/buckets/") { + return + } + + // detect bucket + bucketAndObjectKey := string(fullpath)[len("/buckets/"):] + t := strings.Index(bucketAndObjectKey, "/") + if t < 0 && !isForChildren { + return + } + bucket = bucketAndObjectKey + shortPath = "/" + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = util.FullPath(bucketAndObjectKey[t:]) + } + + if isValidBucket(bucket) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if store.dbs == nil { + store.dbs = make(map[string]bool) + } + + if _, found := store.dbs[bucket]; !found { + if err = store.CreateTable(ctx, bucket); err == nil { + store.dbs[bucket] = true + } + } + + } + + return +} + +func (store *Postgres3Store) InsertEntry(ctx context.Context, entry *filer.Entry) error { + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", entry.FullPath, err) + } + + if entry.IsDirectory() { + if isValidBucket(bucket) && !strings.HasPrefix(string(shortPath), "/.uploads") { + // Ignore directory creations, but not bucket creations or multipart uploads + return nil + } + } + + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + + prefixes := calculatePrefixes(string(shortPath)) + hashedPrefixes := hashPrefixArray(prefixes) + _, err = db.ExecContext(ctx, fmt.Sprintf(insertEntryPattern, bucket), shortPath, path.Base(string(shortPath)), pq.Array(hashedPrefixes), meta) + if err != nil { + return fmt.Errorf("insert/upsert %s: %s", entry.FullPath, err) + } + return nil +} + +func (store *Postgres3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + return store.InsertEntry(ctx, entry) +} + +func (store *Postgres3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { + + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + if err != nil { + return nil, fmt.Errorf("findDB %s : %v", fullpath, err) + } + + row := db.QueryRowContext(ctx, fmt.Sprintf(findEntryPattern, bucket), shortPath) + + var data []byte + if err := row.Scan(&data); err != nil { + if err == sql.ErrNoRows { + return nil, filer_pb.ErrNotFound + } + return nil, fmt.Errorf("find %s: %v", fullpath, err) + } + + entry := &filer.Entry{ + FullPath: fullpath, + } + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *Postgres3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", fullpath, err) + } + + res, err := db.ExecContext(ctx, fmt.Sprintf(deleteEntryPattern, bucket), shortPath) + if err != nil { + return fmt.Errorf("delete %s: %s", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err) + } + + return nil +} + +func (store *Postgres3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) + if err != nil { + return fmt.Errorf("findDB %s : %v", fullpath, err) + } + + if isValidBucket(bucket) && shortPath == "/" { + if err = store.deleteTable(ctx, bucket); err == nil { + store.dbsLock.Lock() + delete(store.dbs, bucket) + store.dbsLock.Unlock() + return nil + } else { + return err + } + } + + sqlText := fmt.Sprintf(deleteFolderChildrenPattern, bucket) + prefixes := calculatePrefixes(string(shortPath)) + hashedPrefixes := hashPrefixArray(prefixes) + glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), sqlText, hashedPrefixes) + res, err := db.ExecContext(ctx, sqlText, pq.Array(hashedPrefixes), string(shortPath)+"/%") + if err != nil { + return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) + } + + _, err = res.RowsAffected() + if err != nil { + return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err) + } + return nil +} + +func (store *Postgres3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) +} + +func (store *Postgres3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true) + if err != nil { + return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) + } + + slashedShortPath := appendSlash(string(shortPath)) + shortPathParts := len(strings.Split(slashedShortPath, "/")) + + sqlText := fmt.Sprintf(listEntryExclusivePattern, bucket, bucket) + if includeStartFile { + sqlText = fmt.Sprintf(listEntryInclusivePattern, bucket, bucket) + } + + prefixes := calculatePrefixes(string(slashedShortPath)) + hashedPrefixes := hashPrefixArray(prefixes) + + rows, err := db.QueryContext(ctx, sqlText, + pq.Array(hashedPrefixes), + shortPathParts, + startFileName, + prefix+"%", + shortPathParts-1, + limit+1) + + if err != nil { + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) + } + defer rows.Close() + + for rows.Next() { + var key string + var name string + var isDir bool + var data []byte + if err = rows.Scan(&key, &name, &isDir, &data); err != nil { + glog.V(0).Infof("scan %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err) + } + + if !isDir { + lastFileName = name + + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), name), + } + + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { + glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) + return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + + if !eachEntryFunc(entry) { + break + } + } else { + lastFileName = key + dirName := key + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), dirName), + } + + entry.Attr.Mode |= os.ModeDir | 0775 + if !eachEntryFunc(entry) { + break + } + } + } + + return lastFileName, nil +} + +func (store *Postgres3Store) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + ReadOnly: false, + }) + if err != nil { + return ctx, err + } + + return context.WithValue(ctx, "tx", tx), nil +} + +func (store *Postgres3Store) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Commit() + } + return nil +} + +func (store *Postgres3Store) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Rollback() + } + return nil +} + +func (store *Postgres3Store) Shutdown() { + store.DB.Close() +} + +func (store *Postgres3Store) CreateTable(ctx context.Context, bucket string) error { + _, err := store.DB.ExecContext(ctx, fmt.Sprintf(createTablePattern, bucket)) + if err != nil { + return fmt.Errorf("create bucket table: %v", err) + } + + _, err = store.DB.ExecContext(ctx, fmt.Sprintf(createTableIndexPattern, bucket)) + if err != nil { + return fmt.Errorf("create bucket index: %v", err) + } + return err +} + +func (store *Postgres3Store) deleteTable(ctx context.Context, bucket string) error { + if !store.SupportBucketTable { + return nil + } + _, err := store.DB.ExecContext(ctx, fmt.Sprintf(deleteTablePattern, bucket)) + return err +} + +func isValidBucket(bucket string) bool { + return bucket != abstract_sql.DEFAULT_TABLE && bucket != "" +} + +// calculatePrefixes returns the prefixes for a given path. The root prefix "/" is ignored to +// save space in the returned array +func calculatePrefixes(fullPath string) []string { + res := strings.Split(fullPath, "/") + maxPrefixes := len(res) + + var retval []string + for i := 1; i < maxPrefixes; i++ { + calculatedPrefix := strings.Join(res[0:i], "/") + "/" + if calculatedPrefix == "/" { + continue + } + retval = append(retval, calculatedPrefix) + } + return retval +} + +// hashPrefixArray converts input prefix array into int64 hashes +func hashPrefixArray(a []string) []int64 { + hashed := make([]int64, len(a)) + for i := range a { + hashed[i] = util.HashStringToLong(a[i]) + } + return hashed +} + +func appendSlash(s string) string { + if !strings.HasSuffix(s, "/") { + return s + "/" + } + return s +} diff --git a/weed/filer/postgres3/postgres3_store_test.go b/weed/filer/postgres3/postgres3_store_test.go new file mode 100644 index 000000000..89d7d4ef1 --- /dev/null +++ b/weed/filer/postgres3/postgres3_store_test.go @@ -0,0 +1,41 @@ +package postgres3 + +/* + * Copyright 2022 Splunk Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculatePrefixes(t *testing.T) { + assert := assert.New(t) + var path string + var prefixes []string + + path = "/test1" + prefixes = calculatePrefixes(path) + assert.Equal(prefixes, []string(nil)) + + path = "/test1/test2" + prefixes = calculatePrefixes(path) + assert.Equal(prefixes, []string{"/test1/"}) + + path = "/test1/test2/test3" + prefixes = calculatePrefixes(path) + assert.Equal(prefixes, []string{"/test1/", "/test1/test2/"}) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 7b57c68c7..aec36d0a9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -34,6 +34,7 @@ import ( _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2" _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres" _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2" + _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres3" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3" diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go index a67610d44..f80fea233 100644 --- a/weed/server/filer_server_handlers_tagging.go +++ b/weed/server/filer_server_handlers_tagging.go @@ -89,7 +89,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque // delete all tags or specific tags hasDeletion := false - for header, _ := range existingEntry.Extended { + for header := range existingEntry.Extended { if strings.HasPrefix(header, needle.PairNamePrefix) { if len(deletions) == 0 { delete(existingEntry.Extended, header)