You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
7.3 KiB

  1. package mysql_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "hash/crc32"
  6. "sync"
  7. "time"
  8. _ "github.com/go-sql-driver/mysql"
  9. )
  10. const (
  11. sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
  12. default_maxIdleConnections = 100
  13. default_maxOpenConnections = 50
  14. default_maxTableNums = 1024
  15. tableName = "filer_mapping"
  16. )
  17. var (
  18. _init_db sync.Once
  19. _db_connections []*sql.DB
  20. )
  21. type MySqlConf struct {
  22. User string
  23. Password string
  24. HostName string
  25. Port int
  26. DataBase string
  27. MaxIdleConnections int
  28. MaxOpenConnections int
  29. }
  30. type ShardingConf struct {
  31. IsSharding bool `json:"isSharding"`
  32. ShardingNum int `json:"shardingNum"`
  33. }
  34. type MySqlStore struct {
  35. dbs []*sql.DB
  36. isSharding bool
  37. shardingNum int
  38. }
  39. func getDbConnection(confs []MySqlConf) []*sql.DB {
  40. _init_db.Do(func() {
  41. for _, conf := range confs {
  42. sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
  43. var dbErr error
  44. _db_connection, dbErr := sql.Open("mysql", sqlUrl)
  45. if dbErr != nil {
  46. _db_connection.Close()
  47. _db_connection = nil
  48. panic(dbErr)
  49. }
  50. var maxIdleConnections, maxOpenConnections int
  51. if conf.MaxIdleConnections != 0 {
  52. maxIdleConnections = conf.MaxIdleConnections
  53. } else {
  54. maxIdleConnections = default_maxIdleConnections
  55. }
  56. if conf.MaxOpenConnections != 0 {
  57. maxOpenConnections = conf.MaxOpenConnections
  58. } else {
  59. maxOpenConnections = default_maxOpenConnections
  60. }
  61. _db_connection.SetMaxIdleConns(maxIdleConnections)
  62. _db_connection.SetMaxOpenConns(maxOpenConnections)
  63. _db_connections = append(_db_connections, _db_connection)
  64. }
  65. })
  66. return _db_connections
  67. }
  68. func NewMysqlStore(confs []MySqlConf, isSharding bool, shardingNum int) *MySqlStore {
  69. ms := &MySqlStore{
  70. dbs: getDbConnection(confs),
  71. isSharding: isSharding,
  72. shardingNum: shardingNum,
  73. }
  74. for _, db := range ms.dbs {
  75. if !isSharding {
  76. ms.shardingNum = 1
  77. } else {
  78. if ms.shardingNum == 0 {
  79. ms.shardingNum = default_maxTableNums
  80. }
  81. }
  82. for i := 0; i < ms.shardingNum; i++ {
  83. if err := ms.createTables(db, tableName, i); err != nil {
  84. fmt.Printf("create table failed %v", err)
  85. }
  86. }
  87. }
  88. return ms
  89. }
  90. func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
  91. hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
  92. instance_offset = int(hash_value) % len(s.dbs)
  93. table_postfix = int(hash_value) % s.shardingNum
  94. return
  95. }
  96. func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
  97. instance_offset, table_postfix := s.hash(path)
  98. instanceId = instance_offset
  99. if s.isSharding {
  100. tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
  101. } else {
  102. tableFullName = tableName
  103. }
  104. return
  105. }
  106. func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
  107. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  108. if err != nil {
  109. return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
  110. }
  111. fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
  112. if err == sql.ErrNoRows {
  113. //Could not found
  114. err = nil
  115. }
  116. return fid, err
  117. }
  118. func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
  119. var tableFullName string
  120. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  121. if err != nil {
  122. return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
  123. }
  124. var old_fid string
  125. if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
  126. return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
  127. } else {
  128. if len(old_fid) == 0 {
  129. err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  130. err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
  131. } else {
  132. err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  133. err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
  134. }
  135. }
  136. return
  137. }
  138. func (s *MySqlStore) Delete(fullFilePath string) (err error) {
  139. var fid string
  140. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  141. if err != nil {
  142. return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
  143. }
  144. if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  145. return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
  146. } else if fid == "" {
  147. return nil
  148. }
  149. if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  150. return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
  151. } else {
  152. return nil
  153. }
  154. }
  155. func (s *MySqlStore) Close() {
  156. for _, db := range s.dbs {
  157. db.Close()
  158. }
  159. }
  160. var createTable = `
  161. CREATE TABLE IF NOT EXISTS %s (
  162. id bigint(20) NOT NULL AUTO_INCREMENT,
  163. uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
  164. fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
  165. createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
  166. updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
  167. remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
  168. status tinyint(2) DEFAULT '1' COMMENT 'resource status',
  169. PRIMARY KEY (id),
  170. UNIQUE KEY index_uriPath (uriPath)
  171. ) DEFAULT CHARSET=utf8;
  172. `
  173. func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
  174. var realTableName string
  175. if s.isSharding {
  176. realTableName = fmt.Sprintf("%s_%4d", tableName, postfix)
  177. } else {
  178. realTableName = tableName
  179. }
  180. stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
  181. if err != nil {
  182. return err
  183. }
  184. defer stmt.Close()
  185. _, err = stmt.Exec()
  186. if err != nil {
  187. return err
  188. }
  189. return nil
  190. }
  191. func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
  192. sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
  193. row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
  194. var fid string
  195. err := row.Scan(&fid)
  196. if err != nil {
  197. return "", err
  198. }
  199. return fid, nil
  200. }
  201. func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
  202. sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
  203. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
  204. if err != nil {
  205. return err
  206. }
  207. _, err = res.RowsAffected()
  208. if err != nil {
  209. return err
  210. }
  211. return nil
  212. }
  213. func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
  214. sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
  215. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
  216. if err != nil {
  217. return err
  218. }
  219. _, err = res.RowsAffected()
  220. if err != nil {
  221. return err
  222. }
  223. return nil
  224. }
  225. func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
  226. sqlStatement := "DELETE FROM %s WHERE uriPath=?"
  227. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
  228. if err != nil {
  229. return err
  230. }
  231. _, err = res.RowsAffected()
  232. if err != nil {
  233. return err
  234. }
  235. return nil
  236. }