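// Package abstract_sql implements the shared, dialect-independent part of the
// SQL-backed filer stores; concrete stores supply a SqlGenerator for their dialect.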
package abstract_sql

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
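// SqlGenerator supplies the dialect-specific SQL statements (insert, update,
// find, delete, list, create/drop table) for a given table name.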
type SqlGenerator interface {
	GetSqlInsert(tableName string) string
	GetSqlUpdate(tableName string) string
	GetSqlFind(tableName string) string
	GetSqlDelete(tableName string) string
	GetSqlDeleteFolderChildren(tableName string) string
	GetSqlListExclusive(tableName string) string
	GetSqlListInclusive(tableName string) string
	GetSqlListRecursive(tableName string) string
	GetSqlCreateTable(tableName string) string
	GetSqlDropTable(tableName string) string
}
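// AbstractSqlStore holds the database handle and, when SupportBucketTable is
// enabled, tracks the per-bucket tables it has created in dbs.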
type AbstractSqlStore struct {
	SqlGenerator
	DB                 *sql.DB
	SupportBucketTable bool
	dbs                map[string]bool
	dbsLock            sync.Mutex
}
var _ filer.BucketAware = (*AbstractSqlStore)(nil)

func (store *AbstractSqlStore) CanDropWholeBucket() bool {
	return store.SupportBucketTable
}
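// OnBucketCreation and OnBucketDeletion keep per-bucket tables in sync with
// bucket lifecycle events: a new bucket gets its own table, and a deleted
// bucket has its table dropped and its entry removed from the dbs cache.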
func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
	store.dbsLock.Lock()
	defer store.dbsLock.Unlock()

	store.CreateTable(context.Background(), bucket)

	if store.dbs == nil {
		return
	}
	store.dbs[bucket] = true
}
func (store *AbstractSqlStore) OnBucketDeletion(bucket string) {
	store.dbsLock.Lock()
	defer store.dbsLock.Unlock()

	store.deleteTable(context.Background(), bucket)

	if store.dbs == nil {
		return
	}
	delete(store.dbs, bucket)
}
const (
	DEFAULT_TABLE = "filemeta"
)
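// TxOrDB is the subset of database operations shared by *sql.DB and *sql.Tx,
// so each statement can run either directly or inside a transaction.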
type TxOrDB interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
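// BeginTransaction opens a read-committed transaction and stores it in the
// returned context under the "tx" key; CommitTransaction and RollbackTransaction
// look it up from there and are no-ops if no transaction is present.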
func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelReadCommitted,
		ReadOnly:  false,
	})
	if err != nil {
		return ctx, err
	}

	return context.WithValue(ctx, "tx", tx), nil
}

func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
		return tx.Commit()
	}
	return nil
}

func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
		return tx.Rollback()
	}
	return nil
}
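// getTxOrDB picks the transaction from the context if one is active, otherwise
// the plain DB handle. For bucket-aware stores it also maps paths under
// /buckets/<bucket>/ to that bucket's table, rewrites the path relative to the
// bucket root, and lazily creates the bucket table on first use.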
func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {

	shortPath = fullpath
	bucket = DEFAULT_TABLE

	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
		txOrDB = tx
	} else {
		txOrDB = store.DB
	}

	if !store.SupportBucketTable {
		return
	}

	if !strings.HasPrefix(string(fullpath), "/buckets/") {
		return
	}

	// detect bucket
	bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
	t := strings.Index(bucketAndObjectKey, "/")
	if t < 0 && !isForChildren {
		return
	}

	bucket = bucketAndObjectKey
	shortPath = "/"
	if t > 0 {
		bucket = bucketAndObjectKey[:t]
		shortPath = util.FullPath(bucketAndObjectKey[t:])
	}

	if isValidBucket(bucket) {
		store.dbsLock.Lock()
		defer store.dbsLock.Unlock()

		if store.dbs == nil {
			store.dbs = make(map[string]bool)
		}

		if _, found := store.dbs[bucket]; !found {
			if err = store.CreateTable(ctx, bucket); err == nil {
				store.dbs[bucket] = true
			}
		}
	}

	return
}
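// InsertEntry serializes the entry's attributes and chunks (gzipping large
// chunk lists) and inserts the row; if the insert fails with a duplicate-key
// error, it falls back to updating the existing row.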
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
	}

	dir, name := shortPath.DirAndName()
	meta, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
	}

	if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
		meta = util.MaybeGzipData(meta)
	}

	sqlInsert := "insert"
	res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
	if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") {
		// the insert failed, possibly due to a duplicate-key constraint
		sqlInsert = "falls back to update"
		glog.V(1).Infof("insert %s %s: %v", entry.FullPath, sqlInsert, err)
		res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
	}
	if err != nil {
		return fmt.Errorf("%s %s: %s", sqlInsert, entry.FullPath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("%s %s but no rows affected: %s", sqlInsert, entry.FullPath, err)
	}

	return nil
}
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
	}

	dir, name := shortPath.DirAndName()
	meta, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
	}

	res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
	if err != nil {
		return fmt.Errorf("update %s: %s", entry.FullPath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
	}
	return nil
}
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
	if err != nil {
		return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
	}

	dir, name := shortPath.DirAndName()
	row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)

	var data []byte
	if err := row.Scan(&data); err != nil {
		if err == sql.ErrNoRows {
			return nil, filer_pb.ErrNotFound
		}
		return nil, fmt.Errorf("find %s: %v", fullpath, err)
	}

	entry := &filer.Entry{
		FullPath: fullpath,
	}
	if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
	}

	return entry, nil
}
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", fullpath, err)
	}

	dir, name := shortPath.DirAndName()

	res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
	if err != nil {
		return fmt.Errorf("delete %s: %s", fullpath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
	}

	return nil
}
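// DeleteFolderChildren removes everything under a directory. When the path is
// the root of a bucket-backed table, it drops the whole table instead of
// issuing row deletes.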
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", fullpath, err)
	}

	if isValidBucket(bucket) && shortPath == "/" {
		if err = store.deleteTable(ctx, bucket); err == nil {
			store.dbsLock.Lock()
			delete(store.dbs, bucket)
			store.dbsLock.Unlock()
			return nil
		} else {
			return err
		}
	}

	glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
	res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
	if err != nil {
		return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
	}

	return nil
}
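// ListDirectoryPrefixedEntries lists the children of one directory whose names
// match the given prefix, starting after (or, if includeStartFile is true, at)
// startFileName, and streams each decoded entry to eachEntryFunc until it
// returns false or the rows are exhausted.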
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	glog.V(5).Infof("ListDirectoryPrefixedEntries dirPath %v, includeStartFile %v", dirPath, includeStartFile)
	db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
	if err != nil {
		return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
	}

	sqlText := store.GetSqlListExclusive(bucket)
	if includeStartFile {
		sqlText = store.GetSqlListInclusive(bucket)
	}

	rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
	if err != nil {
		return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
	}
	defer rows.Close()

	for rows.Next() {
		var name string
		var data []byte
		if err = rows.Scan(&name, &data); err != nil {
			glog.V(0).Infof("scan %s : %v", dirPath, err)
			return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
		}
		lastFileName = name

		entry := &filer.Entry{
			FullPath: util.NewFullPath(string(dirPath), name),
		}
		if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
			glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
			return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
		}

		if !eachEntryFunc(entry) {
			break
		}
	}

	return lastFileName, nil
}
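// ListRecursivePrefixedEntries is like ListDirectoryPrefixedEntries but uses
// the dialect's recursive list query, so it also matches entries in
// subdirectories whose full path starts with shortPath plus the prefix.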
func (store *AbstractSqlStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
	if err != nil {
		return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
	}

	glog.V(5).Infof("ListRecursivePrefixedEntries lastFileName %s shortPath %v, prefix %v, sql %s", lastFileName, string(shortPath), prefix, store.GetSqlListRecursive(bucket))
	rows, err := db.QueryContext(ctx, store.GetSqlListRecursive(bucket), startFileName, util.HashStringToLong(string(shortPath)), prefix+"%", string(shortPath)+prefix+"%", limit+1)
	if err != nil {
		return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
	}
	defer rows.Close()

	for rows.Next() {
		var dir, name string
		var data []byte
		if err = rows.Scan(&dir, &name, &data); err != nil {
			glog.V(0).Infof("scan %s : %v", dirPath, err)
			return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
		}
		glog.V(0).Infof("scan dir %s name %v", dir, name)

		entry := &filer.Entry{
			FullPath: util.NewFullPath(dir, name),
		}
		lastFileName = string(entry.FullPath)

		if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
			glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
			return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
		}

		if !eachEntryFunc(entry) {
			break
		}
	}

	return lastFileName, nil
}
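// ListDirectoryEntries lists a directory without a name prefix by delegating
// to ListDirectoryPrefixedEntries with an empty prefix.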
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *AbstractSqlStore) Shutdown() {
	store.DB.Close()
}

func isValidBucket(bucket string) bool {
	return bucket != DEFAULT_TABLE && bucket != ""
}
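// CreateTable and deleteTable create or drop the per-bucket table using the
// dialect's DDL; both are no-ops when the store does not support bucket tables.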
func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
	if !store.SupportBucketTable {
		return nil
	}
	_, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
	return err
}

func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
	if !store.SupportBucketTable {
		return nil
	}
	_, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
	return err
}