Browse Source

fix return eachEntryFunc

pull/7485/head
Roman Tamarov 2 months ago
parent
commit
9ebc27270c
  1. 10
      weed/filer/elastic/v7/elastic_store.go
  2. 20
      weed/filer/rocksdb/rocksdb_store.go
  3. 9
      weed/filer/tarantool/tarantool_store.go
  4. 18
      weed/filer/tikv/tikv_store.go
  5. 7
      weed/filer/ydb/ydb_store.go

10
weed/filer/elastic/v7/elastic_store.go

@ -258,9 +258,17 @@ func (store *ElasticStore) listDirectoryEntries(
if fileName == startFileName && !inclusive {
continue
}
if !eachEntryFunc(esEntry.Entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(esEntry.Entry)
if resEachEntryFuncErr != nil {
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
return lastFileName, fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
}
if !resEachEntryFunc {
break
}
lastFileName = fileName
}
}

20
weed/filer/rocksdb/rocksdb_store.go

@ -251,6 +251,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
defer ro.Destroy()
ro.SetFillCache(false)
var callbackErr error
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, startFileName, func(key, value []byte) bool {
@ -269,11 +270,28 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
return false
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
callbackErr = resEachEntryFuncErr
return false
}
if !resEachEntryFunc {
return false
}
return true
})
if callbackErr != nil {
return lastFileName, fmt.Errorf(
"failed to process eachEntryFunc for dir %q, entry %q: %w",
dirPath, lastFileName, callbackErr,
)
}
if err != nil {
return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}

9
weed/filer/tarantool/tarantool_store.go

@ -305,7 +305,14 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
if !resEachEntryFunc {
break
}
}

18
weed/filer/tikv/tikv_store.go

@ -223,6 +223,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
if err != nil {
return lastFileName, err
}
var callbackErr error
err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
iter, err := txn.Iter(lastFileStart, nil)
if err != nil {
@ -283,12 +284,27 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
// Only increment counter for non-expired entries
i++
if err := iter.Next(); !eachEntryFunc(entry) || err != nil {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
callbackErr = resEachEntryFuncErr
break
}
if err := iter.Next(); !resEachEntryFunc || err != nil {
break
}
}
return err
})
if callbackErr != nil {
return lastFileName, fmt.Errorf(
"failed to process eachEntryFunc for dir %q, entry %q: %w",
dirPath, lastFileName, callbackErr,
)
}
if err != nil {
return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}

7
weed/filer/ydb/ydb_store.go

@ -313,7 +313,12 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr)
}
if !eachEntryFunc(entry) {
resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
if resEachEntryFuncErr != nil {
return fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
}
if !resEachEntryFunc {
return nil
}

Loading…
Cancel
Save