Browse Source

implement hbase changes

pull/1748/head
Chris Lu 4 years ago
parent
commit
54527f0326
  1. 12
      weed/filer/hbase/hbase_store.go

12
weed/filer/hbase/hbase_store.go

@ -148,18 +148,19 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful
return return
} }
func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, bool, error) {
return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
} }
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) {
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, bool, error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(dirPath.Child(prefix)) expectedPrefix := []byte(dirPath.Child(prefix))
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
if err != nil { if err != nil {
return nil, err
return nil, false, err
} }
var hasMore bool
var entries []*filer.Entry var entries []*filer.Entry
scanner := store.Client.Scan(scan) scanner := store.Client.Scan(scan)
defer scanner.Close() defer scanner.Close()
@ -169,7 +170,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
break break
} }
if err != nil { if err != nil {
return entries, err
return entries, hasMore, err
} }
if len(res.Cells) == 0 { if len(res.Cells) == 0 {
continue continue
@ -194,6 +195,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
limit-- limit--
if limit < 0 { if limit < 0 {
hasMore = true
break break
} }
entry := &filer.Entry{ entry := &filer.Entry{
@ -207,7 +209,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
entries = append(entries, entry) entries = append(entries, entry)
} }
return entries, nil
return entries, hasMore, nil
} }
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {

Loading…
Cancel
Save