You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

270 lines
7.4 KiB

  1. package mysql_store
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "hash/crc32"
  6. "sync"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. _ "github.com/go-sql-driver/mysql"
  10. )
  // Connection and sharding defaults used by the MySQL-backed store.
  const (
  	// sqlUrl is the DSN template; its verbs are filled, in order, with
  	// user, password, host name, port and database name.
  	sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
  	// default_maxIdleConnections is applied when MySqlConf.MaxIdleConnections is 0.
  	default_maxIdleConnections = 100
  	// default_maxOpenConnections is applied when MySqlConf.MaxOpenConnections is 0.
  	default_maxOpenConnections = 50
  	// default_maxTableNums is the shard count used when sharding is enabled
  	// but no shard count was supplied.
  	default_maxTableNums = 1024
  	// tableName is the mapping table's name; sharded tables append a numeric
  	// suffix to it.
  	tableName = "filer_mapping"
  )
  // Package-level connection state shared by every store instance.
  var (
  	// _init_db makes sure the connection pools are opened only once.
  	_init_db sync.Once
  	// _db_connections holds one *sql.DB per configured MySQL instance, in
  	// configuration order.
  	_db_connections []*sql.DB
  )
  // MySqlConf describes how to reach one MySQL instance.
  type MySqlConf struct {
  	User     string // account name placed in the DSN
  	Password string // account password placed in the DSN
  	HostName string // TCP host of the server
  	Port     int    // TCP port of the server
  	DataBase string // schema selected by the DSN
  	// MaxIdleConnections caps idle pooled connections; 0 selects
  	// default_maxIdleConnections.
  	MaxIdleConnections int
  	// MaxOpenConnections caps open pooled connections; 0 selects
  	// default_maxOpenConnections.
  	MaxOpenConnections int
  }
  // ShardingConf carries the table-sharding settings with JSON field names.
  // NOTE(review): nothing in this file reads it; presumably callers decode it
  // from configuration and pass the values to NewMysqlStore - confirm.
  type ShardingConf struct {
  	IsSharding bool `json:"isSharding"` // whether the mapping table is split into shards
  	ShardCount int  `json:"shardCount"` // number of shard tables
  }
  // MySqlStore maps file paths to file ids in one or more MySQL instances.
  type MySqlStore struct {
  	dbs        []*sql.DB // one pool per instance; a path is routed to one of them by hash
  	isSharding bool      // when true, rows are spread over suffixed tables
  	shardCount int       // number of tables per instance
  }
  40. func getDbConnection(confs []MySqlConf) []*sql.DB {
  41. _init_db.Do(func() {
  42. for _, conf := range confs {
  43. sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
  44. var dbErr error
  45. _db_connection, dbErr := sql.Open("mysql", sqlUrl)
  46. if dbErr != nil {
  47. _db_connection.Close()
  48. _db_connection = nil
  49. panic(dbErr)
  50. }
  51. var maxIdleConnections, maxOpenConnections int
  52. if conf.MaxIdleConnections != 0 {
  53. maxIdleConnections = conf.MaxIdleConnections
  54. } else {
  55. maxIdleConnections = default_maxIdleConnections
  56. }
  57. if conf.MaxOpenConnections != 0 {
  58. maxOpenConnections = conf.MaxOpenConnections
  59. } else {
  60. maxOpenConnections = default_maxOpenConnections
  61. }
  62. _db_connection.SetMaxIdleConns(maxIdleConnections)
  63. _db_connection.SetMaxOpenConns(maxOpenConnections)
  64. _db_connections = append(_db_connections, _db_connection)
  65. }
  66. })
  67. return _db_connections
  68. }
  69. func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore {
  70. ms := &MySqlStore{
  71. dbs: getDbConnection(confs),
  72. isSharding: isSharding,
  73. shardCount: shardCount,
  74. }
  75. for _, db := range ms.dbs {
  76. if !isSharding {
  77. ms.shardCount = 1
  78. } else {
  79. if ms.shardCount == 0 {
  80. ms.shardCount = default_maxTableNums
  81. }
  82. }
  83. for i := 0; i < ms.shardCount; i++ {
  84. if err := ms.createTables(db, tableName, i); err != nil {
  85. fmt.Printf("create table failed %v", err)
  86. }
  87. }
  88. }
  89. return ms
  90. }
  91. func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
  92. hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
  93. instance_offset = int(hash_value) % len(s.dbs)
  94. table_postfix = int(hash_value) % s.shardCount
  95. return
  96. }
  97. func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
  98. instance_offset, table_postfix := s.hash(path)
  99. instanceId = instance_offset
  100. if s.isSharding {
  101. tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
  102. } else {
  103. tableFullName = tableName
  104. }
  105. return
  106. }
  107. func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
  108. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  109. if err != nil {
  110. return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
  111. }
  112. fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
  113. if err == sql.ErrNoRows {
  114. //Could not found
  115. err = filer.ErrNotFound
  116. }
  117. return fid, err
  118. }
  119. func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
  120. var tableFullName string
  121. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  122. if err != nil {
  123. return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
  124. }
  125. var old_fid string
  126. if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
  127. return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
  128. } else {
  129. if len(old_fid) == 0 {
  130. err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  131. err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
  132. } else {
  133. err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
  134. err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
  135. }
  136. }
  137. return
  138. }
  139. func (s *MySqlStore) Delete(fullFilePath string) (err error) {
  140. var fid string
  141. instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
  142. if err != nil {
  143. return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
  144. }
  145. if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  146. return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
  147. } else if fid == "" {
  148. return nil
  149. }
  150. if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
  151. return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
  152. } else {
  153. return nil
  154. }
  155. }
  156. func (s *MySqlStore) Close() {
  157. for _, db := range s.dbs {
  158. db.Close()
  159. }
  160. }
  // createTable is the DDL template for a mapping table; its single %s verb is
  // replaced with the table name. String defaults use single quotes so the
  // statement is still valid when the server runs with the ANSI_QUOTES SQL
  // mode, where double quotes delimit identifiers.
  var createTable = `
  CREATE TABLE IF NOT EXISTS %s (
    id bigint(20) NOT NULL AUTO_INCREMENT,
    uriPath char(255) NOT NULL DEFAULT '' COMMENT 'http uriPath',
    fid char(36) NOT NULL DEFAULT '' COMMENT 'seaweedfs fid',
    createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
    updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
    remark varchar(20) NOT NULL DEFAULT '' COMMENT 'reserverd field',
    status tinyint(2) DEFAULT '1' COMMENT 'resource status',
    PRIMARY KEY (id),
    UNIQUE KEY index_uriPath (uriPath)
  ) DEFAULT CHARSET=utf8;
  `
  174. func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
  175. var realTableName string
  176. if s.isSharding {
  177. realTableName = fmt.Sprintf("%s_%4d", tableName, postfix)
  178. } else {
  179. realTableName = tableName
  180. }
  181. stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
  182. if err != nil {
  183. return err
  184. }
  185. defer stmt.Close()
  186. _, err = stmt.Exec()
  187. if err != nil {
  188. return err
  189. }
  190. return nil
  191. }
  192. func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
  193. sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
  194. row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
  195. var fid string
  196. err := row.Scan(&fid)
  197. if err != nil {
  198. return "", err
  199. }
  200. return fid, nil
  201. }
  202. func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
  203. sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
  204. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
  205. if err != nil {
  206. return err
  207. }
  208. _, err = res.RowsAffected()
  209. if err != nil {
  210. return err
  211. }
  212. return nil
  213. }
  214. func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
  215. sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
  216. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
  217. if err != nil {
  218. return err
  219. }
  220. _, err = res.RowsAffected()
  221. if err != nil {
  222. return err
  223. }
  224. return nil
  225. }
  226. func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
  227. sqlStatement := "DELETE FROM %s WHERE uriPath=?"
  228. res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
  229. if err != nil {
  230. return err
  231. }
  232. _, err = res.RowsAffected()
  233. if err != nil {
  234. return err
  235. }
  236. return nil
  237. }