Chris Lu
4 years ago
6 changed files with 324 additions and 0 deletions
-
1go.mod
-
15go.sum
-
5weed/command/scaffold.go
-
227weed/filer/hbase/hbase_store.go
-
75weed/filer/hbase/hbase_store_kv.go
-
1weed/server/filer_server.go
@ -0,0 +1,227 @@ |
|||
package hbase |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/tsuna/gohbase" |
|||
"github.com/tsuna/gohbase/hrpc" |
|||
"io" |
|||
) |
|||
|
|||
func init() { |
|||
filer.Stores = append(filer.Stores, &HbaseStore{}) |
|||
} |
|||
|
|||
type HbaseStore struct { |
|||
Client gohbase.Client |
|||
table []byte |
|||
cfKv string |
|||
cfMetaDir string |
|||
column string |
|||
} |
|||
|
|||
func (store *HbaseStore) GetName() string { |
|||
return "hbase" |
|||
} |
|||
|
|||
func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) { |
|||
return store.initialize( |
|||
configuration.GetString(prefix+"zkquorum"), |
|||
configuration.GetString(prefix+"table"), |
|||
) |
|||
} |
|||
|
|||
func (store *HbaseStore) initialize(zkquorum, table string) (err error) { |
|||
store.Client = gohbase.NewClient(zkquorum) |
|||
store.table = []byte(table) |
|||
store.cfKv = "kv" |
|||
store.cfMetaDir = "meta" |
|||
store.column = "a" |
|||
|
|||
// check table exists
|
|||
key := "whatever" |
|||
headers := map[string][]string{store.cfMetaDir: nil} |
|||
get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers)) |
|||
if err != nil { |
|||
return fmt.Errorf("NewGet returned an error: %v", err) |
|||
} |
|||
_, err = store.Client.Get(get) |
|||
if err != gohbase.TableNotFound { |
|||
return nil |
|||
} |
|||
|
|||
// create table
|
|||
adminClient := gohbase.NewAdminClient(zkquorum) |
|||
cFamilies := []string{store.cfKv, store.cfMetaDir} |
|||
cf := make(map[string]map[string]string, len(cFamilies)) |
|||
for _, f := range cFamilies { |
|||
cf[f] = nil |
|||
} |
|||
ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf) |
|||
if err := adminClient.CreateTable(ct); err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { |
|||
value, err := entry.EncodeAttributesAndChunks() |
|||
if err != nil { |
|||
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) |
|||
} |
|||
if len(entry.Chunks) > 50 { |
|||
value = util.MaybeGzipData(value) |
|||
} |
|||
|
|||
return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec) |
|||
} |
|||
|
|||
func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
|||
return store.InsertEntry(ctx, entry) |
|||
} |
|||
|
|||
func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) { |
|||
value, err := store.doGet(ctx, store.cfMetaDir, []byte(path)) |
|||
if err != nil { |
|||
if err == filer.ErrKvNotFound { |
|||
return nil, filer_pb.ErrNotFound |
|||
} |
|||
return nil, err |
|||
} |
|||
|
|||
entry = &filer.Entry{ |
|||
FullPath: path, |
|||
} |
|||
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)) |
|||
if err != nil { |
|||
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) |
|||
} |
|||
return entry, nil |
|||
} |
|||
|
|||
func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) { |
|||
return store.doDelete(ctx, store.cfMetaDir, []byte(path)) |
|||
} |
|||
|
|||
func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { |
|||
|
|||
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} |
|||
expectedPrefix := []byte(path.Child("")) |
|||
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
scanner := store.Client.Scan(scan) |
|||
defer scanner.Close() |
|||
for { |
|||
res, err := scanner.Next() |
|||
if err != nil { |
|||
break |
|||
} |
|||
if len(res.Cells) == 0 { |
|||
continue |
|||
} |
|||
cell := res.Cells[0] |
|||
|
|||
if !bytes.HasPrefix(cell.Row, expectedPrefix) { |
|||
break |
|||
} |
|||
fullpath := util.FullPath(cell.Row) |
|||
dir, _ := fullpath.DirAndName() |
|||
if dir != string(path) { |
|||
continue |
|||
} |
|||
|
|||
err = store.doDelete(ctx, store.cfMetaDir, cell.Row) |
|||
if err != nil { |
|||
break |
|||
} |
|||
|
|||
} |
|||
return |
|||
} |
|||
|
|||
func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) { |
|||
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") |
|||
} |
|||
|
|||
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) { |
|||
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} |
|||
expectedPrefix := []byte(dirPath.Child(prefix)) |
|||
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
var entries []*filer.Entry |
|||
scanner := store.Client.Scan(scan) |
|||
defer scanner.Close() |
|||
for { |
|||
res, err := scanner.Next() |
|||
if err == io.EOF { |
|||
break |
|||
} |
|||
if err != nil { |
|||
return entries, err |
|||
} |
|||
if len(res.Cells) == 0 { |
|||
continue |
|||
} |
|||
cell := res.Cells[0] |
|||
|
|||
if !bytes.HasPrefix(cell.Row, expectedPrefix) { |
|||
break |
|||
} |
|||
|
|||
fullpath := util.FullPath(cell.Row) |
|||
dir, fileName := fullpath.DirAndName() |
|||
if dir != string(dirPath) { |
|||
continue |
|||
} |
|||
|
|||
value := cell.Value |
|||
|
|||
if fileName == startFileName && !includeStartFile { |
|||
continue |
|||
} |
|||
|
|||
limit-- |
|||
if limit < 0 { |
|||
break |
|||
} |
|||
entry := &filer.Entry{ |
|||
FullPath: fullpath, |
|||
} |
|||
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil { |
|||
err = decodeErr |
|||
glog.V(0).Infof("list %s : %v", entry.FullPath, err) |
|||
break |
|||
} |
|||
entries = append(entries, entry) |
|||
} |
|||
|
|||
return entries, nil |
|||
} |
|||
|
|||
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { |
|||
return ctx, nil |
|||
} |
|||
|
|||
func (store *HbaseStore) CommitTransaction(ctx context.Context) error { |
|||
return nil |
|||
} |
|||
|
|||
func (store *HbaseStore) RollbackTransaction(ctx context.Context) error { |
|||
return nil |
|||
} |
|||
|
|||
func (store *HbaseStore) Shutdown() { |
|||
store.Client.Close() |
|||
} |
@ -0,0 +1,75 @@ |
|||
package hbase |
|||
|
|||
import ( |
|||
"context" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/tsuna/gohbase/hrpc" |
|||
"time" |
|||
) |
|||
|
|||
const( |
|||
COLUMN_NAME = "a" |
|||
) |
|||
func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
|||
return store.doPut(ctx, store.cfKv, key, value, 0) |
|||
} |
|||
|
|||
func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
|||
return store.doGet(ctx, store.cfKv, key) |
|||
} |
|||
|
|||
func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) { |
|||
return store.doDelete(ctx, store.cfKv, key) |
|||
} |
|||
|
|||
func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) { |
|||
if ttlSecond > 0 { |
|||
return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second)) |
|||
} |
|||
return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal)) |
|||
} |
|||
|
|||
func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) { |
|||
values := map[string]map[string][]byte{cf: map[string][]byte{}} |
|||
values[cf][COLUMN_NAME] = value |
|||
putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = store.Client.Put(putRequest) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (value []byte, err error) { |
|||
family := map[string][]string{cf: {COLUMN_NAME}} |
|||
getRequest, err := hrpc.NewGet(context.Background(), store.table, key, hrpc.Families(family)) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
getResp, err := store.Client.Get(getRequest) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if len(getResp.Cells) == 0 { |
|||
return nil, filer.ErrKvNotFound |
|||
} |
|||
|
|||
return getResp.Cells[0].Value, nil |
|||
} |
|||
|
|||
func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) { |
|||
values := map[string]map[string][]byte{cf: map[string][]byte{}} |
|||
values[cf][COLUMN_NAME] = nil |
|||
deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
_, err = store.Client.Delete(deleteRequest) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue