You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

364 lines
9.9 KiB

7 years ago
4 years ago
4 years ago
5 years ago
5 years ago
  1. package abstract_sql
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "strings"
  11. "sync"
  12. )
  13. type SqlGenerator interface {
  14. GetSqlInsert(tableName string) string
  15. GetSqlUpdate(tableName string) string
  16. GetSqlFind(tableName string) string
  17. GetSqlDelete(tableName string) string
  18. GetSqlDeleteFolderChildren(tableName string) string
  19. GetSqlListExclusive(tableName string) string
  20. GetSqlListInclusive(tableName string) string
  21. GetSqlCreateTable(tableName string) string
  22. GetSqlDropTable(tableName string) string
  23. }
  24. type AbstractSqlStore struct {
  25. SqlGenerator
  26. DB *sql.DB
  27. SupportBucketTable bool
  28. dbs map[string]bool
  29. dbsLock sync.Mutex
  30. }
  31. func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
  32. store.dbsLock.Lock()
  33. defer store.dbsLock.Unlock()
  34. store.CreateTable(context.Background(), bucket)
  35. if store.dbs == nil {
  36. return
  37. }
  38. store.dbs[bucket] = true
  39. }
  40. func (store *AbstractSqlStore) OnBucketDeletion(bucket string) {
  41. store.dbsLock.Lock()
  42. defer store.dbsLock.Unlock()
  43. store.deleteTable(context.Background(), bucket)
  44. if store.dbs == nil {
  45. return
  46. }
  47. delete(store.dbs, bucket)
  48. }
  49. const (
  50. DEFAULT_TABLE = "filemeta"
  51. )
  52. type TxOrDB interface {
  53. ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
  54. QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
  55. QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
  56. }
  57. func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  58. tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
  59. Isolation: sql.LevelReadCommitted,
  60. ReadOnly: false,
  61. })
  62. if err != nil {
  63. return ctx, err
  64. }
  65. return context.WithValue(ctx, "tx", tx), nil
  66. }
  67. func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
  68. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  69. return tx.Commit()
  70. }
  71. return nil
  72. }
  73. func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
  74. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  75. return tx.Rollback()
  76. }
  77. return nil
  78. }
  79. func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
  80. shortPath = fullpath
  81. bucket = DEFAULT_TABLE
  82. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  83. txOrDB = tx
  84. } else {
  85. txOrDB = store.DB
  86. }
  87. if !store.SupportBucketTable {
  88. return
  89. }
  90. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  91. return
  92. }
  93. // detect bucket
  94. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  95. t := strings.Index(bucketAndObjectKey, "/")
  96. if t < 0 && !isForChildren {
  97. return
  98. }
  99. bucket = bucketAndObjectKey
  100. shortPath = "/"
  101. if t > 0 {
  102. bucket = bucketAndObjectKey[:t]
  103. shortPath = util.FullPath(bucketAndObjectKey[t:])
  104. }
  105. if isValidBucket(bucket) {
  106. store.dbsLock.Lock()
  107. defer store.dbsLock.Unlock()
  108. if store.dbs == nil {
  109. store.dbs = make(map[string]bool)
  110. }
  111. if _, found := store.dbs[bucket]; !found {
  112. if err = store.CreateTable(ctx, bucket); err == nil {
  113. store.dbs[bucket] = true
  114. }
  115. }
  116. }
  117. return
  118. }
  119. func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  120. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  121. if err != nil {
  122. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  123. }
  124. dir, name := shortPath.DirAndName()
  125. meta, err := entry.EncodeAttributesAndChunks()
  126. if err != nil {
  127. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  128. }
  129. if len(entry.Chunks) > 50 {
  130. meta = util.MaybeGzipData(meta)
  131. }
  132. res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
  133. if err == nil {
  134. return
  135. }
  136. if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
  137. // return fmt.Errorf("insert: %s", err)
  138. // skip this since the error can be in a different language
  139. }
  140. // now the insert failed possibly due to duplication constraints
  141. glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
  142. res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
  143. if err != nil {
  144. return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
  145. }
  146. _, err = res.RowsAffected()
  147. if err != nil {
  148. return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err)
  149. }
  150. return nil
  151. }
  152. func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  153. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  154. if err != nil {
  155. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  156. }
  157. dir, name := shortPath.DirAndName()
  158. meta, err := entry.EncodeAttributesAndChunks()
  159. if err != nil {
  160. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  161. }
  162. res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
  163. if err != nil {
  164. return fmt.Errorf("update %s: %s", entry.FullPath, err)
  165. }
  166. _, err = res.RowsAffected()
  167. if err != nil {
  168. return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
  169. }
  170. return nil
  171. }
  172. func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
  173. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  174. if err != nil {
  175. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  176. }
  177. dir, name := shortPath.DirAndName()
  178. row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
  179. var data []byte
  180. if err := row.Scan(&data); err != nil {
  181. if err == sql.ErrNoRows {
  182. return nil, filer_pb.ErrNotFound
  183. }
  184. return nil, fmt.Errorf("find %s: %v", fullpath, err)
  185. }
  186. entry := &filer.Entry{
  187. FullPath: fullpath,
  188. }
  189. if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  190. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  191. }
  192. return entry, nil
  193. }
  194. func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  195. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  196. if err != nil {
  197. return fmt.Errorf("findDB %s : %v", fullpath, err)
  198. }
  199. dir, name := shortPath.DirAndName()
  200. res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
  201. if err != nil {
  202. return fmt.Errorf("delete %s: %s", fullpath, err)
  203. }
  204. _, err = res.RowsAffected()
  205. if err != nil {
  206. return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
  207. }
  208. return nil
  209. }
  210. func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  211. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
  212. if err != nil {
  213. return fmt.Errorf("findDB %s : %v", fullpath, err)
  214. }
  215. if isValidBucket(bucket) && shortPath == "/" {
  216. if err = store.deleteTable(ctx, bucket); err == nil {
  217. store.dbsLock.Lock()
  218. delete(store.dbs, bucket)
  219. store.dbsLock.Unlock()
  220. return nil
  221. } else {
  222. return err
  223. }
  224. }
  225. glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
  226. res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
  227. if err != nil {
  228. return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
  229. }
  230. _, err = res.RowsAffected()
  231. if err != nil {
  232. return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
  233. }
  234. return nil
  235. }
  236. func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  237. db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
  238. if err != nil {
  239. return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
  240. }
  241. sqlText := store.GetSqlListExclusive(bucket)
  242. if includeStartFile {
  243. sqlText = store.GetSqlListInclusive(bucket)
  244. }
  245. rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
  246. if err != nil {
  247. return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
  248. }
  249. defer rows.Close()
  250. for rows.Next() {
  251. var name string
  252. var data []byte
  253. if err = rows.Scan(&name, &data); err != nil {
  254. glog.V(0).Infof("scan %s : %v", dirPath, err)
  255. return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
  256. }
  257. lastFileName = name
  258. entry := &filer.Entry{
  259. FullPath: util.NewFullPath(string(dirPath), name),
  260. }
  261. if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  262. glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
  263. return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
  264. }
  265. if !eachEntryFunc(entry) {
  266. break
  267. }
  268. }
  269. return lastFileName, nil
  270. }
  271. func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  272. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
  273. }
  274. func (store *AbstractSqlStore) Shutdown() {
  275. store.DB.Close()
  276. }
  277. func isValidBucket(bucket string) bool {
  278. return bucket != DEFAULT_TABLE && bucket != ""
  279. }
  280. func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
  281. if !store.SupportBucketTable {
  282. return nil
  283. }
  284. _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
  285. return err
  286. }
  287. func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
  288. if !store.SupportBucketTable {
  289. return nil
  290. }
  291. _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
  292. return err
  293. }