You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

459 lines
13 KiB

7 years ago
  1. package postgres_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. _ "github.com/lib/pq"
  10. _ "path/filepath"
  11. "strings"
  12. )
  // databaseExists reports whether a database called databaseName is listed in
  // pg_database on the server db is connected to.
  //
  // The name is sent as a bind parameter rather than being formatted into the
  // statement, so names containing quotes (or hostile input) cannot alter the
  // query. A missing database is reported as (false, nil); any other failure is
  // returned as (false, err).
  func databaseExists(db *sql.DB, databaseName string) (bool, error) {
  	var dbName string
  	err := db.QueryRow("SELECT datname FROM pg_database WHERE datname=$1", databaseName).Scan(&dbName)
  	switch {
  	case err == sql.ErrNoRows:
  		// No matching row simply means the database has not been created yet.
  		return false, nil
  	case err != nil:
  		return false, err
  	}
  	return true, nil
  }
  // createDatabase issues CREATE DATABASE for databaseName with UTF8 encoding
  // on the server db is connected to and returns the error from Exec, if any.
  //
  // DDL cannot take the database name as a bind parameter, so the name is
  // emitted as a double-quoted identifier with embedded double quotes doubled.
  // Quoting prevents the name from terminating the statement, allows characters
  // such as '-', and keeps the exact spelling so that it matches the
  // case-sensitive datname comparison used when checking for existence.
  func createDatabase(db *sql.DB, databaseName string) error {
  	quotedName := `"` + strings.Replace(databaseName, `"`, `""`, -1) + `"`
  	_, err := db.Exec("CREATE DATABASE " + quotedName + " ENCODING='UTF8'")
  	return err
  }
  31. func getDbConnection(conf PostgresConf) *sql.DB {
  32. _init_db.Do(func() {
  33. sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode)
  34. glog.V(3).Infoln("Opening postgres master database")
  35. var dbErr error
  36. _db_connection, dbErr := sql.Open("postgres", sqlUrl)
  37. if dbErr != nil {
  38. _db_connection.Close()
  39. _db_connection = nil
  40. panic(dbErr)
  41. }
  42. pingErr := _db_connection.Ping()
  43. if pingErr != nil {
  44. _db_connection.Close()
  45. _db_connection = nil
  46. panic(pingErr)
  47. }
  48. glog.V(3).Infoln("Checking to see if DB exists: ", conf.DataBase)
  49. var existsErr error
  50. dbExists, existsErr := databaseExists(_db_connection, conf.DataBase)
  51. if existsErr != nil {
  52. _db_connection.Close()
  53. _db_connection = nil
  54. panic(existsErr)
  55. }
  56. if !dbExists {
  57. glog.V(3).Infoln("Database doesn't exist. Attempting to create one: ", conf.DataBase)
  58. createErr := createDatabase(_db_connection, conf.DataBase)
  59. if createErr != nil {
  60. _db_connection.Close()
  61. _db_connection = nil
  62. panic(createErr)
  63. }
  64. }
  65. glog.V(3).Infoln("Closing master postgres database and opening configured database: ", conf.DataBase)
  66. _db_connection.Close()
  67. _db_connection = nil
  68. sqlUrl = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, conf.DataBase, conf.SslMode)
  69. _db_connection, dbErr = sql.Open("postgres", sqlUrl)
  70. if dbErr != nil {
  71. _db_connection.Close()
  72. _db_connection = nil
  73. panic(dbErr)
  74. }
  75. pingErr = _db_connection.Ping()
  76. if pingErr != nil {
  77. _db_connection.Close()
  78. _db_connection = nil
  79. panic(pingErr)
  80. }
  81. var maxIdleConnections, maxOpenConnections int
  82. if conf.MaxIdleConnections != 0 {
  83. maxIdleConnections = conf.MaxIdleConnections
  84. } else {
  85. maxIdleConnections = default_maxIdleConnections
  86. }
  87. if conf.MaxOpenConnections != 0 {
  88. maxOpenConnections = conf.MaxOpenConnections
  89. } else {
  90. maxOpenConnections = default_maxOpenConnections
  91. }
  92. _db_connection.SetMaxIdleConns(maxIdleConnections)
  93. _db_connection.SetMaxOpenConns(maxOpenConnections)
  94. })
  95. return _db_connection
  96. }
  // createDirectoryTable is the DDL template for the directories table. It
  // takes one %s argument: the table name. Each row stores a directory as the
  // pair (directoryRoot, directoryName), which must be unique.
  //
  // id is declared as the primary key, matching the files table, so that the
  // "ORDER BY id" listings have an index to use. Because the statement is
  // CREATE TABLE IF NOT EXISTS, tables that already exist are left untouched.
  var createDirectoryTable = `
  CREATE TABLE IF NOT EXISTS %s (
  	id BIGSERIAL NOT NULL,
  	directoryRoot VARCHAR(1024) NOT NULL DEFAULT '',
  	directoryName VARCHAR(1024) NOT NULL DEFAULT '',
  	PRIMARY KEY (id),
  	CONSTRAINT unique_directory UNIQUE (directoryRoot, directoryName)
  );
  `

  // createFileTable is the DDL template for the files table. It takes two %s
  // arguments: the table name, and a prefix for the name of the unique
  // constraint on (directoryPart, filePart).
  var createFileTable = `
  CREATE TABLE IF NOT EXISTS %s (
  	id BIGSERIAL NOT NULL,
  	directoryPart VARCHAR(1024) NOT NULL DEFAULT '',
  	filePart VARCHAR(1024) NOT NULL DEFAULT '',
  	fid VARCHAR(36) NOT NULL DEFAULT '',
  	createTime BIGINT NOT NULL DEFAULT 0,
  	updateTime BIGINT NOT NULL DEFAULT 0,
  	remark VARCHAR(20) NOT NULL DEFAULT '',
  	status SMALLINT NOT NULL DEFAULT '1',
  	PRIMARY KEY (id),
  	CONSTRAINT %s_unique_file UNIQUE (directoryPart, filePart)
  );
  `
  119. func (s *PostgresStore) createDirectoriesTable() error {
  120. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", directoriesTableName)
  121. sqlCreate := fmt.Sprintf(createDirectoryTable, directoriesTableName)
  122. stmt, err := s.db.Prepare(sqlCreate)
  123. if err != nil {
  124. return err
  125. }
  126. defer stmt.Close()
  127. _, err = stmt.Exec()
  128. if err != nil {
  129. return err
  130. }
  131. return nil
  132. }
  133. func (s *PostgresStore) createFilesTable() error {
  134. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", filesTableName)
  135. sqlCreate := fmt.Sprintf(createFileTable, filesTableName, filesTableName)
  136. stmt, err := s.db.Prepare(sqlCreate)
  137. if err != nil {
  138. return err
  139. }
  140. defer stmt.Close()
  141. _, err = stmt.Exec()
  142. if err != nil {
  143. return err
  144. }
  145. return nil
  146. }
  147. func (s *PostgresStore) query(uriPath string) (string, error) {
  148. directoryPart, filePart := filepath.Split(uriPath)
  149. sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  150. row := s.db.QueryRow(sqlStatement, directoryPart, filePart)
  151. var fid string
  152. err := row.Scan(&fid)
  153. glog.V(3).Infof("Postgres query -- looking up path '%s' and found id '%s' ", uriPath, fid)
  154. if err != nil {
  155. return "", err
  156. }
  157. return fid, nil
  158. }
  159. func (s *PostgresStore) update(uriPath string, fid string) error {
  160. directoryPart, filePart := filepath.Split(uriPath)
  161. sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE directoryPart=$3 AND filePart=$4", filesTableName)
  162. glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid)
  163. res, err := s.db.Exec(sqlStatement, fid, time.Now().Unix(), directoryPart, filePart)
  164. if err != nil {
  165. return err
  166. }
  167. _, err = res.RowsAffected()
  168. if err != nil {
  169. return err
  170. }
  171. return nil
  172. }
  173. func (s *PostgresStore) insert(uriPath string, fid string) error {
  174. directoryPart, filePart := filepath.Split(uriPath)
  175. existingId, _, _ := s.lookupDirectory(directoryPart)
  176. if existingId == 0 {
  177. s.recursiveInsertDirectory(directoryPart)
  178. }
  179. sqlStatement := fmt.Sprintf("INSERT INTO %s (directoryPart,filePart,fid,createTime) VALUES($1, $2, $3, $4)", filesTableName)
  180. glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid)
  181. res, err := s.db.Exec(sqlStatement, directoryPart, filePart, fid, time.Now().Unix())
  182. if err != nil {
  183. return err
  184. }
  185. rows, err := res.RowsAffected()
  186. if rows != 1 {
  187. return fmt.Errorf("Postgres insert -- rows affected = %d. Expecting 1", rows)
  188. }
  189. if err != nil {
  190. return err
  191. }
  192. return nil
  193. }
  194. func (s *PostgresStore) recursiveInsertDirectory(dirPath string) {
  195. pathParts := strings.Split(dirPath, "/")
  196. var workingPath string = "/"
  197. for _, part := range pathParts {
  198. if part == "" {
  199. continue
  200. }
  201. workingPath += (part + "/")
  202. existingId, _, _ := s.lookupDirectory(workingPath)
  203. if existingId == 0 {
  204. s.insertDirectory(workingPath)
  205. }
  206. }
  207. }
  208. func (s *PostgresStore) insertDirectory(dirPath string) {
  209. pathParts := strings.Split(dirPath, "/")
  210. directoryRoot := "/"
  211. directoryName := ""
  212. if len(pathParts) > 1 {
  213. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  214. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  215. } else if len(pathParts) == 1 {
  216. directoryRoot = "/"
  217. directoryName = pathParts[0] + "/"
  218. }
  219. sqlInsertDirectoryStatement := fmt.Sprintf("INSERT INTO %s (directoryroot, directoryname) "+
  220. "SELECT $1, $2 WHERE NOT EXISTS ( SELECT id FROM %s WHERE directoryroot=$3 AND directoryname=$4 )",
  221. directoriesTableName, directoriesTableName)
  222. glog.V(4).Infof("Postgres query -- Inserting directory (if it doesn't exist) - root = %s, name = %s",
  223. directoryRoot, directoryName)
  224. _, err := s.db.Exec(sqlInsertDirectoryStatement, directoryRoot, directoryName, directoryRoot, directoryName)
  225. if err != nil {
  226. glog.V(0).Infof("Postgres query -- Error inserting directory - root = %s, name = %s: %s",
  227. directoryRoot, directoryName, err)
  228. }
  229. }
  230. func (s *PostgresStore) delete(uriPath string) error {
  231. directoryPart, filePart := filepath.Split(uriPath)
  232. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  233. glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath)
  234. res, err := s.db.Exec(sqlStatement, directoryPart, filePart)
  235. if err != nil {
  236. return err
  237. }
  238. _, err = res.RowsAffected()
  239. if err != nil {
  240. return err
  241. }
  242. return nil
  243. }
  244. func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, string, error) {
  245. directoryRoot, directoryName := s.mySplitPath(dirPath)
  246. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  247. row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
  248. var id filer.DirectoryId
  249. var dirRoot string
  250. var dirName string
  251. err := row.Scan(&id, &dirRoot, &dirName)
  252. glog.V(3).Infof("Postgres lookupDirectory -- looking up directory '%s' and found id '%d', root '%s', name '%s' ", dirPath, id, dirRoot, dirName)
  253. if err != nil {
  254. return 0, "", err
  255. }
  256. return id, filepath.Join(dirRoot, dirName), err
  257. }
  258. func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryEntry, err error) {
  259. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
  260. rows, err := s.db.Query(sqlStatement, dirPath, limit)
  261. if err != nil {
  262. glog.V(0).Infof("Postgres findDirectories error: %s", err)
  263. }
  264. if rows != nil {
  265. defer rows.Close()
  266. for rows.Next() {
  267. var id filer.DirectoryId
  268. var directoryRoot string
  269. var directoryName string
  270. scanErr := rows.Scan(&id, &directoryRoot, &directoryName)
  271. if scanErr != nil {
  272. err = scanErr
  273. }
  274. dirs = append(dirs, filer.DirectoryEntry{Name: (directoryName), Id: id})
  275. }
  276. }
  277. return
  278. }
  279. func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bool {
  280. if recursive {
  281. return true
  282. }
  283. sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
  284. row := s.db.QueryRow(sqlStatement, dirPath+"%")
  285. var id filer.DirectoryId
  286. err := row.Scan(&id)
  287. if err != nil {
  288. if err == sql.ErrNoRows {
  289. return true
  290. }
  291. }
  292. return false
  293. }
  294. func (s *PostgresStore) mySplitPath(dirPath string) (directoryRoot string, directoryName string) {
  295. pathParts := strings.Split(dirPath, "/")
  296. directoryRoot = "/"
  297. directoryName = ""
  298. if len(pathParts) > 1 {
  299. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  300. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  301. } else if len(pathParts) == 1 {
  302. directoryRoot = "/"
  303. directoryName = pathParts[0] + "/"
  304. }
  305. return directoryRoot, directoryName
  306. }
  307. func (s *PostgresStore) deleteDirectory(dirPath string, recursive bool) (err error) {
  308. directoryRoot, directoryName := s.mySplitPath(dirPath)
  309. // delete files
  310. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directorypart=$1", filesTableName)
  311. _, err = s.db.Exec(sqlStatement, dirPath)
  312. if err != nil {
  313. return err
  314. }
  315. // delete specific directory if it is empty or recursive delete was requested
  316. safeToDelete := s.safeToDeleteDirectory(dirPath, recursive)
  317. if safeToDelete {
  318. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  319. _, err = s.db.Exec(sqlStatement, directoryRoot, directoryName)
  320. if err != nil {
  321. return err
  322. }
  323. }
  324. if recursive {
  325. // delete descendant files
  326. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directorypart LIKE $1", filesTableName)
  327. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  328. if err != nil {
  329. return err
  330. }
  331. // delete descendant directories
  332. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot LIKE $1", directoriesTableName)
  333. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  334. if err != nil {
  335. return err
  336. }
  337. }
  338. return err
  339. }
  340. func (s *PostgresStore) findFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
  341. var rows *sql.Rows = nil
  342. if lastFileName == "" {
  343. sqlStatement :=
  344. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 ORDER BY id LIMIT $2", filesTableName)
  345. rows, err = s.db.Query(sqlStatement, dirPath, limit)
  346. } else {
  347. sqlStatement :=
  348. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 "+
  349. "AND id > (SELECT id FROM %s WHERE directoryPart=$2 AND filepart=$3) ORDER BY id LIMIT $4",
  350. filesTableName, filesTableName)
  351. _, lastFileNameName := filepath.Split(lastFileName)
  352. rows, err = s.db.Query(sqlStatement, dirPath, dirPath, lastFileNameName, limit)
  353. }
  354. if err != nil {
  355. glog.V(0).Infof("Postgres find files error: %s", err)
  356. }
  357. if rows != nil {
  358. defer rows.Close()
  359. for rows.Next() {
  360. var fid filer.FileId
  361. var directoryPart string
  362. var filePart string
  363. scanErr := rows.Scan(&fid, &directoryPart, &filePart)
  364. if scanErr != nil {
  365. err = scanErr
  366. }
  367. files = append(files, filer.FileEntry{Name: filepath.Join(directoryPart, filePart), Id: fid})
  368. if len(files) >= limit {
  369. break
  370. }
  371. }
  372. }
  373. glog.V(3).Infof("Postgres findFiles -- looking up files under '%s' and found %d files. Limit=%d, lastFileName=%s",
  374. dirPath, len(files), limit, lastFileName)
  375. return files, err
  376. }