You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
9.8 KiB

  1. package postgres_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "hash/crc32"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. _ "github.com/lib/pq"
  10. )
  11. const (
  12. default_maxIdleConnections = 100
  13. default_maxOpenConnections = 50
  14. default_maxTableNums = 1024
  15. tableName = "filer_mapping"
  16. )
  17. var (
  18. _init_db sync.Once
  19. _db_connections []*sql.DB
  20. )
  21. type PostgresConf struct {
  22. User string
  23. Password string
  24. HostName string
  25. Port int
  26. DataBase string
  27. SslMode string
  28. MaxIdleConnections int
  29. MaxOpenConnections int
  30. }
  31. type ShardingConf struct {
  32. IsSharding bool `json:"isSharding"`
  33. ShardCount int `json:"shardCount"`
  34. }
  35. type PostgresStore struct {
  36. dbs []*sql.DB
  37. isSharding bool
  38. shardCount int
  39. server string
  40. user string
  41. password string
  42. }
  43. func databaseExists(db *sql.DB, databaseName string) (bool, error) {
  44. sqlStatement := "SELECT datname from pg_database WHERE datname='%s'"
  45. row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName))
  46. var dbName string
  47. err := row.Scan(&dbName)
  48. if err != nil {
  49. if err == sql.ErrNoRows {
  50. return false, nil
  51. }
  52. return false, err
  53. }
  54. return true, nil
  55. }
  56. func createDatabase(db *sql.DB, databaseName string) (error) {
  57. sqlStatement := "CREATE DATABASE %s ENCODING='UTF8'";
  58. _, err := db.Exec(fmt.Sprintf(sqlStatement, databaseName))
  59. return err
  60. }
  61. func getDbConnection(confs []PostgresConf) []*sql.DB {
  62. _init_db.Do(func() {
  63. for _, conf := range confs {
  64. sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode)
  65. glog.V(3).Infoln("Opening postgres master database")
  66. var dbErr error
  67. _db_connection, dbErr := sql.Open("postgres", sqlUrl)
  68. if dbErr != nil {
  69. _db_connection.Close()
  70. _db_connection = nil
  71. panic(dbErr)
  72. }
  73. pingErr := _db_connection.Ping()
  74. if pingErr != nil {
  75. _db_connection.Close()
  76. _db_connection = nil
  77. panic(pingErr)
  78. }
  79. glog.V(3).Infoln("Checking to see if DB exists: ", conf.DataBase)
  80. var existsErr error
  81. dbExists, existsErr := databaseExists(_db_connection, conf.DataBase)
  82. if existsErr != nil {
  83. _db_connection.Close()
  84. _db_connection = nil
  85. panic(existsErr)
  86. }
  87. if !dbExists {
  88. glog.V(3).Infoln("Database doesn't exist. Attempting to create one: ", conf.DataBase)
  89. createErr := createDatabase(_db_connection, conf.DataBase)
  90. if createErr != nil {
  91. _db_connection.Close()
  92. _db_connection = nil
  93. panic(createErr)
  94. }
  95. }
  96. glog.V(3).Infoln("Closing master postgres database and opening configured database: ", conf.DataBase)
  97. _db_connection.Close()
  98. _db_connection = nil
  99. sqlUrl = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=60 fallback_application_name=filestore", conf.HostName, conf.Port, conf.User, conf.Password, conf.DataBase, conf.SslMode)
  100. _db_connection, dbErr = sql.Open("postgres", sqlUrl)
  101. if dbErr != nil {
  102. _db_connection.Close()
  103. _db_connection = nil
  104. panic(dbErr)
  105. }
  106. pingErr = _db_connection.Ping()
  107. if pingErr != nil {
  108. _db_connection.Close()
  109. _db_connection = nil
  110. panic(pingErr)
  111. }
  112. var maxIdleConnections, maxOpenConnections int
  113. if conf.MaxIdleConnections != 0 {
  114. maxIdleConnections = conf.MaxIdleConnections
  115. } else {
  116. maxIdleConnections = default_maxIdleConnections
  117. }
  118. if conf.MaxOpenConnections != 0 {
  119. maxOpenConnections = conf.MaxOpenConnections
  120. } else {
  121. maxOpenConnections = default_maxOpenConnections
  122. }
  123. _db_connection.SetMaxIdleConns(maxIdleConnections)
  124. _db_connection.SetMaxOpenConns(maxOpenConnections)
  125. _db_connections = append(_db_connections, _db_connection)
  126. }
  127. })
  128. return _db_connections
  129. }
  130. func NewPostgresStore(confs []PostgresConf, isSharding bool, shardCount int) *PostgresStore {
  131. pg := &PostgresStore{
  132. dbs: getDbConnection(confs),
  133. isSharding: isSharding,
  134. shardCount: shardCount,
  135. }
  136. for _, db := range pg.dbs {
  137. if !isSharding {
  138. pg.shardCount = 1
  139. } else {
  140. if pg.shardCount == 0 {
  141. pg.shardCount = default_maxTableNums
  142. }
  143. }
  144. for i := 0; i < pg.shardCount; i++ {
  145. if err := pg.createTables(db, tableName, i); err != nil {
  146. fmt.Printf("create table failed %v", err)
  147. }
  148. }
  149. }
  150. return pg
  151. }
  152. func (s *PostgresStore) hash(fullFileName string) (instance_offset, table_postfix int) {
  153. hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
  154. instance_offset = int(hash_value) % len(s.dbs)
  155. table_postfix = int(hash_value) % s.shardCount
  156. return
  157. }
  158. func (s *PostgresStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
  159. instance_offset, table_postfix := s.hash(path)
  160. instanceId = instance_offset
  161. if s.isSharding {
  162. tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
  163. } else {
  164. tableFullName = tableName
  165. }
  166. return
  167. }
  168. func (s *PostgresStore) Get(fullFilePath string) (fid string, err error) {
  169. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  170. if err != nil {
  171. return "", fmt.Errorf("PostgresStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
  172. }
  173. fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
  174. return fid, err
  175. }
  176. func (s *PostgresStore) Put(fullFilePath string, fid string) (err error) {
  177. var tableFullName string
  178. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  179. if err != nil {
  180. return fmt.Errorf("PostgresStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
  181. }
  182. var old_fid string
  183. if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
  184. return fmt.Errorf("PostgresStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
  185. } else {
  186. if len(old_fid) == 0 {
  187. err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  188. if err != nil {
  189. return fmt.Errorf("PostgresStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
  190. }
  191. } else {
  192. err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  193. if err != nil {
  194. return fmt.Errorf("PostgresStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
  195. }
  196. }
  197. }
  198. return
  199. }
  200. func (s *PostgresStore) Delete(fullFilePath string) (err error) {
  201. var fid string
  202. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  203. if err != nil {
  204. return fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
  205. }
  206. if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  207. return fmt.Errorf("PostgresStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
  208. } else if fid == "" {
  209. return nil
  210. }
  211. if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  212. return fmt.Errorf("PostgresStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
  213. } else {
  214. return nil
  215. }
  216. }
  217. func (s *PostgresStore) Close() {
  218. for _, db := range s.dbs {
  219. db.Close()
  220. }
  221. }
  222. var createTable = `
  223. CREATE TABLE IF NOT EXISTS %s (
  224. id BIGSERIAL NOT NULL,
  225. uriPath VARCHAR(1024) NOT NULL DEFAULT '',
  226. fid VARCHAR(36) NOT NULL DEFAULT '',
  227. createTime BIGINT NOT NULL DEFAULT 0,
  228. updateTime BIGINT NOT NULL DEFAULT 0,
  229. remark VARCHAR(20) NOT NULL DEFAULT '',
  230. status SMALLINT NOT NULL DEFAULT '1',
  231. PRIMARY KEY (id),
  232. CONSTRAINT %s_index_uriPath UNIQUE (uriPath)
  233. );
  234. `
  235. func (s *PostgresStore) createTables(db *sql.DB, tableName string, postfix int) error {
  236. var realTableName string
  237. if s.isSharding {
  238. realTableName = fmt.Sprintf("%s_%04d", tableName, postfix)
  239. } else {
  240. realTableName = tableName
  241. }
  242. glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", realTableName)
  243. sqlCreate := fmt.Sprintf(createTable, realTableName, realTableName)
  244. stmt, err := db.Prepare(sqlCreate)
  245. if err != nil {
  246. return err
  247. }
  248. defer stmt.Close()
  249. _, err = stmt.Exec()
  250. if err != nil {
  251. return err
  252. }
  253. return nil
  254. }
  255. func (s *PostgresStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
  256. sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE uriPath=$1", tableName)
  257. row := db.QueryRow(sqlStatement, uriPath)
  258. var fid string
  259. err := row.Scan(&fid)
  260. glog.V(3).Infof("Postgres query -- looking up path '%s' and found id '%s' ", uriPath, fid)
  261. if err != nil {
  262. return "", err
  263. }
  264. return fid, nil
  265. }
  266. func (s *PostgresStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
  267. sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE uriPath=$3", tableName)
  268. glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid)
  269. res, err := db.Exec(sqlStatement, fid, time.Now().Unix(), uriPath)
  270. if err != nil {
  271. return err
  272. }
  273. _, err = res.RowsAffected()
  274. if err != nil {
  275. return err
  276. }
  277. return nil
  278. }
  279. func (s *PostgresStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
  280. sqlStatement := fmt.Sprintf("INSERT INTO %s (uriPath,fid,createTime) VALUES($1, $2, $3)", tableName)
  281. glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid)
  282. res, err := db.Exec(sqlStatement, uriPath, fid, time.Now().Unix())
  283. if err != nil {
  284. return err
  285. }
  286. rows, err := res.RowsAffected()
  287. if rows != 1 {
  288. return fmt.Errorf("Postgres insert -- rows affected = %d. Expecting 1", rows)
  289. }
  290. if err != nil {
  291. return err
  292. }
  293. return nil
  294. }
  295. func (s *PostgresStore) delete(uriPath string, db *sql.DB, tableName string) error {
  296. sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE uriPath=$1", tableName)
  297. glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath)
  298. res, err := db.Exec(sqlStatement, uriPath)
  299. if err != nil {
  300. return err
  301. }
  302. _, err = res.RowsAffected()
  303. if err != nil {
  304. return err
  305. }
  306. return nil
  307. }