You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							231 lines
						
					
					
						
							5.7 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							231 lines
						
					
					
						
							5.7 KiB
						
					
					
				
								package hbase
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"bytes"
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/filer"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
									"github.com/tsuna/gohbase"
							 | 
						|
									"github.com/tsuna/gohbase/hrpc"
							 | 
						|
									"io"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func init() {
							 | 
						|
									filer.Stores = append(filer.Stores, &HbaseStore{})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type HbaseStore struct {
							 | 
						|
									Client    gohbase.Client
							 | 
						|
									table     []byte
							 | 
						|
									cfKv      string
							 | 
						|
									cfMetaDir string
							 | 
						|
									column    string
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) GetName() string {
							 | 
						|
									return "hbase"
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
							 | 
						|
									return store.initialize(
							 | 
						|
										configuration.GetString(prefix+"zkquorum"),
							 | 
						|
										configuration.GetString(prefix+"table"),
							 | 
						|
									)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
							 | 
						|
									store.Client = gohbase.NewClient(zkquorum)
							 | 
						|
									store.table = []byte(table)
							 | 
						|
									store.cfKv = "kv"
							 | 
						|
									store.cfMetaDir = "meta"
							 | 
						|
									store.column = "a"
							 | 
						|
								
							 | 
						|
									// check table exists
							 | 
						|
									key := "whatever"
							 | 
						|
									headers := map[string][]string{store.cfMetaDir: nil}
							 | 
						|
									get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("NewGet returned an error: %w", err)
							 | 
						|
									}
							 | 
						|
									_, err = store.Client.Get(get)
							 | 
						|
									if err != gohbase.TableNotFound {
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// create table
							 | 
						|
									adminClient := gohbase.NewAdminClient(zkquorum)
							 | 
						|
									cFamilies := []string{store.cfKv, store.cfMetaDir}
							 | 
						|
									cf := make(map[string]map[string]string, len(cFamilies))
							 | 
						|
									for _, f := range cFamilies {
							 | 
						|
										cf[f] = nil
							 | 
						|
									}
							 | 
						|
									ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
							 | 
						|
									if err := adminClient.CreateTable(ct); err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
							 | 
						|
									value, err := entry.EncodeAttributesAndChunks()
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
							 | 
						|
									}
							 | 
						|
									if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
							 | 
						|
										value = util.MaybeGzipData(value)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
							 | 
						|
									return store.InsertEntry(ctx, entry)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
							 | 
						|
									value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
							 | 
						|
									if err != nil {
							 | 
						|
										if err == filer.ErrKvNotFound {
							 | 
						|
											return nil, filer_pb.ErrNotFound
							 | 
						|
										}
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									entry = &filer.Entry{
							 | 
						|
										FullPath: path,
							 | 
						|
									}
							 | 
						|
									err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value))
							 | 
						|
									if err != nil {
							 | 
						|
										return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
							 | 
						|
									}
							 | 
						|
									return entry, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
							 | 
						|
									return store.doDelete(ctx, store.cfMetaDir, []byte(path))
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
							 | 
						|
								
							 | 
						|
									family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
							 | 
						|
									expectedPrefix := []byte(path.Child(""))
							 | 
						|
									scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									scanner := store.Client.Scan(scan)
							 | 
						|
									defer scanner.Close()
							 | 
						|
									for {
							 | 
						|
										res, err := scanner.Next()
							 | 
						|
										if err != nil {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
										if len(res.Cells) == 0 {
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
										cell := res.Cells[0]
							 | 
						|
								
							 | 
						|
										if !bytes.HasPrefix(cell.Row, expectedPrefix) {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
										fullpath := util.FullPath(cell.Row)
							 | 
						|
										dir, _ := fullpath.DirAndName()
							 | 
						|
										if dir != string(path) {
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										err = store.doDelete(ctx, store.cfMetaDir, cell.Row)
							 | 
						|
										if err != nil {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
									}
							 | 
						|
									return
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
							 | 
						|
									return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
							 | 
						|
									family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
							 | 
						|
									expectedPrefix := []byte(dirPath.Child(prefix))
							 | 
						|
									scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
							 | 
						|
									if err != nil {
							 | 
						|
										return lastFileName, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									scanner := store.Client.Scan(scan)
							 | 
						|
									defer scanner.Close()
							 | 
						|
									for {
							 | 
						|
										res, err := scanner.Next()
							 | 
						|
										if err == io.EOF {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
										if err != nil {
							 | 
						|
											return lastFileName, err
							 | 
						|
										}
							 | 
						|
										if len(res.Cells) == 0 {
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
										cell := res.Cells[0]
							 | 
						|
								
							 | 
						|
										if !bytes.HasPrefix(cell.Row, expectedPrefix) {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										fullpath := util.FullPath(cell.Row)
							 | 
						|
										dir, fileName := fullpath.DirAndName()
							 | 
						|
										if dir != string(dirPath) {
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										value := cell.Value
							 | 
						|
								
							 | 
						|
										if fileName == startFileName && !includeStartFile {
							 | 
						|
											continue
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										limit--
							 | 
						|
										if limit < 0 {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										lastFileName = fileName
							 | 
						|
								
							 | 
						|
										entry := &filer.Entry{
							 | 
						|
											FullPath: fullpath,
							 | 
						|
										}
							 | 
						|
										if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
							 | 
						|
											err = decodeErr
							 | 
						|
											glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
										if !eachEntryFunc(entry) {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return lastFileName, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
							 | 
						|
									return ctx, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (store *HbaseStore) Shutdown() {
							 | 
						|
									store.Client.Close()
							 | 
						|
								}
							 |