You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

512 lines
14 KiB

2 years ago
2 years ago
  1. package postgres3
  2. /*
  3. * Copyright 2022 Splunk Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. import (
  18. "context"
  19. "database/sql"
  20. "fmt"
  21. "os"
  22. "path"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/lib/pq"
  27. _ "github.com/lib/pq"
  28. "github.com/seaweedfs/seaweedfs/weed/filer"
  29. "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
  30. "github.com/seaweedfs/seaweedfs/weed/glog"
  31. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  32. "github.com/seaweedfs/seaweedfs/weed/util"
  33. )
  // SQL and connection-string templates used by Postgres3Store.
//
// Every table-scoped pattern takes the bucket table name as its fmt argument
// (listEntryQueryPattern takes it twice); the $N placeholders are bound by the
// database driver at execution time.
const (
	// CONNECTION_URL_PATTERN is the base key/value connection string; it is
	// filled with hostname, port and sslmode, and optional settings are
	// appended by initialize.
	CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"

	// createTablePattern creates the per-bucket table if it is missing.
	createTablePattern = `CREATE TABLE IF NOT EXISTS "%s" (
key varchar(65535) PRIMARY KEY,
name varchar(65535),
prefixes bigint[],
meta bytea
)`

	// createTableIndexPattern creates the GIN index over the prefix hashes.
	//
	// The index is named explicitly and guarded with IF NOT EXISTS: an unnamed
	// CREATE INDEX succeeds every time it runs and leaves one more duplicate
	// index behind on each call. The name follows PostgreSQL's own
	// "<table>_<column>_idx" default so an index auto-named by an earlier run
	// is recognised. The explicit %[1]s argument index lets the existing
	// single-argument fmt.Sprintf call fill both occurrences.
	//
	// NOTE(review): PostgreSQL truncates identifiers to 63 bytes, so for very
	// long bucket names the truncated index name can collide with another
	// relation and the index creation is then skipped.
	createTableIndexPattern = `CREATE INDEX IF NOT EXISTS "%[1]s_prefixes_idx" ON "%[1]s" USING gin (prefixes);`

	// deleteTablePattern drops a bucket table.
	deleteTablePattern = `DROP TABLE "%s";`

	// insertEntryPattern upserts an entry; on a key conflict only meta is
	// replaced.
	insertEntryPattern = `INSERT INTO "%s" (key, name, prefixes, meta) VALUES ($1, $2, $3, $4)
ON CONFLICT (key)
DO
UPDATE SET meta = EXCLUDED.meta;`

	// findEntryPattern and deleteEntryPattern address a single entry by key.
	findEntryPattern   = `SELECT meta FROM "%s" WHERE key = $1`
	deleteEntryPattern = `DELETE FROM "%s" WHERE key = $1`

	// listEntryQueryPattern lists one directory level: the first branch
	// returns stored entries, the second synthesises directory rows from the
	// keys of deeper entries. __COMPARISON__ is substituted with ">" or ">="
	// in init. Parameters as bound by ListDirectoryPrefixedEntries:
	// $1 prefix hashes, $2 number of "/"-separated parts of the directory
	// path, $3 start file name, $4 LIKE pattern, $5 parts-1, $6 row limit.
	listEntryQueryPattern = `SELECT key, name, isdir, meta FROM
(
SELECT key, name, false as isdir, meta FROM "%s"
WHERE prefixes @> $1 AND cardinality(prefixes) < $5
AND name __COMPARISON__ $3 AND name LIKE $4 ORDER BY key ASC LIMIT $6
) s1
UNION
(
SELECT dir, dir, true isdir, NULL::bytea meta FROM
(
SELECT DISTINCT split_part(key, '/', $2) AS dir FROM "%s"
WHERE prefixes @> $1 AND cardinality(prefixes) > $5 - 1 ORDER BY dir ASC
) t1
WHERE t1.dir > $3 AND t1.dir LIKE $4 ORDER BY dir ASC
)
ORDER BY name ASC LIMIT $6`

	// deleteFolderChildrenPattern removes every entry whose prefixes contain
	// $1 and whose key matches the LIKE pattern $2.
	deleteFolderChildrenPattern = `DELETE FROM "%s" WHERE prefixes @> $1 and key like $2`
)
  68. var (
  69. listEntryExclusivePattern string
  70. listEntryInclusivePattern string
  71. )
  72. var _ filer.BucketAware = (*Postgres3Store)(nil)
  73. func init() {
  74. filer.Stores = append(filer.Stores, &Postgres3Store{})
  75. listEntryExclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">")
  76. listEntryInclusivePattern = strings.ReplaceAll(listEntryQueryPattern, "__COMPARISON__", ">=")
  77. }
  // Postgres3Store is a filer store backed by PostgreSQL. Entries under
// "/buckets/<name>/" are kept in a table named after the bucket; everything
// else goes to abstract_sql.DEFAULT_TABLE.
type Postgres3Store struct {
	// DB is the connection pool opened by initialize.
	DB *sql.DB
	// SupportBucketTable enables per-bucket tables; initialize sets it to true.
	SupportBucketTable bool
	// dbs records the buckets whose table has already been created by this
	// process. It is allocated lazily and guarded by dbsLock.
	dbs     map[string]bool
	dbsLock sync.Mutex
}
  // GetName returns the store name, "postgres3".
func (store *Postgres3Store) GetName() string {
	return "postgres3"
}
  87. func (store *Postgres3Store) Initialize(configuration util.Configuration, prefix string) error {
  88. return store.initialize(
  89. configuration.GetString(prefix+"createTable"),
  90. configuration.GetString(prefix+"username"),
  91. configuration.GetString(prefix+"password"),
  92. configuration.GetString(prefix+"hostname"),
  93. configuration.GetInt(prefix+"port"),
  94. configuration.GetString(prefix+"database"),
  95. configuration.GetString(prefix+"schema"),
  96. configuration.GetString(prefix+"sslmode"),
  97. configuration.GetInt(prefix+"connection_max_idle"),
  98. configuration.GetInt(prefix+"connection_max_open"),
  99. configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
  100. )
  101. }
  102. func (store *Postgres3Store) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
  103. store.SupportBucketTable = true
  104. sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
  105. if user != "" {
  106. sqlUrl += " user=" + user
  107. }
  108. adaptedSqlUrl := sqlUrl
  109. if password != "" {
  110. sqlUrl += " password=" + password
  111. adaptedSqlUrl += " password=ADAPTED"
  112. }
  113. if database != "" {
  114. sqlUrl += " dbname=" + database
  115. adaptedSqlUrl += " dbname=" + database
  116. }
  117. if schema != "" {
  118. sqlUrl += " search_path=" + schema
  119. adaptedSqlUrl += " search_path=" + schema
  120. }
  121. var dbErr error
  122. store.DB, dbErr = sql.Open("postgres", sqlUrl)
  123. if dbErr != nil {
  124. store.DB.Close()
  125. store.DB = nil
  126. return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
  127. }
  128. store.DB.SetMaxIdleConns(maxIdle)
  129. store.DB.SetMaxOpenConns(maxOpen)
  130. store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
  131. if err = store.DB.Ping(); err != nil {
  132. return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
  133. }
  134. if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
  135. return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
  136. }
  137. return nil
  138. }
  // CanDropWholeBucket reports whether per-bucket tables are enabled
// (SupportBucketTable).
func (store *Postgres3Store) CanDropWholeBucket() bool {
	return store.SupportBucketTable
}
  142. func (store *Postgres3Store) OnBucketCreation(bucket string) {
  143. store.dbsLock.Lock()
  144. defer store.dbsLock.Unlock()
  145. store.CreateTable(context.Background(), bucket)
  146. if store.dbs == nil {
  147. return
  148. }
  149. store.dbs[bucket] = true
  150. }
  151. func (store *Postgres3Store) OnBucketDeletion(bucket string) {
  152. store.dbsLock.Lock()
  153. defer store.dbsLock.Unlock()
  154. store.deleteTable(context.Background(), bucket)
  155. if store.dbs == nil {
  156. return
  157. }
  158. delete(store.dbs, bucket)
  159. }
  160. func (store *Postgres3Store) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB abstract_sql.TxOrDB, bucket string, shortPath util.FullPath, err error) {
  161. shortPath = fullpath
  162. bucket = abstract_sql.DEFAULT_TABLE
  163. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  164. txOrDB = tx
  165. } else {
  166. txOrDB = store.DB
  167. }
  168. if !store.SupportBucketTable {
  169. return
  170. }
  171. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  172. return
  173. }
  174. // detect bucket
  175. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  176. t := strings.Index(bucketAndObjectKey, "/")
  177. if t < 0 && !isForChildren {
  178. return
  179. }
  180. bucket = bucketAndObjectKey
  181. shortPath = "/"
  182. if t > 0 {
  183. bucket = bucketAndObjectKey[:t]
  184. shortPath = util.FullPath(bucketAndObjectKey[t:])
  185. }
  186. if isValidBucket(bucket) {
  187. store.dbsLock.Lock()
  188. defer store.dbsLock.Unlock()
  189. if store.dbs == nil {
  190. store.dbs = make(map[string]bool)
  191. }
  192. if _, found := store.dbs[bucket]; !found {
  193. if err = store.CreateTable(ctx, bucket); err == nil {
  194. store.dbs[bucket] = true
  195. }
  196. }
  197. }
  198. return
  199. }
  200. func (store *Postgres3Store) InsertEntry(ctx context.Context, entry *filer.Entry) error {
  201. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  202. if err != nil {
  203. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  204. }
  205. if entry.IsDirectory() {
  206. if isValidBucket(bucket) && !strings.HasPrefix(string(shortPath), "/.uploads") {
  207. // Ignore directory creations, but not bucket creations or multipart uploads
  208. return nil
  209. }
  210. }
  211. meta, err := entry.EncodeAttributesAndChunks()
  212. if err != nil {
  213. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  214. }
  215. if len(entry.Chunks) > 50 {
  216. meta = util.MaybeGzipData(meta)
  217. }
  218. prefixes := calculatePrefixes(string(shortPath))
  219. hashedPrefixes := hashPrefixArray(prefixes)
  220. _, err = db.ExecContext(ctx, fmt.Sprintf(insertEntryPattern, bucket), shortPath, path.Base(string(shortPath)), pq.Array(hashedPrefixes), meta)
  221. if err != nil {
  222. return fmt.Errorf("insert/upsert %s: %s", entry.FullPath, err)
  223. }
  224. return nil
  225. }
  // UpdateEntry stores entry by delegating to InsertEntry, which performs an
// upsert.
func (store *Postgres3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
	return store.InsertEntry(ctx, entry)
}
  229. func (store *Postgres3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
  230. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  231. if err != nil {
  232. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  233. }
  234. row := db.QueryRowContext(ctx, fmt.Sprintf(findEntryPattern, bucket), shortPath)
  235. var data []byte
  236. if err := row.Scan(&data); err != nil {
  237. if err == sql.ErrNoRows {
  238. return nil, filer_pb.ErrNotFound
  239. }
  240. return nil, fmt.Errorf("find %s: %v", fullpath, err)
  241. }
  242. entry := &filer.Entry{
  243. FullPath: fullpath,
  244. }
  245. if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  246. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  247. }
  248. return entry, nil
  249. }
  250. func (store *Postgres3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  251. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  252. if err != nil {
  253. return fmt.Errorf("findDB %s : %v", fullpath, err)
  254. }
  255. res, err := db.ExecContext(ctx, fmt.Sprintf(deleteEntryPattern, bucket), shortPath)
  256. if err != nil {
  257. return fmt.Errorf("delete %s: %s", fullpath, err)
  258. }
  259. _, err = res.RowsAffected()
  260. if err != nil {
  261. return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
  262. }
  263. return nil
  264. }
  265. func (store *Postgres3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
  266. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
  267. if err != nil {
  268. return fmt.Errorf("findDB %s : %v", fullpath, err)
  269. }
  270. if isValidBucket(bucket) && shortPath == "/" {
  271. if err = store.deleteTable(ctx, bucket); err == nil {
  272. store.dbsLock.Lock()
  273. delete(store.dbs, bucket)
  274. store.dbsLock.Unlock()
  275. return nil
  276. } else {
  277. return err
  278. }
  279. }
  280. sqlText := fmt.Sprintf(deleteFolderChildrenPattern, bucket)
  281. prefixes := calculatePrefixes(string(shortPath))
  282. hashedPrefixes := hashPrefixArray(prefixes)
  283. glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), sqlText, hashedPrefixes)
  284. res, err := db.ExecContext(ctx, sqlText, pq.Array(hashedPrefixes), string(shortPath)+"/%")
  285. if err != nil {
  286. return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
  287. }
  288. _, err = res.RowsAffected()
  289. if err != nil {
  290. return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
  291. }
  292. return nil
  293. }
  294. func (store *Postgres3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  295. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
  296. }
  297. func (store *Postgres3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  298. db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
  299. if err != nil {
  300. return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
  301. }
  302. slashedShortPath := appendSlash(string(shortPath))
  303. shortPathParts := len(strings.Split(slashedShortPath, "/"))
  304. sqlText := fmt.Sprintf(listEntryExclusivePattern, bucket, bucket)
  305. if includeStartFile {
  306. sqlText = fmt.Sprintf(listEntryInclusivePattern, bucket, bucket)
  307. }
  308. prefixes := calculatePrefixes(string(slashedShortPath))
  309. hashedPrefixes := hashPrefixArray(prefixes)
  310. rows, err := db.QueryContext(ctx, sqlText,
  311. pq.Array(hashedPrefixes),
  312. shortPathParts,
  313. startFileName,
  314. prefix+"%",
  315. shortPathParts-1,
  316. limit+1)
  317. if err != nil {
  318. return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
  319. }
  320. defer rows.Close()
  321. for rows.Next() {
  322. var key string
  323. var name string
  324. var isDir bool
  325. var data []byte
  326. if err = rows.Scan(&key, &name, &isDir, &data); err != nil {
  327. glog.V(0).Infof("scan %s : %v", dirPath, err)
  328. return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
  329. }
  330. if !isDir {
  331. lastFileName = name
  332. entry := &filer.Entry{
  333. FullPath: util.NewFullPath(string(dirPath), name),
  334. }
  335. if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  336. glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
  337. return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
  338. }
  339. if !eachEntryFunc(entry) {
  340. break
  341. }
  342. } else {
  343. lastFileName = key
  344. dirName := key
  345. entry := &filer.Entry{
  346. FullPath: util.NewFullPath(string(dirPath), dirName),
  347. }
  348. entry.Attr.Mode |= os.ModeDir | 0775
  349. if !eachEntryFunc(entry) {
  350. break
  351. }
  352. }
  353. }
  354. return lastFileName, nil
  355. }
  356. func (store *Postgres3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
  357. tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
  358. Isolation: sql.LevelReadCommitted,
  359. ReadOnly: false,
  360. })
  361. if err != nil {
  362. return ctx, err
  363. }
  364. return context.WithValue(ctx, "tx", tx), nil
  365. }
  366. func (store *Postgres3Store) CommitTransaction(ctx context.Context) error {
  367. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  368. return tx.Commit()
  369. }
  370. return nil
  371. }
  372. func (store *Postgres3Store) RollbackTransaction(ctx context.Context) error {
  373. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  374. return tx.Rollback()
  375. }
  376. return nil
  377. }
  378. func (store *Postgres3Store) Shutdown() {
  379. store.DB.Close()
  380. }
  381. func (store *Postgres3Store) CreateTable(ctx context.Context, bucket string) error {
  382. _, err := store.DB.ExecContext(ctx, fmt.Sprintf(createTablePattern, bucket))
  383. if err != nil {
  384. return fmt.Errorf("create bucket table: %v", err)
  385. }
  386. _, err = store.DB.ExecContext(ctx, fmt.Sprintf(createTableIndexPattern, bucket))
  387. if err != nil {
  388. return fmt.Errorf("create bucket index: %v", err)
  389. }
  390. return err
  391. }
  392. func (store *Postgres3Store) deleteTable(ctx context.Context, bucket string) error {
  393. if !store.SupportBucketTable {
  394. return nil
  395. }
  396. _, err := store.DB.ExecContext(ctx, fmt.Sprintf(deleteTablePattern, bucket))
  397. return err
  398. }
  399. func isValidBucket(bucket string) bool {
  400. return bucket != abstract_sql.DEFAULT_TABLE && bucket != ""
  401. }
  // calculatePrefixes returns every slash-terminated leading part of fullPath,
// shortest first: "/a/b/c" yields "/a/" and "/a/b/". The bare root prefix "/"
// is left out to save space, and the result is nil when nothing remains.
func calculatePrefixes(fullPath string) []string {
	var prefixes []string
	for pos := 0; ; {
		offset := strings.IndexByte(fullPath[pos:], '/')
		if offset < 0 {
			break
		}
		end := pos + offset + 1 // one past the slash just found
		if end > 1 {
			// end == 1 is the leading "/" itself, which is skipped.
			prefixes = append(prefixes, fullPath[:end])
		}
		pos = end
	}
	return prefixes
}
  417. // hashPrefixArray converts input prefix array into int64 hashes
  418. func hashPrefixArray(a []string) []int64 {
  419. hashed := make([]int64, len(a))
  420. for i := range a {
  421. hashed[i] = util.HashStringToLong(a[i])
  422. }
  423. return hashed
  424. }
  // appendSlash returns s terminated by exactly the slash it already has, or
// with one "/" added when it does not end in one.
func appendSlash(s string) string {
	if strings.HasSuffix(s, "/") {
		return s
	}
	return s + "/"
}