You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

223 lines
5.4 KiB

4 years ago
4 years ago
4 years ago
  1. package hbase
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/tsuna/gohbase"
  11. "github.com/tsuna/gohbase/hrpc"
  12. "io"
  13. )
  14. func init() {
  15. filer.Stores = append(filer.Stores, &HbaseStore{})
  16. }
  17. type HbaseStore struct {
  18. Client gohbase.Client
  19. table []byte
  20. cfKv string
  21. cfMetaDir string
  22. cfFlatDir string
  23. column string
  24. }
  25. func (store *HbaseStore) GetName() string {
  26. return "hbase"
  27. }
  28. func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  29. return store.initialize(
  30. configuration.GetString(prefix+"zkquorum"),
  31. configuration.GetString(prefix+"table"),
  32. )
  33. }
  34. func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
  35. store.Client = gohbase.NewClient(zkquorum)
  36. store.table = []byte(table)
  37. store.cfKv = "kv"
  38. store.cfMetaDir = "meta"
  39. store.cfFlatDir = "flat"
  40. store.column = "a"
  41. // check table exists
  42. key := "whatever"
  43. headers := map[string][]string{store.cfMetaDir: nil}
  44. get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
  45. if err != nil {
  46. return fmt.Errorf("NewGet returned an error: %v", err)
  47. }
  48. _, err = store.Client.Get(get)
  49. if err != gohbase.TableNotFound {
  50. return nil
  51. }
  52. // create table
  53. adminClient := gohbase.NewAdminClient(zkquorum)
  54. cFamilies := []string{store.cfKv, store.cfMetaDir, store.cfFlatDir}
  55. cf := make(map[string]map[string]string, len(cFamilies))
  56. for _, f := range cFamilies {
  57. cf[f] = nil
  58. }
  59. ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
  60. if err := adminClient.CreateTable(ct); err != nil {
  61. return err
  62. }
  63. return nil
  64. }
  65. func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
  66. value, err := entry.EncodeAttributesAndChunks()
  67. if err != nil {
  68. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  69. }
  70. if len(entry.Chunks) > 50 {
  71. value = util.MaybeGzipData(value)
  72. }
  73. return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value)
  74. }
  75. func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  76. return store.InsertEntry(ctx, entry)
  77. }
  78. func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
  79. value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
  80. if err != nil {
  81. if err == filer.ErrKvNotFound {
  82. return nil, filer_pb.ErrNotFound
  83. }
  84. return nil, err
  85. }
  86. entry = &filer.Entry{
  87. FullPath: path,
  88. }
  89. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value))
  90. if err != nil {
  91. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  92. }
  93. return entry, nil
  94. }
  95. func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
  96. return store.doDelete(ctx, store.cfMetaDir, []byte(path))
  97. }
  98. func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
  99. family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
  100. expectedPrefix := []byte(path+"/")
  101. scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
  102. if err != nil {
  103. return err
  104. }
  105. scanner := store.Client.Scan(scan)
  106. defer scanner.Close()
  107. for {
  108. res, err := scanner.Next()
  109. if err != nil {
  110. break
  111. }
  112. if len(res.Cells) == 0 {
  113. continue
  114. }
  115. cell := res.Cells[0]
  116. if !bytes.HasPrefix(cell.Row, expectedPrefix) {
  117. break
  118. }
  119. err = store.doDelete(ctx, store.cfMetaDir, cell.Row)
  120. if err != nil {
  121. break
  122. }
  123. }
  124. return
  125. }
  126. func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
  127. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
  128. }
  129. func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) {
  130. family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
  131. expectedPrefix := []byte(dirPath.Child(prefix))
  132. scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
  133. if err != nil {
  134. return nil, err
  135. }
  136. var entries []*filer.Entry
  137. scanner := store.Client.Scan(scan)
  138. defer scanner.Close()
  139. for {
  140. res, err := scanner.Next()
  141. if err == io.EOF {
  142. break
  143. }
  144. if err != nil {
  145. return entries, err
  146. }
  147. if len(res.Cells) == 0 {
  148. continue
  149. }
  150. cell := res.Cells[0]
  151. if !bytes.HasPrefix(cell.Row, expectedPrefix) {
  152. break
  153. }
  154. fullpath := util.FullPath(cell.Row)
  155. value := cell.Value
  156. _, fileName := fullpath.DirAndName()
  157. if fileName == startFileName && !includeStartFile {
  158. continue
  159. }
  160. limit--
  161. if limit < 0 {
  162. break
  163. }
  164. entry := &filer.Entry{
  165. FullPath: fullpath,
  166. }
  167. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
  168. err = decodeErr
  169. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  170. break
  171. }
  172. entries = append(entries, entry)
  173. }
  174. return entries, nil
  175. }
  176. func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  177. return ctx, nil
  178. }
  179. func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
  180. return nil
  181. }
  182. func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
  183. return nil
  184. }
  185. func (store *HbaseStore) Shutdown() {
  186. store.Client.Close()
  187. }