You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

373 lines
10 KiB

7 years ago
4 years ago
4 years ago
5 years ago
  1. package abstract_sql
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  // SqlGenerator supplies the SQL statement text for each operation of the
  // store. Every method receives the name of the table the statement should
  // target and returns the statement as a string.
  type SqlGenerator interface {
  	// Table lifecycle statements.
  	GetSqlCreateTable(tableName string) string
  	GetSqlDropTable(tableName string) string

  	// Single-entry statements.
  	GetSqlInsert(tableName string) string
  	GetSqlUpdate(tableName string) string
  	GetSqlFind(tableName string) string
  	GetSqlDelete(tableName string) string

  	// Directory-level statements.
  	GetSqlDeleteFolderChildren(tableName string) string
  	GetSqlListExclusive(tableName string) string
  	GetSqlListInclusive(tableName string) string
  }
  25. type AbstractSqlStore struct {
  26. SqlGenerator
  27. DB *sql.DB
  28. SupportBucketTable bool
  29. dbs map[string]bool
  30. dbsLock sync.Mutex
  31. }
  32. func (store *AbstractSqlStore) CanDropWholeBucket() bool {
  33. return store.SupportBucketTable
  34. }
  35. func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
  36. store.dbsLock.Lock()
  37. defer store.dbsLock.Unlock()
  38. store.CreateTable(context.Background(), bucket)
  39. if store.dbs == nil {
  40. return
  41. }
  42. store.dbs[bucket] = true
  43. }
  44. func (store *AbstractSqlStore) OnBucketDeletion(bucket string) {
  45. store.dbsLock.Lock()
  46. defer store.dbsLock.Unlock()
  47. store.deleteTable(context.Background(), bucket)
  48. if store.dbs == nil {
  49. return
  50. }
  51. delete(store.dbs, bucket)
  52. }
  // DEFAULT_TABLE is the table used for every entry that is not routed to a
  // per-bucket table.
  const DEFAULT_TABLE = "filemeta"
  // TxOrDB is the subset of query methods used by the store. getTxOrDB hands
  // out either a *sql.Tx or a *sql.DB behind this interface, so the calling
  // code does not need to know whether it runs inside a transaction.
  type TxOrDB interface {
  	// QueryContext runs a statement that returns rows.
  	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
  	// QueryRowContext runs a statement expected to return at most one row.
  	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
  	// ExecContext runs a statement that does not return rows.
  	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
  }
  61. func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  62. tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
  63. Isolation: sql.LevelReadCommitted,
  64. ReadOnly: false,
  65. })
  66. if err != nil {
  67. return ctx, err
  68. }
  69. return context.WithValue(ctx, "tx", tx), nil
  70. }
  71. func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
  72. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  73. return tx.Commit()
  74. }
  75. return nil
  76. }
  77. func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
  78. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  79. return tx.Rollback()
  80. }
  81. return nil
  82. }
  83. func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
  84. shortPath = fullpath
  85. bucket = DEFAULT_TABLE
  86. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  87. txOrDB = tx
  88. } else {
  89. txOrDB = store.DB
  90. }
  91. if !store.SupportBucketTable {
  92. return
  93. }
  94. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  95. return
  96. }
  97. // detect bucket
  98. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  99. t := strings.Index(bucketAndObjectKey, "/")
  100. if t < 0 && !isForChildren {
  101. return
  102. }
  103. bucket = bucketAndObjectKey
  104. shortPath = "/"
  105. if t > 0 {
  106. bucket = bucketAndObjectKey[:t]
  107. shortPath = util.FullPath(bucketAndObjectKey[t:])
  108. }
  109. if isValidBucket(bucket) {
  110. store.dbsLock.Lock()
  111. defer store.dbsLock.Unlock()
  112. if store.dbs == nil {
  113. store.dbs = make(map[string]bool)
  114. }
  115. if _, found := store.dbs[bucket]; !found {
  116. if err = store.CreateTable(ctx, bucket); err == nil {
  117. store.dbs[bucket] = true
  118. }
  119. }
  120. }
  121. return
  122. }
  123. func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  124. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  125. if err != nil {
  126. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  127. }
  128. dir, name := shortPath.DirAndName()
  129. meta, err := entry.EncodeAttributesAndChunks()
  130. if err != nil {
  131. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  132. }
  133. if len(entry.Chunks) > 50 {
  134. meta = util.MaybeGzipData(meta)
  135. }
  136. res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
  137. if err == nil {
  138. return
  139. }
  140. if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
  141. // return fmt.Errorf("insert: %s", err)
  142. // skip this since the error can be in a different language
  143. }
  144. // now the insert failed possibly due to duplication constraints
  145. glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
  146. res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
  147. if err != nil {
  148. return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
  149. }
  150. _, err = res.RowsAffected()
  151. if err != nil {
  152. return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err)
  153. }
  154. return nil
  155. }
  156. func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  157. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  158. if err != nil {
  159. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  160. }
  161. dir, name := shortPath.DirAndName()
  162. meta, err := entry.EncodeAttributesAndChunks()
  163. if err != nil {
  164. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  165. }
  166. res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
  167. if err != nil {
  168. return fmt.Errorf("update %s: %s", entry.FullPath, err)
  169. }
  170. _, err = res.RowsAffected()
  171. if err != nil {
  172. return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
  173. }
  174. return nil
  175. }
  176. func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
  177. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  178. if err != nil {
  179. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  180. }
  181. dir, name := shortPath.DirAndName()
  182. row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
  183. var data []byte
  184. if err := row.Scan(&data); err != nil {
  185. if err == sql.ErrNoRows {
  186. return nil, filer_pb.ErrNotFound
  187. }
  188. return nil, fmt.Errorf("find %s: %v", fullpath, err)
  189. }
  190. entry := &filer.Entry{
  191. FullPath: fullpath,
  192. }
  193. if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  194. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  195. }
  196. return entry, nil
  197. }
  198. func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  199. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  200. if err != nil {
  201. return fmt.Errorf("findDB %s : %v", fullpath, err)
  202. }
  203. dir, name := shortPath.DirAndName()
  204. res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
  205. if err != nil {
  206. return fmt.Errorf("delete %s: %s", fullpath, err)
  207. }
  208. _, err = res.RowsAffected()
  209. if err != nil {
  210. return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
  211. }
  212. return nil
  213. }
  214. func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath, limit int64) error {
  215. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
  216. if err != nil {
  217. return fmt.Errorf("findDB %s : %v", fullpath, err)
  218. }
  219. if isValidBucket(bucket) && shortPath == "/" {
  220. if err = store.deleteTable(ctx, bucket); err == nil {
  221. store.dbsLock.Lock()
  222. delete(store.dbs, bucket)
  223. store.dbsLock.Unlock()
  224. return nil
  225. } else {
  226. return err
  227. }
  228. }
  229. for {
  230. glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
  231. res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath), limit)
  232. if err != nil {
  233. return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
  234. }
  235. rowCount, err := res.RowsAffected()
  236. if err != nil {
  237. return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
  238. }
  239. if rowCount < limit {
  240. break
  241. }
  242. // to give the Galera Cluster a chance to breath
  243. time.Sleep(time.Second)
  244. }
  245. return nil
  246. }
  247. func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  248. db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
  249. if err != nil {
  250. return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
  251. }
  252. sqlText := store.GetSqlListExclusive(bucket)
  253. if includeStartFile {
  254. sqlText = store.GetSqlListInclusive(bucket)
  255. }
  256. rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
  257. if err != nil {
  258. return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
  259. }
  260. defer rows.Close()
  261. for rows.Next() {
  262. var name string
  263. var data []byte
  264. if err = rows.Scan(&name, &data); err != nil {
  265. glog.V(0).Infof("scan %s : %v", dirPath, err)
  266. return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
  267. }
  268. lastFileName = name
  269. entry := &filer.Entry{
  270. FullPath: util.NewFullPath(string(dirPath), name),
  271. }
  272. if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  273. glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
  274. return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
  275. }
  276. if !eachEntryFunc(entry) {
  277. break
  278. }
  279. }
  280. return lastFileName, nil
  281. }
  282. func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  283. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
  284. }
  285. func (store *AbstractSqlStore) Shutdown() {
  286. store.DB.Close()
  287. }
  288. func isValidBucket(bucket string) bool {
  289. return bucket != DEFAULT_TABLE && bucket != ""
  290. }
  291. func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
  292. if !store.SupportBucketTable {
  293. return nil
  294. }
  295. _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
  296. return err
  297. }
  298. func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
  299. if !store.SupportBucketTable {
  300. return nil
  301. }
  302. _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
  303. return err
  304. }