Browse Source

use the more idiomatic FoundationDB KeySelectors

pull/7178/head
chrislu 2 months ago
parent
commit
da121f2a4b
  1. 29
      weed/filer/foundationdb/foundationdb_store.go

29
weed/filer/foundationdb/foundationdb_store.go

@ -307,47 +307,52 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context
return "", fmt.Errorf("creating prefix range for %s: %v", dirPath, err)
}
// Determine start key based on startFileName and prefix
// Determine start key and selector based on startFileName and prefix
var startKey fdb.Key
var beginSelector fdb.KeySelector
if startFileName != "" {
// Start from the specified file
if !includeStartFile {
// Use next possible tuple to exclude start key (append \x00 to fileName in tuple, not to packed bytes)
startKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), startFileName + "\x00"})
startKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), startFileName})
// Use KeySelector for idiomatic FDB range scanning
if includeStartFile {
beginSelector = fdb.FirstGreaterOrEqual(startKey)
} else {
startKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), startFileName})
beginSelector = fdb.FirstGreaterThan(startKey)
}
// If prefix is specified and startFileName doesn't match, adjust
if prefix != "" && !strings.HasPrefix(startFileName, prefix) {
if startFileName < prefix {
// Start from prefix if startFileName is before it
startKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), prefix})
beginSelector = fdb.FirstGreaterOrEqual(startKey)
}
}
} else if prefix != "" {
// Start from prefix
startKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), prefix})
beginSelector = fdb.FirstGreaterOrEqual(startKey)
} else {
// Start from beginning of directory
startKey = dirRange.Begin
beginSelector = fdb.FirstGreaterOrEqual(dirRange.Begin)
}
// End key is from the directory range
endKey := dirRange.End
// End selector is from the directory range
endSelector := fdb.FirstGreaterOrEqual(dirRange.End)
var kvs []fdb.KeyValue
var rangeErr error
// Check if there's a transaction in context
if tx, exists := store.getTransactionFromContext(ctx); exists {
kr := fdb.KeyRange{Begin: startKey, End: endKey}
kvs, rangeErr = tx.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector}
kvs, rangeErr = tx.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
if rangeErr != nil {
return "", fmt.Errorf("scanning %s: %v", dirPath, rangeErr)
}
} else {
result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
kr := fdb.KeyRange{Begin: startKey, End: endKey}
kvSlice, err := rtr.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector}
kvSlice, err := rtr.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
if err != nil {
return nil, err
}

Loading…
Cancel
Save