You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

231 lines
5.7 KiB

package hbase
import (
"bytes"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/hrpc"
"io"
)
func init() {
filer.Stores = append(filer.Stores, &HbaseStore{})
}
// HbaseStore holds the HBase client together with the table and
// column-family names that the store's methods operate on.
type HbaseStore struct {
	// Client is the gohbase client; it is created in initialize and closed in Shutdown.
	Client gohbase.Client
	// table is the HBase table name as bytes, as passed to initialize.
	table []byte
	// cfKv is a column family name; initialize sets it to "kv".
	cfKv string
	// cfMetaDir is the column family used for entry metadata rows; initialize sets it to "meta".
	cfMetaDir string
	// column is set to "a" by initialize.
	column string
}
// GetName returns the name of this store, "hbase".
func (store *HbaseStore) GetName() string {
	const storeName = "hbase"
	return storeName
}
// Initialize reads the "zkquorum" and "table" settings (each looked up under
// the given prefix) from configuration and hands them to initialize.
func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
	zkQuorum := configuration.GetString(prefix + "zkquorum")
	tableName := configuration.GetString(prefix + "table")

	err = store.initialize(zkQuorum, tableName)
	return err
}
// initialize creates the HBase client for the given ZooKeeper quorum, records
// the table and column-family names on the store, and creates the table with
// the "kv" and "meta" column families when a probe read reports that the
// table does not exist.
//
// It returns an error when the probe request cannot be built or when table
// creation fails. Any other probe outcome leaves the table untouched and
// returns nil.
func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
	store.Client = gohbase.NewClient(zkquorum)
	store.table = []byte(table)
	store.cfKv = "kv"
	store.cfMetaDir = "meta"
	store.column = "a"

	// Check that the table exists by issuing a Get for an arbitrary row key;
	// only the returned error is inspected, the result is discarded.
	key := "whatever"
	headers := map[string][]string{store.cfMetaDir: nil}
	get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
	if err != nil {
		return fmt.Errorf("NewGet returned an error: %w", err)
	}
	if _, err = store.Client.Get(get); err != gohbase.TableNotFound {
		if err != nil {
			// Previously this error was dropped silently. It is still treated
			// as non-fatal (the table is assumed to exist), but is now logged
			// so that connectivity problems are visible at startup.
			glog.V(0).Infof("hbase probe of table %s: %v", table, err)
		}
		return nil
	}

	// The table is missing: create it with both column families, each with
	// default (nil) attributes.
	adminClient := gohbase.NewAdminClient(zkquorum)
	cFamilies := []string{store.cfKv, store.cfMetaDir}
	cf := make(map[string]map[string]string, len(cFamilies))
	for _, f := range cFamilies {
		cf[f] = nil
	}
	ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
	if err := adminClient.CreateTable(ct); err != nil {
		return fmt.Errorf("create table %s: %w", table, err)
	}
	return nil
}
// InsertEntry encodes the entry's attributes and chunks and writes the result
// to the metadata column family, using the entry's full path as the row key
// and passing along the entry's TtlSec. The encoded bytes go through
// util.MaybeGzipData when the entry has more than
// filer.CountEntryChunksForGzip chunks.
func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
	encoded, encodeErr := entry.EncodeAttributesAndChunks()
	if encodeErr != nil {
		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, encodeErr)
	}

	payload := encoded
	if chunkCount := len(entry.GetChunks()); chunkCount > filer.CountEntryChunksForGzip {
		payload = util.MaybeGzipData(encoded)
	}

	rowKey := []byte(entry.FullPath)
	return store.doPut(ctx, store.cfMetaDir, rowKey, payload, entry.TtlSec)
}
// UpdateEntry stores the entry by delegating to InsertEntry.
func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
	err = store.InsertEntry(ctx, entry)
	return err
}
// FindEntry reads the metadata row keyed by path and decodes it into an
// Entry. A filer.ErrKvNotFound from the lookup is reported as
// filer_pb.ErrNotFound; any other lookup error is returned as is. When
// decoding fails, the partially filled entry is returned together with the
// decode error.
func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
	raw, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
	switch {
	case err == filer.ErrKvNotFound:
		return nil, filer_pb.ErrNotFound
	case err != nil:
		return nil, err
	}

	entry = &filer.Entry{FullPath: path}
	if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(raw)); decodeErr != nil {
		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, decodeErr)
	}
	return entry, nil
}
// DeleteEntry deletes the metadata row whose key is path.
func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
	rowKey := []byte(path)
	err = store.doDelete(ctx, store.cfMetaDir, rowKey)
	return err
}
// DeleteFolderChildren deletes the metadata rows of the direct children of
// path. It scans rows starting at path.Child(""), stops at the first row that
// no longer carries that prefix, and skips rows whose parent directory is not
// exactly path (deeper descendants).
//
// It returns the first error reported by the scanner or by a row deletion.
// Reaching the end of the scan (io.EOF) is the normal termination and yields
// nil. Earlier versions shadowed the named result inside the loop and
// therefore always returned nil, hiding both scan and delete failures.
func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
	family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
	expectedPrefix := []byte(path.Child(""))
	scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
	if err != nil {
		return err
	}

	scanner := store.Client.Scan(scan)
	defer scanner.Close()

	for {
		res, scanErr := scanner.Next()
		if scanErr == io.EOF {
			// All rows consumed.
			return nil
		}
		if scanErr != nil {
			return scanErr
		}
		if len(res.Cells) == 0 {
			continue
		}

		row := res.Cells[0].Row
		if !bytes.HasPrefix(row, expectedPrefix) {
			// The scan has no stop row, so leaving the prefix range marks the end.
			return nil
		}

		// Only direct children are removed here; rows below sub-directories
		// share the prefix but have a different parent directory.
		dir, _ := util.FullPath(row).DirAndName()
		if dir != string(path) {
			continue
		}

		if delErr := store.doDelete(ctx, store.cfMetaDir, row); delErr != nil {
			return delErr
		}
	}
}
// ListDirectoryEntries lists entries of dirPath by calling
// ListDirectoryPrefixedEntries with an empty name prefix.
func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
	const noPrefix = ""
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, noPrefix, eachEntryFunc)
}
// ListDirectoryPrefixedEntries scans the metadata rows under dirPath whose
// file name starts with prefix and passes each direct child, decoded into an
// Entry, to eachEntryFunc.
//
// The scan begins at the later of the prefix row and the row of
// startFileName, so a follow-up page does not re-emit entries that sort
// before the start file. The start file itself is skipped unless
// includeStartFile is true. At most limit entries are emitted, and iteration
// also stops when eachEntryFunc returns false.
//
// It returns the name of the last entry considered, together with any scanner
// or decode error. A decode error is logged and returned; it was previously
// assigned to a loop-local variable and lost.
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
	expectedPrefix := []byte(dirPath.Child(prefix))

	// Row keys are entry full paths (see InsertEntry), so the start file's
	// row is assumed to be dirPath.Child(startFileName). Only move the scan
	// start forward, never before the prefix range.
	startRow := expectedPrefix
	if startFileName != "" {
		if candidate := []byte(dirPath.Child(startFileName)); bytes.Compare(candidate, expectedPrefix) > 0 {
			startRow = candidate
		}
	}

	scan, err := hrpc.NewScanRange(ctx, store.table, startRow, nil, hrpc.Families(family))
	if err != nil {
		return lastFileName, err
	}

	scanner := store.Client.Scan(scan)
	defer scanner.Close()

	for {
		res, scanErr := scanner.Next()
		if scanErr == io.EOF {
			break
		}
		if scanErr != nil {
			return lastFileName, scanErr
		}
		if len(res.Cells) == 0 {
			continue
		}

		cell := res.Cells[0]
		if !bytes.HasPrefix(cell.Row, expectedPrefix) {
			// The scan has no stop row; leaving the prefix range ends the listing.
			break
		}

		fullpath := util.FullPath(cell.Row)
		dir, fileName := fullpath.DirAndName()
		if dir != string(dirPath) {
			// Row belongs to a deeper directory that shares the prefix.
			continue
		}
		if fileName == startFileName && !includeStartFile {
			continue
		}

		limit--
		if limit < 0 {
			break
		}
		lastFileName = fileName

		entry := &filer.Entry{
			FullPath: fullpath,
		}
		if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(cell.Value)); decodeErr != nil {
			glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
			return lastFileName, decodeErr
		}
		if !eachEntryFunc(entry) {
			break
		}
	}

	return lastFileName, nil
}
// BeginTransaction does nothing for this store: it returns ctx unchanged and
// a nil error.
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	return ctx, nil
}
// CommitTransaction does nothing for this store and always returns nil.
func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
	return nil
}
// RollbackTransaction does nothing for this store and always returns nil.
func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
	return nil
}
// Shutdown closes the store's HBase client.
func (store *HbaseStore) Shutdown() {
	client := store.Client
	client.Close()
}