|
|
|
@ -4,6 +4,7 @@ |
|
|
|
package foundationdb |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"strings" |
|
|
|
@ -20,7 +21,6 @@ import ( |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
DIR_FILE_SEPARATOR = byte(0x00) |
|
|
|
// FoundationDB transaction size limit is 10MB
|
|
|
|
FDB_TRANSACTION_SIZE_LIMIT = 10 * 1024 * 1024 |
|
|
|
) |
|
|
|
@ -94,7 +94,7 @@ func (store *FoundationDBStore) GetName() string { |
|
|
|
func (store *FoundationDBStore) Initialize(configuration util.Configuration, prefix string) error { |
|
|
|
// Set default configuration values
|
|
|
|
configuration.SetDefault(prefix+"cluster_file", "/etc/foundationdb/fdb.cluster") |
|
|
|
configuration.SetDefault(prefix+"api_version", 630) |
|
|
|
configuration.SetDefault(prefix+"api_version", 740) |
|
|
|
configuration.SetDefault(prefix+"timeout", "5s") |
|
|
|
configuration.SetDefault(prefix+"max_retry_delay", "1s") |
|
|
|
configuration.SetDefault(prefix+"directory_prefix", "seaweedfs") |
|
|
|
@ -210,6 +210,12 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En |
|
|
|
value = util.MaybeGzipData(value) |
|
|
|
} |
|
|
|
|
|
|
|
// Check transaction size limit
|
|
|
|
if len(value) > FDB_TRANSACTION_SIZE_LIMIT { |
|
|
|
return fmt.Errorf("entry %s exceeds FoundationDB transaction size limit (%d > %d bytes)", |
|
|
|
entry.FullPath, len(value), FDB_TRANSACTION_SIZE_LIMIT) |
|
|
|
} |
|
|
|
|
|
|
|
// Check if there's a transaction in context
|
|
|
|
if tx, exists := store.getTransactionFromContext(ctx); exists { |
|
|
|
tx.Set(key, value) |
|
|
|
@ -323,25 +329,43 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context |
|
|
|
limit = 1000 |
|
|
|
} |
|
|
|
|
|
|
|
// Construct tight range based on directory and prefix
|
|
|
|
directoryPrefix := store.genDirectoryKeyPrefix(string(dirPath), prefix) |
|
|
|
startKey := store.genDirectoryKeyPrefix(string(dirPath), startFileName) |
|
|
|
|
|
|
|
if !includeStartFile { |
|
|
|
startKey = append(startKey, 0x00) |
|
|
|
// Determine the actual start key
|
|
|
|
var startKey fdb.Key |
|
|
|
if startFileName != "" { |
|
|
|
// Start from the specified file within the prefix range
|
|
|
|
startKey = store.genDirectoryKeyPrefix(string(dirPath), startFileName) |
|
|
|
if !includeStartFile { |
|
|
|
startKey = append(startKey, 0x00) |
|
|
|
} |
|
|
|
// Ensure startKey is within the prefix range
|
|
|
|
if !bytes.HasPrefix(startKey, directoryPrefix) { |
|
|
|
// If startFileName is before the prefix, start at prefix
|
|
|
|
if bytes.Compare(startKey, directoryPrefix) < 0 { |
|
|
|
startKey = directoryPrefix |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
startKey = directoryPrefix |
|
|
|
} |
|
|
|
|
|
|
|
// End key is the prefix range end
|
|
|
|
endKey := prefixEndFallback(directoryPrefix) |
|
|
|
|
|
|
|
var kvs []fdb.KeyValue |
|
|
|
var rangeErr error |
|
|
|
// Check if there's a transaction in context
|
|
|
|
if tx, exists := store.getTransactionFromContext(ctx); exists { |
|
|
|
kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEndFallback(directoryPrefix)} |
|
|
|
kr := fdb.KeyRange{Begin: startKey, End: endKey} |
|
|
|
kvs, rangeErr = tx.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError() |
|
|
|
if rangeErr != nil { |
|
|
|
return "", fmt.Errorf("scanning %s: %v", dirPath, rangeErr) |
|
|
|
} |
|
|
|
} else { |
|
|
|
result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { |
|
|
|
kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEndFallback(directoryPrefix)} |
|
|
|
kr := fdb.KeyRange{Begin: startKey, End: endKey} |
|
|
|
kvSlice, err := rtr.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError() |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
@ -360,7 +384,8 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if !strings.HasPrefix(fileName, prefix) { |
|
|
|
// Prefix filtering is now handled by the range scan, but double-check for safety
|
|
|
|
if prefix != "" && !strings.HasPrefix(fileName, prefix) { |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
|