You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

623 lines
18 KiB

  1. package postgres_store
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. "path/filepath"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/filer"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. _ "github.com/lib/pq"
  12. _ "path/filepath"
  13. "strings"
  14. )
  // Connection-pool defaults; getDbConnection falls back to these when the
  // matching PostgresConf field is zero.
  const (
      default_maxIdleConnections = 100
      default_maxOpenConnections = 50
  )

  // Names of the two tables this store creates and queries.
  const (
      filesTableName       = "files"
      directoriesTableName = "directories"
  )
  // _init_db makes the connection setup in getDbConnection run at most once.
  var _init_db sync.Once

  // _db_connection is the package-level handle that getDbConnection returns.
  var _db_connection *sql.DB
  // PostgresConf holds the settings getDbConnection uses to reach PostgreSQL.
  type PostgresConf struct {
      // User and Password are placed in the connection string as-is.
      User     string
      Password string

      // HostName and Port locate the server.
      HostName string
      Port     int

      // DataBase is the database the store works in; it is created on first
      // connect when it does not exist yet.
      DataBase string

      // SslMode is passed through as the sslmode connection parameter.
      SslMode string

      // Pool limits; a zero value selects the package default.
      MaxIdleConnections int
      MaxOpenConnections int
  }
  // PostgresStore is a filer store that keeps its file and directory records
  // in PostgreSQL.
  type PostgresStore struct {
      // db is the handle every query in this store goes through.
      db *sql.DB

      // NewPostgresStore does not populate the fields below.
      server   string
      user     string
      password string
  }
  41. func (s *PostgresStore) CreateFile(fullFileName string, fid string) (err error) {
  42. glog.V(3).Infoln("Calling posgres_store CreateFile")
  43. return s.Put(fullFileName, fid)
  44. }
  45. func (s *PostgresStore) FindFile(fullFileName string) (fid string, err error) {
  46. glog.V(3).Infoln("Calling posgres_store FindFile")
  47. return s.Get(fullFileName)
  48. }
  49. func (s *PostgresStore) DeleteFile(fullFileName string) (fid string, err error) {
  50. glog.V(3).Infoln("Calling posgres_store DeleteFile")
  51. return "", s.Delete(fullFileName)
  52. }
  53. func (s *PostgresStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
  54. glog.V(3).Infoln("Calling posgres_store FindDirectory")
  55. return s.FindDir(dirPath)
  56. }
  57. func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
  58. glog.V(3).Infoln("Calling posgres_store ListDirectories")
  59. return s.ListDirs(dirPath)
  60. }
  61. func (s *PostgresStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
  62. glog.V(3).Infoln("Calling posgres_store ListFiles")
  63. return s.FindFiles(dirPath, lastFileName, limit)
  64. }
  65. func (s *PostgresStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
  66. glog.V(3).Infoln("Calling posgres_store DeleteDirectory")
  67. return s.DeleteDir(dirPath, recursive)
  68. }
  69. func (s *PostgresStore) Move(fromPath string, toPath string) (err error) {
  70. glog.V(3).Infoln("Calling posgres_store Move")
  71. return errors.New("Move is not yet implemented for the PostgreSQL store.")
  72. }
  // databaseExists reports whether pg_database has an entry named databaseName.
  //
  // It returns (false, nil) when no such row exists and (false, err) for any
  // other query or scan failure.
  func databaseExists(db *sql.DB, databaseName string) (bool, error) {
      // The name is bound as a parameter rather than formatted into the SQL
      // text, so quotes or other special characters in it cannot change the
      // statement.
      var dbName string
      err := db.QueryRow("SELECT datname from pg_database WHERE datname=$1", databaseName).Scan(&dbName)
      if err == sql.ErrNoRows {
          return false, nil
      }
      if err != nil {
          return false, err
      }
      return true, nil
  }
  // createDatabase issues CREATE DATABASE for databaseName with UTF8 encoding
  // and returns the error from executing it, if any.
  func createDatabase(db *sql.DB, databaseName string) error {
      // CREATE DATABASE cannot take a bound parameter for the name, so the name
      // is emitted as a double-quoted identifier with embedded quotes doubled.
      // Quoting also keeps the exact spelling (case, dashes, ...) so the
      // database created here has the same name that is later used to connect.
      quotedName := `"` + strings.Replace(databaseName, `"`, `""`, -1) + `"`
      _, err := db.Exec(fmt.Sprintf("CREATE DATABASE %s ENCODING='UTF8'", quotedName))
      return err
  }
  91. func getDbConnection(conf PostgresConf) *sql.DB {
  92. _init_db.Do(func() {
  93. sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode)
  94. glog.V(3).Infoln("Opening postgres master database")
  95. var dbErr error
  96. _db_connection, dbErr := sql.Open("postgres", sqlUrl)
  97. if dbErr != nil {
  98. _db_connection.Close()
  99. _db_connection = nil
  100. panic(dbErr)
  101. }
  102. pingErr := _db_connection.Ping()
  103. if pingErr != nil {
  104. _db_connection.Close()
  105. _db_connection = nil
  106. panic(pingErr)
  107. }
  108. glog.V(3).Infoln("Checking to see if DB exists: ", conf.DataBase)
  109. var existsErr error
  110. dbExists, existsErr := databaseExists(_db_connection, conf.DataBase)
  111. if existsErr != nil {
  112. _db_connection.Close()
  113. _db_connection = nil
  114. panic(existsErr)
  115. }
  116. if !dbExists {
  117. glog.V(3).Infoln("Database doesn't exist. Attempting to create one: ", conf.DataBase)
  118. createErr := createDatabase(_db_connection, conf.DataBase)
  119. if createErr != nil {
  120. _db_connection.Close()
  121. _db_connection = nil
  122. panic(createErr)
  123. }
  124. }
  125. glog.V(3).Infoln("Closing master postgres database and opening configured database: ", conf.DataBase)
  126. _db_connection.Close()
  127. _db_connection = nil
  128. sqlUrl = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, conf.DataBase, conf.SslMode)
  129. _db_connection, dbErr = sql.Open("postgres", sqlUrl)
  130. if dbErr != nil {
  131. _db_connection.Close()
  132. _db_connection = nil
  133. panic(dbErr)
  134. }
  135. pingErr = _db_connection.Ping()
  136. if pingErr != nil {
  137. _db_connection.Close()
  138. _db_connection = nil
  139. panic(pingErr)
  140. }
  141. var maxIdleConnections, maxOpenConnections int
  142. if conf.MaxIdleConnections != 0 {
  143. maxIdleConnections = conf.MaxIdleConnections
  144. } else {
  145. maxIdleConnections = default_maxIdleConnections
  146. }
  147. if conf.MaxOpenConnections != 0 {
  148. maxOpenConnections = conf.MaxOpenConnections
  149. } else {
  150. maxOpenConnections = default_maxOpenConnections
  151. }
  152. _db_connection.SetMaxIdleConns(maxIdleConnections)
  153. _db_connection.SetMaxOpenConns(maxOpenConnections)
  154. })
  155. return _db_connection
  156. }
  157. //func NewPostgresStore(master string, confs []PostgresConf, isSharding bool, shardCount int) *PostgresStore {
  158. func NewPostgresStore(master string, conf PostgresConf) *PostgresStore {
  159. pg := &PostgresStore{
  160. db: getDbConnection(conf),
  161. }
  162. pg.createDirectoriesTable()
  163. if err := pg.createFilesTable(); err != nil {
  164. fmt.Printf("create table failed %v", err)
  165. }
  166. return pg
  167. }
  168. func (s *PostgresStore) Get(fullFilePath string) (fid string, err error) {
  169. if err != nil {
  170. return "", fmt.Errorf("PostgresStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
  171. }
  172. fid, err = s.query(fullFilePath)
  173. return fid, err
  174. }
  175. func (s *PostgresStore) Put(fullFilePath string, fid string) (err error) {
  176. var old_fid string
  177. if old_fid, err = s.query(fullFilePath); err != nil && err != sql.ErrNoRows {
  178. return fmt.Errorf("PostgresStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
  179. } else {
  180. if len(old_fid) == 0 {
  181. err = s.insert(fullFilePath, fid)
  182. if err != nil {
  183. return fmt.Errorf("PostgresStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
  184. }
  185. } else {
  186. err = s.update(fullFilePath, fid)
  187. if err != nil {
  188. return fmt.Errorf("PostgresStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
  189. }
  190. }
  191. }
  192. return
  193. }
  194. func (s *PostgresStore) Delete(fullFilePath string) (err error) {
  195. var fid string
  196. if err != nil {
  197. return fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
  198. }
  199. if fid, err = s.query(fullFilePath); err != nil {
  200. return fmt.Errorf("PostgresStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
  201. } else if fid == "" {
  202. return nil
  203. }
  204. if err = s.delete(fullFilePath); err != nil {
  205. return fmt.Errorf("PostgresStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
  206. } else {
  207. return nil
  208. }
  209. }
  210. func (s *PostgresStore) Close() {
  211. s.db.Close()
  212. }
  213. func (s *PostgresStore) FindDir(dirPath string) (dirId filer.DirectoryId, err error) {
  214. dirId, _, err = s.lookupDirectory(dirPath)
  215. return dirId, err
  216. }
  217. func (s *PostgresStore) ListDirs(dirPath string) (dirs []filer.DirectoryEntry, err error) {
  218. dirs, err = s.findDirectories(dirPath, 20)
  219. glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath)
  220. return dirs, err
  221. }
  222. func (s *PostgresStore) DeleteDir(dirPath string, recursive bool) (err error) {
  223. err = s.deleteDirectory(dirPath, recursive)
  224. if err != nil {
  225. glog.V(0).Infof("Error in Postgres DeleteDir '%s' (recursive = '%t'): %s", err)
  226. }
  227. return err
  228. }
  229. func (s *PostgresStore) FindFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
  230. files, err = s.findFiles(dirPath, lastFileName, limit)
  231. return files, err
  232. }
  // DDL templates for the store's tables. Each is a fmt format string.
  var (
      // createDirectoryTable takes one argument: the table name.
      createDirectoryTable = `
CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL NOT NULL,
directoryRoot VARCHAR(1024) NOT NULL DEFAULT '',
directoryName VARCHAR(1024) NOT NULL DEFAULT '',
CONSTRAINT unique_directory UNIQUE (directoryRoot, directoryName)
);
`

      // createFileTable takes two arguments: the table name, and the prefix
      // used for the name of its unique constraint.
      createFileTable = `
CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL NOT NULL,
directoryPart VARCHAR(1024) NOT NULL DEFAULT '',
filePart VARCHAR(1024) NOT NULL DEFAULT '',
fid VARCHAR(36) NOT NULL DEFAULT '',
createTime BIGINT NOT NULL DEFAULT 0,
updateTime BIGINT NOT NULL DEFAULT 0,
remark VARCHAR(20) NOT NULL DEFAULT '',
status SMALLINT NOT NULL DEFAULT '1',
PRIMARY KEY (id),
CONSTRAINT %s_unique_file UNIQUE (directoryPart, filePart)
);
`
  )
  255. func (s *PostgresStore) createDirectoriesTable() error {
  256. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", directoriesTableName)
  257. sqlCreate := fmt.Sprintf(createDirectoryTable, directoriesTableName)
  258. stmt, err := s.db.Prepare(sqlCreate)
  259. if err != nil {
  260. return err
  261. }
  262. defer stmt.Close()
  263. _, err = stmt.Exec()
  264. if err != nil {
  265. return err
  266. }
  267. return nil
  268. }
  269. func (s *PostgresStore) createFilesTable() error {
  270. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", filesTableName)
  271. sqlCreate := fmt.Sprintf(createFileTable, filesTableName, filesTableName)
  272. stmt, err := s.db.Prepare(sqlCreate)
  273. if err != nil {
  274. return err
  275. }
  276. defer stmt.Close()
  277. _, err = stmt.Exec()
  278. if err != nil {
  279. return err
  280. }
  281. return nil
  282. }
  283. func (s *PostgresStore) query(uriPath string) (string, error) {
  284. directoryPart, filePart := filepath.Split(uriPath)
  285. sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  286. row := s.db.QueryRow(sqlStatement, directoryPart, filePart)
  287. var fid string
  288. err := row.Scan(&fid)
  289. glog.V(3).Infof("Postgres query -- looking up path '%s' and found id '%s' ", uriPath, fid)
  290. if err != nil {
  291. return "", err
  292. }
  293. return fid, nil
  294. }
  295. func (s *PostgresStore) update(uriPath string, fid string) error {
  296. directoryPart, filePart := filepath.Split(uriPath)
  297. sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE directoryPart=$3 AND filePart=$4", filesTableName)
  298. glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid)
  299. res, err := s.db.Exec(sqlStatement, fid, time.Now().Unix(), directoryPart, filePart)
  300. if err != nil {
  301. return err
  302. }
  303. _, err = res.RowsAffected()
  304. if err != nil {
  305. return err
  306. }
  307. return nil
  308. }
  309. func (s *PostgresStore) insert(uriPath string, fid string) error {
  310. directoryPart, filePart := filepath.Split(uriPath)
  311. existingId, _, _ := s.lookupDirectory(directoryPart)
  312. if existingId == 0 {
  313. s.recursiveInsertDirectory(directoryPart)
  314. }
  315. sqlStatement := fmt.Sprintf("INSERT INTO %s (directoryPart,filePart,fid,createTime) VALUES($1, $2, $3, $4)", filesTableName)
  316. glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid)
  317. res, err := s.db.Exec(sqlStatement, directoryPart, filePart, fid, time.Now().Unix())
  318. if err != nil {
  319. return err
  320. }
  321. rows, err := res.RowsAffected()
  322. if rows != 1 {
  323. return fmt.Errorf("Postgres insert -- rows affected = %d. Expecting 1", rows)
  324. }
  325. if err != nil {
  326. return err
  327. }
  328. return nil
  329. }
  330. func (s *PostgresStore) recursiveInsertDirectory(dirPath string) {
  331. pathParts := strings.Split(dirPath, "/")
  332. var workingPath string = "/"
  333. for _, part := range pathParts {
  334. if part == "" {
  335. continue
  336. }
  337. workingPath += (part + "/")
  338. existingId, _, _ := s.lookupDirectory(workingPath)
  339. if existingId == 0 {
  340. s.insertDirectory(workingPath)
  341. }
  342. }
  343. }
  344. func (s *PostgresStore) insertDirectory(dirPath string) {
  345. pathParts := strings.Split(dirPath, "/")
  346. directoryRoot := "/"
  347. directoryName := ""
  348. if len(pathParts) > 1 {
  349. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  350. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  351. } else if len(pathParts) == 1 {
  352. directoryRoot = "/"
  353. directoryName = pathParts[0] + "/"
  354. }
  355. sqlInsertDirectoryStatement := fmt.Sprintf("INSERT INTO %s (directoryroot, directoryname) "+
  356. "SELECT $1, $2 WHERE NOT EXISTS ( SELECT id FROM %s WHERE directoryroot=$3 AND directoryname=$4 )",
  357. directoriesTableName, directoriesTableName)
  358. glog.V(4).Infof("Postgres query -- Inserting directory (if it doesn't exist) - root = %s, name = %s",
  359. directoryRoot, directoryName)
  360. _, err := s.db.Exec(sqlInsertDirectoryStatement, directoryRoot, directoryName, directoryRoot, directoryName)
  361. if err != nil {
  362. glog.V(0).Infof("Postgres query -- Error inserting directory - root = %s, name = %s: %s",
  363. directoryRoot, directoryName, err)
  364. }
  365. }
  366. func (s *PostgresStore) delete(uriPath string) error {
  367. directoryPart, filePart := filepath.Split(uriPath)
  368. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  369. glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath)
  370. res, err := s.db.Exec(sqlStatement, directoryPart, filePart)
  371. if err != nil {
  372. return err
  373. }
  374. _, err = res.RowsAffected()
  375. if err != nil {
  376. return err
  377. }
  378. return nil
  379. }
  380. func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, string, error) {
  381. directoryRoot, directoryName := s.mySplitPath(dirPath)
  382. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  383. row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
  384. var id filer.DirectoryId
  385. var dirRoot string
  386. var dirName string
  387. err := row.Scan(&id, &dirRoot, &dirName)
  388. glog.V(3).Infof("Postgres lookupDirectory -- looking up directory '%s' and found id '%d', root '%s', name '%s' ", dirPath, id, dirRoot, dirName)
  389. if err != nil {
  390. return 0, "", err
  391. }
  392. return id, filepath.Join(dirRoot, dirName), err
  393. }
  394. func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryEntry, err error) {
  395. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
  396. rows, err := s.db.Query(sqlStatement, dirPath, limit)
  397. if err != nil {
  398. glog.V(0).Infof("Postgres findDirectories error: %s", err)
  399. }
  400. if rows != nil {
  401. defer rows.Close()
  402. for rows.Next() {
  403. var id filer.DirectoryId
  404. var directoryRoot string
  405. var directoryName string
  406. scanErr := rows.Scan(&id, &directoryRoot, &directoryName)
  407. if scanErr != nil {
  408. err = scanErr
  409. }
  410. dirs = append(dirs, filer.DirectoryEntry{Name: (directoryName), Id: id})
  411. }
  412. }
  413. return
  414. }
  415. func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bool {
  416. if recursive {
  417. return true
  418. }
  419. sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
  420. row := s.db.QueryRow(sqlStatement, dirPath+"%")
  421. var id filer.DirectoryId
  422. err := row.Scan(&id)
  423. if err != nil {
  424. if err == sql.ErrNoRows {
  425. return true
  426. }
  427. }
  428. return false
  429. }
  430. func (s *PostgresStore) mySplitPath(dirPath string) (directoryRoot string, directoryName string) {
  431. pathParts := strings.Split(dirPath, "/")
  432. directoryRoot = "/"
  433. directoryName = ""
  434. if len(pathParts) > 1 {
  435. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  436. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  437. } else if len(pathParts) == 1 {
  438. directoryRoot = "/"
  439. directoryName = pathParts[0] + "/"
  440. }
  441. return directoryRoot, directoryName
  442. }
  443. func (s *PostgresStore) deleteDirectory(dirPath string, recursive bool) (err error) {
  444. directoryRoot, directoryName := s.mySplitPath(dirPath)
  445. // delete files
  446. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directorypart=$1", filesTableName)
  447. _, err = s.db.Exec(sqlStatement, dirPath)
  448. if err != nil {
  449. return err
  450. }
  451. // delete specific directory if it is empty or recursive delete was requested
  452. safeToDelete := s.safeToDeleteDirectory(dirPath, recursive)
  453. if safeToDelete {
  454. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  455. _, err = s.db.Exec(sqlStatement, directoryRoot, directoryName)
  456. if err != nil {
  457. return err
  458. }
  459. }
  460. if recursive {
  461. // delete descendant files
  462. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directorypart LIKE $1", filesTableName)
  463. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  464. if err != nil {
  465. return err
  466. }
  467. // delete descendant directories
  468. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot LIKE $1", directoriesTableName)
  469. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  470. if err != nil {
  471. return err
  472. }
  473. }
  474. return err
  475. }
  476. func (s *PostgresStore) findFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
  477. var rows *sql.Rows = nil
  478. if lastFileName == "" {
  479. sqlStatement :=
  480. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 ORDER BY id LIMIT $2", filesTableName)
  481. rows, err = s.db.Query(sqlStatement, dirPath, limit)
  482. } else {
  483. sqlStatement :=
  484. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 "+
  485. "AND id > (SELECT id FROM %s WHERE directoryPart=$2 AND filepart=$3) ORDER BY id LIMIT $4",
  486. filesTableName, filesTableName)
  487. _, lastFileNameName := filepath.Split(lastFileName)
  488. rows, err = s.db.Query(sqlStatement, dirPath, dirPath, lastFileNameName, limit)
  489. }
  490. if err != nil {
  491. glog.V(0).Infof("Postgres find files error: %s", err)
  492. }
  493. if rows != nil {
  494. defer rows.Close()
  495. for rows.Next() {
  496. var fid filer.FileId
  497. var directoryPart string
  498. var filePart string
  499. scanErr := rows.Scan(&fid, &directoryPart, &filePart)
  500. if scanErr != nil {
  501. err = scanErr
  502. }
  503. files = append(files, filer.FileEntry{Name: filepath.Join(directoryPart, filePart), Id: fid})
  504. if len(files) >= limit {
  505. break
  506. }
  507. }
  508. }
  509. glog.V(3).Infof("Postgres findFiles -- looking up files under '%s' and found %d files. Limit=%d, lastFileName=%s",
  510. dirPath, len(files), limit, lastFileName)
  511. return files, err
  512. }