You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
5.5 KiB

  1. package mysql_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "hash/crc32"
  6. "sync"
  7. "time"
  8. _ "github.com/go-sql-driver/mysql"
  9. )
  10. const (
  11. sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
  12. maxIdleConnections = 100
  13. maxOpenConnections = 50
  14. maxTableNums = 1024
  15. tableName = "filer_mapping"
  16. )
  17. var (
  18. _init_db sync.Once
  19. _db_connections []*sql.DB
  20. )
  21. type MySqlConf struct {
  22. User string
  23. Password string
  24. HostName string
  25. Port int
  26. DataBase string
  27. }
  28. type MySqlStore struct {
  29. dbs []*sql.DB
  30. }
  31. func getDbConnection(confs []MySqlConf) []*sql.DB {
  32. _init_db.Do(func() {
  33. for _, conf := range confs {
  34. sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
  35. var dbErr error
  36. _db_connection, dbErr := sql.Open("mysql", sqlUrl)
  37. if dbErr != nil {
  38. _db_connection.Close()
  39. _db_connection = nil
  40. panic(dbErr)
  41. }
  42. _db_connection.SetMaxIdleConns(maxIdleConnections)
  43. _db_connection.SetMaxOpenConns(maxOpenConnections)
  44. _db_connections = append(_db_connections, _db_connection)
  45. }
  46. })
  47. return _db_connections
  48. }
  49. func NewMysqlStore(confs []MySqlConf) *MySqlStore {
  50. ms := &MySqlStore{
  51. dbs: getDbConnection(confs),
  52. }
  53. for _, db := range ms.dbs {
  54. for i := 0; i < maxTableNums; i++ {
  55. if err := ms.createTables(db, tableName, i); err != nil {
  56. fmt.Printf("create table failed %s", err.Error())
  57. }
  58. }
  59. }
  60. return ms
  61. }
  62. func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
  63. hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
  64. instance_offset = int(hash_value) % len(s.dbs)
  65. table_postfix = int(hash_value) % maxTableNums
  66. return
  67. }
  68. func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
  69. instance_offset, table_postfix := s.hash(path)
  70. instanceId = instance_offset
  71. tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
  72. return
  73. }
  74. func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
  75. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  76. if err != nil {
  77. return "", err
  78. }
  79. fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
  80. if err == sql.ErrNoRows {
  81. //Could not found
  82. err = nil
  83. }
  84. return fid, err
  85. }
  86. func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
  87. var tableFullName string
  88. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  89. if err != nil {
  90. return err
  91. }
  92. if old_fid, localErr := s.query(fullFilePath, s.dbs[instance_offset], tableFullName); localErr != nil && localErr != sql.ErrNoRows {
  93. err = localErr
  94. return
  95. } else {
  96. if len(old_fid) == 0 {
  97. err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  98. } else {
  99. err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  100. }
  101. }
  102. return
  103. }
  104. func (s *MySqlStore) Delete(fullFilePath string) (err error) {
  105. var fid string
  106. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  107. if err != nil {
  108. return err
  109. }
  110. if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  111. return err
  112. } else if fid == "" {
  113. return nil
  114. }
  115. if err := s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  116. return err
  117. } else {
  118. return nil
  119. }
  120. }
  121. func (s *MySqlStore) Close() {
  122. for _, db := range s.dbs {
  123. db.Close()
  124. }
  125. }
  126. var createTable = `
  127. CREATE TABLE IF NOT EXISTS %s_%04d (
  128. id bigint(20) NOT NULL AUTO_INCREMENT,
  129. uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
  130. fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
  131. createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
  132. updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
  133. remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
  134. status tinyint(2) DEFAULT '1' COMMENT 'resource status',
  135. PRIMARY KEY (id),
  136. UNIQUE KEY index_uriPath (uriPath)
  137. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  138. `
  139. func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
  140. stmt, err := db.Prepare(fmt.Sprintf(createTable, tableName, postfix))
  141. if err != nil {
  142. return err
  143. }
  144. defer stmt.Close()
  145. _, err = stmt.Exec()
  146. if err != nil {
  147. return err
  148. }
  149. return nil
  150. }
  151. func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
  152. sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
  153. row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
  154. var fid string
  155. err := row.Scan(&fid)
  156. if err != nil {
  157. return "", err
  158. }
  159. return fid, nil
  160. }
  161. func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
  162. sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
  163. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
  164. if err != nil {
  165. return err
  166. }
  167. _, err = res.RowsAffected()
  168. if err != nil {
  169. return err
  170. }
  171. return nil
  172. }
  173. func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
  174. sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
  175. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
  176. if err != nil {
  177. return err
  178. }
  179. _, err = res.RowsAffected()
  180. if err != nil {
  181. return err
  182. }
  183. return nil
  184. }
  185. func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
  186. sqlStatement := "DELETE FROM %s WHERE uriPath=?"
  187. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
  188. if err != nil {
  189. return err
  190. }
  191. _, err = res.RowsAffected()
  192. if err != nil {
  193. return err
  194. }
  195. return nil
  196. }