Browse Source

Rename to postgres_s3

pull/3961/head
Adam Lamar 2 years ago
parent
commit
339541a71b
  1. 4
      docker/filer.toml
  2. 0
      docker/filer_postgres_s3.toml
  3. 12
      weed/filer/postgres_s3/README.md
  4. 8
      weed/filer/postgres_s3/postgres3_kvstore.go
  5. 51
      weed/filer/postgres_s3/postgres3_store.go
  6. 2
      weed/filer/postgres_s3/postgres3_store_test.go
  7. 2
      weed/server/filer_server.go

4
docker/filer.toml

@ -1,3 +1,3 @@
[leveldb3]
[rocksdb]
enabled = true enabled = true
dir = "/data/filer_leveldb3"
dir = "/data/filer_rocksdb"

0
docker/filer_postgres3.toml → docker/filer_postgres_s3.toml

12
weed/filer/postgres3/README.md → weed/filer/postgres_s3/README.md

@ -1,6 +1,6 @@
# postgres3
# postgres_s3
The `postgres3` filer implementation uses postgres-specific features and data structures to improve upon previous SQL implementations based on the `abstract_sql` module.
The `postgres_s3` filer implementation uses postgres-specific features and data structures to improve upon previous SQL implementations based on the `abstract_sql` module.
Of note, the `postgres2` filer implementation may leak directory hierarchy metadata when frequent inserts and deletes are
performed using the S3 API. If an application workload pattern creates directories, populates them temporarily, and then
@ -11,23 +11,23 @@ remains and places the burden of an unbounded number of unused rows on postgres.
Seaweedfs provides the `-s3.allowEmptyFolder=false` CLI argument to automatically clean up orphaned directory entries, but
this process necessarily races under high load and can cause unpredictable filer and postgres behavior.
To solve this problem, `postgres3` does the following:
To solve this problem, `postgres_s3` does the following:
1. One row in postgres _fully_ represents one object and its metadata
2. Insert, update, get, and delete operate on a single row
3. An array is stored of possible prefixes for each key
4. List requests leverage the prefixes to dynamically assemble directory entries using a complex `SELECT` statement
In order to efficiently query directory entries during list requests, `postgres3` uses special features of
In order to efficiently query directory entries during list requests, `postgres_s3` uses special features of
postgres:
* An int64 array field called `prefixes` with a hash of each prefix found for a specific key
* GIN indexing that provides fast set membership information on array fields
* Special functions `split_part` (text parsing) and `cardinality` (length of the array field)
`postgres3` uses automatic upsert capability with `ON CONFLICT ... UPDATE` so that insert and update are the same
`postgres_s3` uses automatic upsert capability with `ON CONFLICT ... UPDATE` so that insert and update are the same
race-free operation.
In the filer metadata tables, all objects start with `/`, causing prefix calculation to include the empty string (`""`).
For space and index optimization, `postgres3` does not store the root prefix in the `prefixes` array, and instead
For space and index optimization, `postgres_s3` does not store the root prefix in the `prefixes` array, and instead
relies on the condition `cardinality(prefixes) < 1` to discover objects at the root directory.

8
weed/filer/postgres3/postgres3_kvstore.go → weed/filer/postgres_s3/postgres3_kvstore.go

@ -1,4 +1,4 @@
package postgres3
package postgres_s3
/* /*
* Copyright 2022 Splunk Inc. * Copyright 2022 Splunk Inc.
@ -27,7 +27,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql" "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
) )
func (store *Postgres3Store) KvPut(ctx context.Context, key []byte, value []byte) error {
func (store *PostgresS3Store) KvPut(ctx context.Context, key []byte, value []byte) error {
db, _, _, err := store.getTxOrDB(ctx, "", false) db, _, _, err := store.getTxOrDB(ctx, "", false)
if err != nil { if err != nil {
return fmt.Errorf("findDB: %v", err) return fmt.Errorf("findDB: %v", err)
@ -42,7 +42,7 @@ func (store *Postgres3Store) KvPut(ctx context.Context, key []byte, value []byte
return nil return nil
} }
func (store *Postgres3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
func (store *PostgresS3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
db, _, _, err := store.getTxOrDB(ctx, "", false) db, _, _, err := store.getTxOrDB(ctx, "", false)
if err != nil { if err != nil {
return nil, fmt.Errorf("findDB: %v", err) return nil, fmt.Errorf("findDB: %v", err)
@ -63,7 +63,7 @@ func (store *Postgres3Store) KvGet(ctx context.Context, key []byte) (value []byt
return return
} }
func (store *Postgres3Store) KvDelete(ctx context.Context, key []byte) error {
func (store *PostgresS3Store) KvDelete(ctx context.Context, key []byte) error {
db, _, _, err := store.getTxOrDB(ctx, "", false) db, _, _, err := store.getTxOrDB(ctx, "", false)
if err != nil { if err != nil {
return fmt.Errorf("findDB: %v", err) return fmt.Errorf("findDB: %v", err)

51
weed/filer/postgres3/postgres3_store.go → weed/filer/postgres_s3/postgres3_store.go

@ -1,4 +1,4 @@
package postgres3
package postgres_s3
/* /*
* Copyright 2022 Splunk Inc. * Copyright 2022 Splunk Inc.
@ -76,29 +76,28 @@ var (
listEntryInclusivePattern string listEntryInclusivePattern string
) )
var _ filer.BucketAware = (*Postgres3Store)(nil)
var _ filer.BucketAware = (*PostgresS3Store)(nil)
func init() { func init() {
filer.Stores = append(filer.Stores, &Postgres3Store{})
filer.Stores = append(filer.Stores, &PostgresS3Store{})
listEntryExclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">") listEntryExclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">")
listEntryInclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">=") listEntryInclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">=")
} }
type Postgres3Store struct {
type PostgresS3Store struct {
DB *sql.DB DB *sql.DB
SupportBucketTable bool SupportBucketTable bool
dbs map[string]bool dbs map[string]bool
dbsLock sync.Mutex dbsLock sync.Mutex
} }
func (store *Postgres3Store) GetName() string {
return "postgres3"
func (store *PostgresS3Store) GetName() string {
return "postgres_s3"
} }
func (store *Postgres3Store) Initialize(configuration util.Configuration, prefix string) error {
func (store *PostgresS3Store) Initialize(configuration util.Configuration, prefix string) error {
return store.initialize( return store.initialize(
configuration.GetString(prefix+"createTable"),
configuration.GetString(prefix+"username"), configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"), configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"), configuration.GetString(prefix+"hostname"),
@ -112,7 +111,7 @@ func (store *Postgres3Store) Initialize(configuration util.Configuration, prefix
) )
} }
func (store *Postgres3Store) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
func (store *PostgresS3Store) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = true store.SupportBucketTable = true
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" { if user != "" {
@ -154,11 +153,11 @@ func (store *Postgres3Store) initialize(createTable, user, password, hostname st
return nil return nil
} }
func (store *Postgres3Store) CanDropWholeBucket() bool {
func (store *PostgresS3Store) CanDropWholeBucket() bool {
return store.SupportBucketTable return store.SupportBucketTable
} }
func (store *Postgres3Store) OnBucketCreation(bucket string) {
func (store *PostgresS3Store) OnBucketCreation(bucket string) {
store.dbsLock.Lock() store.dbsLock.Lock()
defer store.dbsLock.Unlock() defer store.dbsLock.Unlock()
@ -170,7 +169,7 @@ func (store *Postgres3Store) OnBucketCreation(bucket string) {
store.dbs[bucket] = true store.dbs[bucket] = true
} }
func (store *Postgres3Store) OnBucketDeletion(bucket string) {
func (store *PostgresS3Store) OnBucketDeletion(bucket string) {
store.dbsLock.Lock() store.dbsLock.Lock()
defer store.dbsLock.Unlock() defer store.dbsLock.Unlock()
@ -182,7 +181,7 @@ func (store *Postgres3Store) OnBucketDeletion(bucket string) {
delete(store.dbs, bucket) delete(store.dbs, bucket)
} }
func (store *Postgres3Store) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB abstract_sql.TxOrDB, bucket string, shortPath util.FullPath, err error) {
func (store *PostgresS3Store) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB abstract_sql.TxOrDB, bucket string, shortPath util.FullPath, err error) {
shortPath = fullpath shortPath = fullpath
bucket = abstract_sql.DEFAULT_TABLE bucket = abstract_sql.DEFAULT_TABLE
@ -233,7 +232,7 @@ func (store *Postgres3Store) getTxOrDB(ctx context.Context, fullpath util.FullPa
return return
} }
func (store *Postgres3Store) InsertEntry(ctx context.Context, entry *filer.Entry) error {
func (store *PostgresS3Store) InsertEntry(ctx context.Context, entry *filer.Entry) error {
db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
if err != nil { if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err) return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
@ -264,11 +263,11 @@ func (store *Postgres3Store) InsertEntry(ctx context.Context, entry *filer.Entry
return nil return nil
} }
func (store *Postgres3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
func (store *PostgresS3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
return store.InsertEntry(ctx, entry) return store.InsertEntry(ctx, entry)
} }
func (store *Postgres3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
func (store *PostgresS3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
if err != nil { if err != nil {
@ -295,7 +294,7 @@ func (store *Postgres3Store) FindEntry(ctx context.Context, fullpath util.FullPa
return entry, nil return entry, nil
} }
func (store *Postgres3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
func (store *PostgresS3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
if err != nil { if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err) return fmt.Errorf("findDB %s : %v", fullpath, err)
@ -314,7 +313,7 @@ func (store *Postgres3Store) DeleteEntry(ctx context.Context, fullpath util.Full
return nil return nil
} }
func (store *Postgres3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
func (store *PostgresS3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
if err != nil { if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err) return fmt.Errorf("findDB %s : %v", fullpath, err)
@ -347,11 +346,11 @@ func (store *Postgres3Store) DeleteFolderChildren(ctx context.Context, fullpath
return nil return nil
} }
func (store *Postgres3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *PostgresS3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
} }
func (store *Postgres3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
func (store *PostgresS3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true) db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
if err != nil { if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
@ -423,7 +422,7 @@ func (store *Postgres3Store) ListDirectoryPrefixedEntries(ctx context.Context, d
return lastFileName, nil return lastFileName, nil
} }
func (store *Postgres3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
func (store *PostgresS3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted, Isolation: sql.LevelReadCommitted,
ReadOnly: false, ReadOnly: false,
@ -435,25 +434,25 @@ func (store *Postgres3Store) BeginTransaction(ctx context.Context) (context.Cont
return context.WithValue(ctx, "tx", tx), nil return context.WithValue(ctx, "tx", tx), nil
} }
func (store *Postgres3Store) CommitTransaction(ctx context.Context) error {
func (store *PostgresS3Store) CommitTransaction(ctx context.Context) error {
if tx, ok := ctx.Value("tx").(*sql.Tx); ok { if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
return tx.Commit() return tx.Commit()
} }
return nil return nil
} }
func (store *Postgres3Store) RollbackTransaction(ctx context.Context) error {
func (store *PostgresS3Store) RollbackTransaction(ctx context.Context) error {
if tx, ok := ctx.Value("tx").(*sql.Tx); ok { if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
return tx.Rollback() return tx.Rollback()
} }
return nil return nil
} }
func (store *Postgres3Store) Shutdown() {
func (store *PostgresS3Store) Shutdown() {
store.DB.Close() store.DB.Close()
} }
func (store *Postgres3Store) CreateTable(ctx context.Context, bucket string) error {
func (store *PostgresS3Store) CreateTable(ctx context.Context, bucket string) error {
_, err := store.DB.ExecContext(ctx, fmt.Sprintf(createTablePattern, bucket)) _, err := store.DB.ExecContext(ctx, fmt.Sprintf(createTablePattern, bucket))
if err != nil { if err != nil {
return fmt.Errorf("create bucket table: %v", err) return fmt.Errorf("create bucket table: %v", err)
@ -466,7 +465,7 @@ func (store *Postgres3Store) CreateTable(ctx context.Context, bucket string) err
return err return err
} }
func (store *Postgres3Store) deleteTable(ctx context.Context, bucket string) error {
func (store *PostgresS3Store) deleteTable(ctx context.Context, bucket string) error {
if !store.SupportBucketTable { if !store.SupportBucketTable {
return nil return nil
} }

2
weed/filer/postgres3/postgres3_store_test.go → weed/filer/postgres_s3/postgres3_store_test.go

@ -1,4 +1,4 @@
package postgres3
package postgres_s3
/* /*
* Copyright 2022 Splunk Inc. * Copyright 2022 Splunk Inc.

2
weed/server/filer_server.go

@ -34,7 +34,7 @@ import (
_ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2" _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres" _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2" _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres3"
_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres_s3"
_ "github.com/seaweedfs/seaweedfs/weed/filer/redis" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
_ "github.com/seaweedfs/seaweedfs/weed/filer/redis2" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
_ "github.com/seaweedfs/seaweedfs/weed/filer/redis3" _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"

Loading…
Cancel
Save