Browse Source

implement etcd changes

pull/1748/head
Chris Lu 4 years ago
parent
commit
5ef43b9b09
  1. 17
      weed/filer/etcd/etcd_store.go

17
weed/filer/etcd/etcd_store.go

@ -139,17 +139,17 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil return nil
} }
func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed
func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
} }
func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int) (entries []*filer.Entry, hasMore bool, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, "")
resp, err := store.client.Get(ctx, string(directoryPrefix), resp, err := store.client.Get(ctx, string(directoryPrefix),
clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
if err != nil { if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
return nil, false, fmt.Errorf("list %s : %v", dirPath, err)
} }
for _, kv := range resp.Kvs { for _, kv := range resp.Kvs {
@ -157,15 +157,16 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
if fileName == "" { if fileName == "" {
continue continue
} }
if fileName == startFileName && !inclusive {
if fileName == startFileName && !includeStartFile {
continue continue
} }
limit-- limit--
if limit < 0 { if limit < 0 {
hasMore = true
break break
} }
entry := &filer.Entry{ entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
FullPath: weed_util.NewFullPath(string(dirPath), fileName),
} }
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil { if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
err = decodeErr err = decodeErr
@ -175,7 +176,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
entries = append(entries, entry) entries = append(entries, entry)
} }
return entries, err
return entries, hasMore, err
} }
func genKey(dirPath, fileName string) (key []byte) { func genKey(dirPath, fileName string) (key []byte) {

Loading…
Cancel
Save