You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

454 lines
13 KiB

7 years ago
7 years ago
  1. package postgres_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. _ "github.com/lib/pq"
  10. _ "path/filepath"
  11. "strings"
  12. )
  13. func databaseExists(db *sql.DB, databaseName string) (bool, error) {
  14. sqlStatement := "SELECT datname from pg_database WHERE datname='%s'"
  15. row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName))
  16. var dbName string
  17. err := row.Scan(&dbName)
  18. if err != nil {
  19. if err == sql.ErrNoRows {
  20. return false, nil
  21. }
  22. return false, err
  23. }
  24. return true, nil
  25. }
  26. func createDatabase(db *sql.DB, databaseName string) error {
  27. sqlStatement := "CREATE DATABASE %s ENCODING='UTF8'"
  28. _, err := db.Exec(fmt.Sprintf(sqlStatement, databaseName))
  29. return err
  30. }
  31. func getDbConnection(conf PostgresConf) *sql.DB {
  32. _init_db.Do(func() {
  33. sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode)
  34. glog.V(3).Infoln("Opening postgres master database")
  35. var dbErr error
  36. _db_connection, dbErr := sql.Open("postgres", sqlUrl)
  37. if dbErr != nil {
  38. _db_connection.Close()
  39. _db_connection = nil
  40. panic(dbErr)
  41. }
  42. pingErr := _db_connection.Ping()
  43. if pingErr != nil {
  44. _db_connection.Close()
  45. _db_connection = nil
  46. panic(pingErr)
  47. }
  48. glog.V(3).Infoln("Checking to see if DB exists: ", conf.DataBase)
  49. var existsErr error
  50. dbExists, existsErr := databaseExists(_db_connection, conf.DataBase)
  51. if existsErr != nil {
  52. _db_connection.Close()
  53. _db_connection = nil
  54. panic(existsErr)
  55. }
  56. if !dbExists {
  57. glog.V(3).Infoln("Database doesn't exist. Attempting to create one: ", conf.DataBase)
  58. createErr := createDatabase(_db_connection, conf.DataBase)
  59. if createErr != nil {
  60. _db_connection.Close()
  61. _db_connection = nil
  62. panic(createErr)
  63. }
  64. }
  65. glog.V(3).Infoln("Closing master postgres database and opening configured database: ", conf.DataBase)
  66. _db_connection.Close()
  67. _db_connection = nil
  68. sqlUrl = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, conf.DataBase, conf.SslMode)
  69. _db_connection, dbErr = sql.Open("postgres", sqlUrl)
  70. if dbErr != nil {
  71. _db_connection.Close()
  72. _db_connection = nil
  73. panic(dbErr)
  74. }
  75. pingErr = _db_connection.Ping()
  76. if pingErr != nil {
  77. _db_connection.Close()
  78. _db_connection = nil
  79. panic(pingErr)
  80. }
  81. maxIdleConnections, maxOpenConnections := default_maxIdleConnections, default_maxOpenConnections
  82. if conf.MaxIdleConnections != 0 {
  83. maxIdleConnections = conf.MaxIdleConnections
  84. }
  85. if conf.MaxOpenConnections != 0 {
  86. maxOpenConnections = conf.MaxOpenConnections
  87. }
  88. _db_connection.SetMaxIdleConns(maxIdleConnections)
  89. _db_connection.SetMaxOpenConns(maxOpenConnections)
  90. })
  91. return _db_connection
  92. }
  93. var createDirectoryTable = `
  94. CREATE TABLE IF NOT EXISTS %s (
  95. id BIGSERIAL NOT NULL,
  96. directoryRoot VARCHAR(1024) NOT NULL DEFAULT '',
  97. directoryName VARCHAR(1024) NOT NULL DEFAULT '',
  98. CONSTRAINT unique_directory UNIQUE (directoryRoot, directoryName)
  99. );
  100. `
  101. var createFileTable = `
  102. CREATE TABLE IF NOT EXISTS %s (
  103. id BIGSERIAL NOT NULL,
  104. directoryPart VARCHAR(1024) NOT NULL DEFAULT '',
  105. filePart VARCHAR(1024) NOT NULL DEFAULT '',
  106. fid VARCHAR(36) NOT NULL DEFAULT '',
  107. createTime BIGINT NOT NULL DEFAULT 0,
  108. updateTime BIGINT NOT NULL DEFAULT 0,
  109. remark VARCHAR(20) NOT NULL DEFAULT '',
  110. status SMALLINT NOT NULL DEFAULT '1',
  111. PRIMARY KEY (id),
  112. CONSTRAINT %s_unique_file UNIQUE (directoryPart, filePart)
  113. );
  114. `
  115. func (s *PostgresStore) createDirectoriesTable() error {
  116. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", directoriesTableName)
  117. sqlCreate := fmt.Sprintf(createDirectoryTable, directoriesTableName)
  118. stmt, err := s.db.Prepare(sqlCreate)
  119. if err != nil {
  120. return err
  121. }
  122. defer stmt.Close()
  123. _, err = stmt.Exec()
  124. if err != nil {
  125. return err
  126. }
  127. return nil
  128. }
  129. func (s *PostgresStore) createFilesTable() error {
  130. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", filesTableName)
  131. sqlCreate := fmt.Sprintf(createFileTable, filesTableName, filesTableName)
  132. stmt, err := s.db.Prepare(sqlCreate)
  133. if err != nil {
  134. return err
  135. }
  136. defer stmt.Close()
  137. _, err = stmt.Exec()
  138. if err != nil {
  139. return err
  140. }
  141. return nil
  142. }
  143. func (s *PostgresStore) query(uriPath string) (string, error) {
  144. directoryPart, filePart := filepath.Split(uriPath)
  145. sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  146. row := s.db.QueryRow(sqlStatement, directoryPart, filePart)
  147. var fid string
  148. err := row.Scan(&fid)
  149. glog.V(3).Infof("Postgres query -- looking up path '%s' and found id '%s' ", uriPath, fid)
  150. if err != nil {
  151. return "", err
  152. }
  153. return fid, nil
  154. }
  155. func (s *PostgresStore) update(uriPath string, fid string) error {
  156. directoryPart, filePart := filepath.Split(uriPath)
  157. sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE directoryPart=$3 AND filePart=$4", filesTableName)
  158. glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid)
  159. res, err := s.db.Exec(sqlStatement, fid, time.Now().Unix(), directoryPart, filePart)
  160. if err != nil {
  161. return err
  162. }
  163. _, err = res.RowsAffected()
  164. if err != nil {
  165. return err
  166. }
  167. return nil
  168. }
  169. func (s *PostgresStore) insert(uriPath string, fid string) error {
  170. directoryPart, filePart := filepath.Split(uriPath)
  171. existingId, _, _ := s.lookupDirectory(directoryPart)
  172. if existingId == 0 {
  173. s.recursiveInsertDirectory(directoryPart)
  174. }
  175. sqlStatement := fmt.Sprintf("INSERT INTO %s (directoryPart,filePart,fid,createTime) VALUES($1, $2, $3, $4)", filesTableName)
  176. glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid)
  177. res, err := s.db.Exec(sqlStatement, directoryPart, filePart, fid, time.Now().Unix())
  178. if err != nil {
  179. return err
  180. }
  181. rows, err := res.RowsAffected()
  182. if rows != 1 {
  183. return fmt.Errorf("Postgres insert -- rows affected = %d. Expecting 1", rows)
  184. }
  185. if err != nil {
  186. return err
  187. }
  188. return nil
  189. }
  190. func (s *PostgresStore) recursiveInsertDirectory(dirPath string) {
  191. pathParts := strings.Split(dirPath, "/")
  192. var workingPath string = "/"
  193. for _, part := range pathParts {
  194. if part == "" {
  195. continue
  196. }
  197. workingPath += (part + "/")
  198. existingId, _, _ := s.lookupDirectory(workingPath)
  199. if existingId == 0 {
  200. s.insertDirectory(workingPath)
  201. }
  202. }
  203. }
  204. func (s *PostgresStore) insertDirectory(dirPath string) {
  205. pathParts := strings.Split(dirPath, "/")
  206. directoryRoot := "/"
  207. directoryName := ""
  208. if len(pathParts) > 1 {
  209. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  210. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  211. } else if len(pathParts) == 1 {
  212. directoryRoot = "/"
  213. directoryName = pathParts[0] + "/"
  214. }
  215. sqlInsertDirectoryStatement := fmt.Sprintf("INSERT INTO %s (directoryroot, directoryname) "+
  216. "SELECT $1, $2 WHERE NOT EXISTS ( SELECT id FROM %s WHERE directoryroot=$3 AND directoryname=$4 )",
  217. directoriesTableName, directoriesTableName)
  218. glog.V(4).Infof("Postgres query -- Inserting directory (if it doesn't exist) - root = %s, name = %s",
  219. directoryRoot, directoryName)
  220. _, err := s.db.Exec(sqlInsertDirectoryStatement, directoryRoot, directoryName, directoryRoot, directoryName)
  221. if err != nil {
  222. glog.V(0).Infof("Postgres query -- Error inserting directory - root = %s, name = %s: %s",
  223. directoryRoot, directoryName, err)
  224. }
  225. }
  226. func (s *PostgresStore) delete(uriPath string) error {
  227. directoryPart, filePart := filepath.Split(uriPath)
  228. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
  229. glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath)
  230. res, err := s.db.Exec(sqlStatement, directoryPart, filePart)
  231. if err != nil {
  232. return err
  233. }
  234. _, err = res.RowsAffected()
  235. if err != nil {
  236. return err
  237. }
  238. return nil
  239. }
  240. func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, string, error) {
  241. directoryRoot, directoryName := s.mySplitPath(dirPath)
  242. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  243. row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
  244. var id filer.DirectoryId
  245. var dirRoot string
  246. var dirName string
  247. err := row.Scan(&id, &dirRoot, &dirName)
  248. glog.V(3).Infof("Postgres lookupDirectory -- looking up directory '%s' and found id '%d', root '%s', name '%s' ", dirPath, id, dirRoot, dirName)
  249. if err != nil {
  250. return 0, "", err
  251. }
  252. return id, filepath.Join(dirRoot, dirName), err
  253. }
  254. func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryEntry, err error) {
  255. sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
  256. rows, err := s.db.Query(sqlStatement, dirPath, limit)
  257. if err != nil {
  258. glog.V(0).Infof("Postgres findDirectories error: %s", err)
  259. }
  260. if rows != nil {
  261. defer rows.Close()
  262. for rows.Next() {
  263. var id filer.DirectoryId
  264. var directoryRoot string
  265. var directoryName string
  266. scanErr := rows.Scan(&id, &directoryRoot, &directoryName)
  267. if scanErr != nil {
  268. err = scanErr
  269. }
  270. dirs = append(dirs, filer.DirectoryEntry{Name: (directoryName), Id: id})
  271. }
  272. }
  273. return
  274. }
  275. func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bool {
  276. if recursive {
  277. return true
  278. }
  279. sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
  280. row := s.db.QueryRow(sqlStatement, dirPath+"%")
  281. var id filer.DirectoryId
  282. err := row.Scan(&id)
  283. if err != nil {
  284. if err == sql.ErrNoRows {
  285. return true
  286. }
  287. }
  288. return false
  289. }
  290. func (s *PostgresStore) mySplitPath(dirPath string) (directoryRoot string, directoryName string) {
  291. pathParts := strings.Split(dirPath, "/")
  292. directoryRoot = "/"
  293. directoryName = ""
  294. if len(pathParts) > 1 {
  295. directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
  296. directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
  297. } else if len(pathParts) == 1 {
  298. directoryRoot = "/"
  299. directoryName = pathParts[0] + "/"
  300. }
  301. return directoryRoot, directoryName
  302. }
  303. func (s *PostgresStore) deleteDirectory(dirPath string, recursive bool) (err error) {
  304. directoryRoot, directoryName := s.mySplitPath(dirPath)
  305. // delete files
  306. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directorypart=$1", filesTableName)
  307. _, err = s.db.Exec(sqlStatement, dirPath)
  308. if err != nil {
  309. return err
  310. }
  311. // delete specific directory if it is empty or recursive delete was requested
  312. safeToDelete := s.safeToDeleteDirectory(dirPath, recursive)
  313. if safeToDelete {
  314. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
  315. _, err = s.db.Exec(sqlStatement, directoryRoot, directoryName)
  316. if err != nil {
  317. return err
  318. }
  319. }
  320. if recursive {
  321. // delete descendant files
  322. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directorypart LIKE $1", filesTableName)
  323. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  324. if err != nil {
  325. return err
  326. }
  327. // delete descendant directories
  328. sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot LIKE $1", directoriesTableName)
  329. _, err = s.db.Exec(sqlStatement, dirPath+"%")
  330. if err != nil {
  331. return err
  332. }
  333. }
  334. return err
  335. }
  336. func (s *PostgresStore) findFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
  337. var rows *sql.Rows = nil
  338. if lastFileName == "" {
  339. sqlStatement :=
  340. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 ORDER BY id LIMIT $2", filesTableName)
  341. rows, err = s.db.Query(sqlStatement, dirPath, limit)
  342. } else {
  343. sqlStatement :=
  344. fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 "+
  345. "AND id > (SELECT id FROM %s WHERE directoryPart=$2 AND filepart=$3) ORDER BY id LIMIT $4",
  346. filesTableName, filesTableName)
  347. _, lastFileNameName := filepath.Split(lastFileName)
  348. rows, err = s.db.Query(sqlStatement, dirPath, dirPath, lastFileNameName, limit)
  349. }
  350. if err != nil {
  351. glog.V(0).Infof("Postgres find files error: %s", err)
  352. }
  353. if rows != nil {
  354. defer rows.Close()
  355. for rows.Next() {
  356. var fid filer.FileId
  357. var directoryPart string
  358. var filePart string
  359. scanErr := rows.Scan(&fid, &directoryPart, &filePart)
  360. if scanErr != nil {
  361. err = scanErr
  362. }
  363. files = append(files, filer.FileEntry{Name: filepath.Join(directoryPart, filePart), Id: fid})
  364. if len(files) >= limit {
  365. break
  366. }
  367. }
  368. }
  369. glog.V(3).Infof("Postgres findFiles -- looking up files under '%s' and found %d files. Limit=%d, lastFileName=%s",
  370. dirPath, len(files), limit, lastFileName)
  371. return files, err
  372. }