You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							231 lines
						
					
					
						
							5.7 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							231 lines
						
					
					
						
							5.7 KiB
						
					
					
				| package hbase | |
| 
 | |
| import ( | |
| 	"bytes" | |
| 	"context" | |
| 	"fmt" | |
| 	"github.com/seaweedfs/seaweedfs/weed/filer" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	"github.com/tsuna/gohbase" | |
| 	"github.com/tsuna/gohbase/hrpc" | |
| 	"io" | |
| ) | |
| 
 | |
| func init() { | |
| 	filer.Stores = append(filer.Stores, &HbaseStore{}) | |
| } | |
| 
 | |
| type HbaseStore struct { | |
| 	Client    gohbase.Client | |
| 	table     []byte | |
| 	cfKv      string | |
| 	cfMetaDir string | |
| 	column    string | |
| } | |
| 
 | |
| func (store *HbaseStore) GetName() string { | |
| 	return "hbase" | |
| } | |
| 
 | |
| func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) { | |
| 	return store.initialize( | |
| 		configuration.GetString(prefix+"zkquorum"), | |
| 		configuration.GetString(prefix+"table"), | |
| 	) | |
| } | |
| 
 | |
| func (store *HbaseStore) initialize(zkquorum, table string) (err error) { | |
| 	store.Client = gohbase.NewClient(zkquorum) | |
| 	store.table = []byte(table) | |
| 	store.cfKv = "kv" | |
| 	store.cfMetaDir = "meta" | |
| 	store.column = "a" | |
| 
 | |
| 	// check table exists | |
| 	key := "whatever" | |
| 	headers := map[string][]string{store.cfMetaDir: nil} | |
| 	get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers)) | |
| 	if err != nil { | |
| 		return fmt.Errorf("NewGet returned an error: %v", err) | |
| 	} | |
| 	_, err = store.Client.Get(get) | |
| 	if err != gohbase.TableNotFound { | |
| 		return nil | |
| 	} | |
| 
 | |
| 	// create table | |
| 	adminClient := gohbase.NewAdminClient(zkquorum) | |
| 	cFamilies := []string{store.cfKv, store.cfMetaDir} | |
| 	cf := make(map[string]map[string]string, len(cFamilies)) | |
| 	for _, f := range cFamilies { | |
| 		cf[f] = nil | |
| 	} | |
| 	ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf) | |
| 	if err := adminClient.CreateTable(ct); err != nil { | |
| 		return err | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { | |
| 	value, err := entry.EncodeAttributesAndChunks() | |
| 	if err != nil { | |
| 		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) | |
| 	} | |
| 	if len(entry.Chunks) > filer.CountEntryChunksForGzip { | |
| 		value = util.MaybeGzipData(value) | |
| 	} | |
| 
 | |
| 	return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec) | |
| } | |
| 
 | |
| func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { | |
| 	return store.InsertEntry(ctx, entry) | |
| } | |
| 
 | |
| func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) { | |
| 	value, err := store.doGet(ctx, store.cfMetaDir, []byte(path)) | |
| 	if err != nil { | |
| 		if err == filer.ErrKvNotFound { | |
| 			return nil, filer_pb.ErrNotFound | |
| 		} | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	entry = &filer.Entry{ | |
| 		FullPath: path, | |
| 	} | |
| 	err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)) | |
| 	if err != nil { | |
| 		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) | |
| 	} | |
| 	return entry, nil | |
| } | |
| 
 | |
| func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) { | |
| 	return store.doDelete(ctx, store.cfMetaDir, []byte(path)) | |
| } | |
| 
 | |
| func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { | |
| 
 | |
| 	family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} | |
| 	expectedPrefix := []byte(path.Child("")) | |
| 	scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 
 | |
| 	scanner := store.Client.Scan(scan) | |
| 	defer scanner.Close() | |
| 	for { | |
| 		res, err := scanner.Next() | |
| 		if err != nil { | |
| 			break | |
| 		} | |
| 		if len(res.Cells) == 0 { | |
| 			continue | |
| 		} | |
| 		cell := res.Cells[0] | |
| 
 | |
| 		if !bytes.HasPrefix(cell.Row, expectedPrefix) { | |
| 			break | |
| 		} | |
| 		fullpath := util.FullPath(cell.Row) | |
| 		dir, _ := fullpath.DirAndName() | |
| 		if dir != string(path) { | |
| 			continue | |
| 		} | |
| 
 | |
| 		err = store.doDelete(ctx, store.cfMetaDir, cell.Row) | |
| 		if err != nil { | |
| 			break | |
| 		} | |
| 
 | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { | |
| 	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) | |
| } | |
| 
 | |
| func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { | |
| 	family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} | |
| 	expectedPrefix := []byte(dirPath.Child(prefix)) | |
| 	scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) | |
| 	if err != nil { | |
| 		return lastFileName, err | |
| 	} | |
| 
 | |
| 	scanner := store.Client.Scan(scan) | |
| 	defer scanner.Close() | |
| 	for { | |
| 		res, err := scanner.Next() | |
| 		if err == io.EOF { | |
| 			break | |
| 		} | |
| 		if err != nil { | |
| 			return lastFileName, err | |
| 		} | |
| 		if len(res.Cells) == 0 { | |
| 			continue | |
| 		} | |
| 		cell := res.Cells[0] | |
| 
 | |
| 		if !bytes.HasPrefix(cell.Row, expectedPrefix) { | |
| 			break | |
| 		} | |
| 
 | |
| 		fullpath := util.FullPath(cell.Row) | |
| 		dir, fileName := fullpath.DirAndName() | |
| 		if dir != string(dirPath) { | |
| 			continue | |
| 		} | |
| 
 | |
| 		value := cell.Value | |
| 
 | |
| 		if fileName == startFileName && !includeStartFile { | |
| 			continue | |
| 		} | |
| 
 | |
| 		limit-- | |
| 		if limit < 0 { | |
| 			break | |
| 		} | |
| 
 | |
| 		lastFileName = fileName | |
| 
 | |
| 		entry := &filer.Entry{ | |
| 			FullPath: fullpath, | |
| 		} | |
| 		if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil { | |
| 			err = decodeErr | |
| 			glog.V(0).Infof("list %s : %v", entry.FullPath, err) | |
| 			break | |
| 		} | |
| 		if !eachEntryFunc(entry) { | |
| 			break | |
| 		} | |
| 	} | |
| 
 | |
| 	return lastFileName, nil | |
| } | |
| 
 | |
| func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { | |
| 	return ctx, nil | |
| } | |
| 
 | |
| func (store *HbaseStore) CommitTransaction(ctx context.Context) error { | |
| 	return nil | |
| } | |
| 
 | |
| func (store *HbaseStore) RollbackTransaction(ctx context.Context) error { | |
| 	return nil | |
| } | |
| 
 | |
| func (store *HbaseStore) Shutdown() { | |
| 	store.Client.Close() | |
| }
 |