package abstract_sql

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// SqlGenerator produces the dialect-specific SQL statements used by
// AbstractSqlStore. Each method receives the name of the table to operate on:
// the default filemeta table, or a per-bucket table when bucket tables are
// supported.
type SqlGenerator interface {
	GetSqlInsert(tableName string) string
	GetSqlUpdate(tableName string) string
	GetSqlFind(tableName string) string
	GetSqlDelete(tableName string) string
	GetSqlDeleteFolderChildren(tableName string) string
	GetSqlListExclusive(tableName string) string
	GetSqlListInclusive(tableName string) string
	GetSqlListRecursive(tableName string) string
	GetSqlCreateTable(tableName string) string
	GetSqlDropTable(tableName string) string
}
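
// The statements returned by a SqlGenerator must accept parameters in the
// order this store passes them (see the ExecContext/QueryContext calls below):
//
//	insert:                  dirhash, name, directory, meta
//	update:                  meta, dirhash, name, directory
//	find / delete:           dirhash, name, directory
//	delete folder children:  dirhash, directory
//	list (in-/exclusive):    dirhash, startFileName, directory, namePattern, limit
//	list recursive:          startFileName, dirhash, namePattern, dirPattern, limit
//
// For illustration only (not taken from this file), a MySQL-style generator
// might return something like
//
//	"INSERT INTO `" + tableName + "` (dirhash, name, directory, meta) VALUES (?, ?, ?, ?)"
//
// from GetSqlInsert; the concrete dialect packages define the real statements.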

// AbstractSqlStore holds the filer store logic shared by all SQL-backed
// stores. The embedded SqlGenerator supplies the dialect-specific statements,
// SupportBucketTable enables one table per S3 bucket, and dbs tracks which
// bucket tables have already been created.
type AbstractSqlStore struct {
	SqlGenerator
	DB                 *sql.DB
	SupportBucketTable bool
	dbs                map[string]bool
	dbsLock            sync.Mutex
}

var _ filer.BucketAware = (*AbstractSqlStore)(nil)

func (store *AbstractSqlStore) CanDropWholeBucket() bool {
	return store.SupportBucketTable
}

func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
	store.dbsLock.Lock()
	defer store.dbsLock.Unlock()

	store.CreateTable(context.Background(), bucket)

	if store.dbs == nil {
		return
	}
	store.dbs[bucket] = true
}

func (store *AbstractSqlStore) OnBucketDeletion(bucket string) {
	store.dbsLock.Lock()
	defer store.dbsLock.Unlock()

	store.deleteTable(context.Background(), bucket)

	if store.dbs == nil {
		return
	}
	delete(store.dbs, bucket)
}

const (
	DEFAULT_TABLE = "filemeta"
)

// TxOrDB is satisfied by both *sql.Tx and *sql.DB, so the store can run the
// same statements inside or outside of a transaction.
type TxOrDB interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelReadCommitted,
		ReadOnly:  false,
	})
	if err != nil {
		return ctx, err
	}

	return context.WithValue(ctx, "tx", tx), nil
}

func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
		return tx.Commit()
	}
	return nil
}

func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
		return tx.Rollback()
	}
	return nil
}
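
// A minimal usage sketch (not from this file): callers are expected to thread
// the context returned by BeginTransaction through subsequent store calls, so
// that getTxOrDB picks up the *sql.Tx stored under the "tx" key.
//
//	txCtx, err := store.BeginTransaction(ctx)
//	if err != nil {
//		return err
//	}
//	if err := store.InsertEntry(txCtx, entry); err != nil {
//		store.RollbackTransaction(txCtx)
//		return err
//	}
//	return store.CommitTransaction(txCtx)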

// getTxOrDB returns the transaction carried by ctx, if any, or the plain DB
// handle, and resolves which table and in-table path to use for fullpath.
// When bucket tables are enabled, a path under /buckets/<bucket>/ maps to that
// bucket's own table (created lazily on first use) with shortPath reduced to
// the part after the bucket name; every other path uses DEFAULT_TABLE.
func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {

	shortPath = fullpath
	bucket = DEFAULT_TABLE

	if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
		txOrDB = tx
	} else {
		txOrDB = store.DB
	}

	if !store.SupportBucketTable {
		return
	}

	if !strings.HasPrefix(string(fullpath), "/buckets/") {
		return
	}

	// detect bucket
	bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
	t := strings.Index(bucketAndObjectKey, "/")
	if t < 0 && !isForChildren {
		return
	}

	bucket = bucketAndObjectKey
	shortPath = "/"
	if t > 0 {
		bucket = bucketAndObjectKey[:t]
		shortPath = util.FullPath(bucketAndObjectKey[t:])
	}

	if isValidBucket(bucket) {
		store.dbsLock.Lock()
		defer store.dbsLock.Unlock()

		if store.dbs == nil {
			store.dbs = make(map[string]bool)
		}

		if _, found := store.dbs[bucket]; !found {
			if err = store.CreateTable(ctx, bucket); err == nil {
				store.dbs[bucket] = true
			}
		}
	} else {
		err = fmt.Errorf("invalid bucket name %s", bucket)
	}

	return
}
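
// For example (derived from the logic above, bucket name purely illustrative):
// with SupportBucketTable enabled, "/buckets/photos/2024/cat.jpg" resolves to
// bucket "photos" and shortPath "/2024/cat.jpg", while any path outside
// /buckets/ stays in DEFAULT_TABLE with an unchanged shortPath.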

func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
	}

	dir, name := shortPath.DirAndName()
	meta, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
	}

	if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
		meta = util.MaybeGzipData(meta)
	}

	sqlInsert := "insert"
	res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
	if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate entry") {
		// now the insert failed possibly due to duplication constraints
		sqlInsert = "falls back to update"
		glog.V(1).Infof("insert %s %s: %v", entry.FullPath, sqlInsert, err)
		res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
	}
	if err != nil {
		return fmt.Errorf("%s %s: %s", sqlInsert, entry.FullPath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("%s %s but no rows affected: %s", sqlInsert, entry.FullPath, err)
	}
	return nil
}

func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
	}

	dir, name := shortPath.DirAndName()
	meta, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encode %s: %s", entry.FullPath, err)
	}

	res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
	if err != nil {
		return fmt.Errorf("update %s: %s", entry.FullPath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
	}
	return nil
}

func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
	if err != nil {
		return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
	}

	dir, name := shortPath.DirAndName()
	row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)

	var data []byte
	if err := row.Scan(&data); err != nil {
		if err == sql.ErrNoRows {
			return nil, filer_pb.ErrNotFound
		}
		return nil, fmt.Errorf("find %s: %v", fullpath, err)
	}

	entry := &filer.Entry{
		FullPath: fullpath,
	}
	if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
	}

	return entry, nil
}

func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", fullpath, err)
	}

	dir, name := shortPath.DirAndName()

	res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
	if err != nil {
		return fmt.Errorf("delete %s: %s", fullpath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
	}

	return nil
}

// DeleteFolderChildren removes everything under fullpath. When bucket tables
// are enabled and fullpath is a bucket root, the whole bucket table is dropped
// instead of deleting rows one by one.
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
	if err != nil {
		return fmt.Errorf("findDB %s : %v", fullpath, err)
	}

	if isValidBucket(bucket) && shortPath == "/" {
		if err = store.deleteTable(ctx, bucket); err == nil {
			store.dbsLock.Lock()
			delete(store.dbs, bucket)
			store.dbsLock.Unlock()
			return nil
		} else {
			return err
		}
	}

	glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
	res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
	if err != nil {
		return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
	}

	_, err = res.RowsAffected()
	if err != nil {
		return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
	}
	return nil
}
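
// For example (illustrative bucket name only): DeleteFolderChildren(ctx,
// "/buckets/photos") resolves to bucket "photos" with shortPath "/", so the
// fast path above drops the photos table, which is what CanDropWholeBucket
// advertises to callers. Any deeper path falls through to the per-row
// GetSqlDeleteFolderChildren statement.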

func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
	if err != nil {
		return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
	}

	sqlText := store.GetSqlListExclusive(bucket)
	if includeStartFile {
		sqlText = store.GetSqlListInclusive(bucket)
	}

	rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
	if err != nil {
		return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
	}
	defer rows.Close()

	for rows.Next() {
		var name string
		var data []byte
		if err = rows.Scan(&name, &data); err != nil {
			glog.V(0).Infof("scan %s : %v", dirPath, err)
			return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
		}
		lastFileName = name

		entry := &filer.Entry{
			FullPath: util.NewFullPath(string(dirPath), name),
		}
		if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
			glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
			return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
		}

		if !eachEntryFunc(entry) {
			break
		}
	}

	return lastFileName, nil
}

func (store *AbstractSqlStore) ListRecursivePrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, delimiter bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {

	db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
	if err != nil {
		return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
	}

	bucketDir := ""
	if bucket != DEFAULT_TABLE {
		bucketDir = fmt.Sprintf("/buckets/%s", bucket)
	}

	shortDir := string(shortPath)
	namePrefix := prefix + "%"
	var dirPrefix string
	isPrefixEndsWithDelimiter := false
	if delimiter {
		if prefix == "" && len(startFileName) == 0 {
			dirPrefix = shortDir
			limit += 1
			isPrefixEndsWithDelimiter = true
		}
	} else {
		if strings.HasSuffix(shortDir, "/") {
			dirPrefix = fmt.Sprintf("%s%s%%", shortDir, prefix)
		} else {
			dirPrefix = fmt.Sprintf("%s/%s%%", shortDir, prefix)
		}
	}

	rows, err := db.QueryContext(ctx, store.GetSqlListRecursive(bucket), startFileName, util.HashStringToLong(shortDir), namePrefix, dirPrefix, limit+1)
	if err != nil {
		glog.Errorf("list %s : %v", dirPath, err)
		return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
	}
	defer rows.Close()

	for rows.Next() {
		var dir, name, fileName string
		var data []byte
		if err = rows.Scan(&dir, &name, &data); err != nil {
			glog.V(0).Infof("scan %s : %v", dirPath, err)
			return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
		}

		if strings.HasSuffix(dir, "/") {
			fileName = dir + name
		} else {
			fileName = fmt.Sprintf("%s/%s", dir, name)
		}
		lastFileName = fmt.Sprintf("%s%s", dir, name)

		entry := &filer.Entry{
			FullPath: util.NewFullPath(bucketDir, fileName),
		}
		if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
			glog.Errorf("scan decode %s : %v", entry.FullPath, err)
			return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
		}

		isDirectory := entry.IsDirectory() && entry.Attr.Mime == "" && entry.Attr.FileSize == 0
		if !delimiter && isDirectory {
			continue
		}
		glog.V(0).Infof("ListRecursivePrefixedEntries bucket %s, shortDir: %s, bucketDir: %s, lastFileName %s, fileName %s", bucket, shortDir, bucketDir, lastFileName, fileName)
		if isPrefixEndsWithDelimiter && shortDir == lastFileName && isDirectory {
			continue
		}

		if !eachEntryFunc(entry) {
			break
		}
	}

	return lastFileName, nil
}

func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	// Pass the caller's callback through; forwarding nil here would panic as
	// soon as ListDirectoryPrefixedEntries invokes eachEntryFunc.
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}

func (store *AbstractSqlStore) Shutdown() {
	store.DB.Close()
}

func isValidBucket(bucket string) bool {
	if s3bucket.VerifyS3BucketName(bucket) != nil {
		return false
	}
	return bucket != DEFAULT_TABLE && bucket != ""
}

func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
	if !store.SupportBucketTable {
		return nil
	}
	_, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
	return err
}

func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
	if !store.SupportBucketTable {
		return nil
	}
	_, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
	return err
}