|
|
|
@ -77,12 +77,12 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre |
|
|
|
var err error |
|
|
|
store.timeout, err = time.ParseDuration(timeoutStr) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("invalid timeout duration %s: %v", timeoutStr, err) |
|
|
|
return fmt.Errorf("invalid timeout duration %s: %w", timeoutStr, err) |
|
|
|
} |
|
|
|
|
|
|
|
store.maxRetryDelay, err = time.ParseDuration(maxRetryDelayStr) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("invalid max_retry_delay duration %s: %v", maxRetryDelayStr, err) |
|
|
|
return fmt.Errorf("invalid max_retry_delay duration %s: %w", maxRetryDelayStr, err) |
|
|
|
} |
|
|
|
|
|
|
|
return store.initialize(clusterFile, apiVersion) |
|
|
|
@ -98,19 +98,19 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e |
|
|
|
var err error |
|
|
|
store.database, err = fdb.OpenDatabase(clusterFile) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to open FoundationDB database: %v", err) |
|
|
|
return fmt.Errorf("failed to open FoundationDB database: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Create/open seaweedfs directory
|
|
|
|
store.seaweedfsDir, err = directory.CreateOrOpen(store.database, []string{store.directoryPrefix}, nil) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to create/open seaweedfs directory: %v", err) |
|
|
|
return fmt.Errorf("failed to create/open seaweedfs directory: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Create/open kv subdirectory for key-value operations
|
|
|
|
store.kvDir, err = directory.CreateOrOpen(store.database, []string{store.directoryPrefix, "kv"}, nil) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to create/open kv directory: %v", err) |
|
|
|
return fmt.Errorf("failed to create/open kv directory: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
glog.V(0).Infof("FoundationDB store initialized successfully with directory prefix: %s", store.directoryPrefix) |
|
|
|
@ -126,7 +126,7 @@ func (store *FoundationDBStore) BeginTransaction(ctx context.Context) (context.C |
|
|
|
// Create a new transaction
|
|
|
|
tx, err := store.database.CreateTransaction() |
|
|
|
if err != nil { |
|
|
|
return ctx, fmt.Errorf("failed to create transaction: %v", err) |
|
|
|
return ctx, fmt.Errorf("failed to create transaction: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Store the transaction in context and return the new context
|
|
|
|
@ -144,7 +144,7 @@ func (store *FoundationDBStore) CommitTransaction(ctx context.Context) error { |
|
|
|
// Commit the transaction
|
|
|
|
err := tx.Commit().Get() |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to commit transaction: %v", err) |
|
|
|
return fmt.Errorf("failed to commit transaction: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
@ -171,7 +171,7 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En |
|
|
|
|
|
|
|
value, err := entry.EncodeAttributesAndChunks() |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) |
|
|
|
return fmt.Errorf("encoding %s %+v: %w", entry.FullPath, entry.Attr, err) |
|
|
|
} |
|
|
|
|
|
|
|
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { |
|
|
|
@ -197,7 +197,7 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("persisting %s: %v", entry.FullPath, err) |
|
|
|
return fmt.Errorf("persisting %s: %w", entry.FullPath, err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
@ -223,7 +223,7 @@ func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.Ful |
|
|
|
} |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return nil, filer_pb.ErrNotFound |
|
|
|
return nil, fmt.Errorf("find entry %s: %w", fullpath, err) |
|
|
|
} |
|
|
|
|
|
|
|
if len(data) == 0 { |
|
|
|
@ -236,7 +236,7 @@ func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.Ful |
|
|
|
|
|
|
|
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)) |
|
|
|
if err != nil { |
|
|
|
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) |
|
|
|
return entry, fmt.Errorf("decode %s : %w", entry.FullPath, err) |
|
|
|
} |
|
|
|
|
|
|
|
return entry, nil |
|
|
|
@ -258,7 +258,7 @@ func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.F |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("deleting %s: %v", fullpath, err) |
|
|
|
return fmt.Errorf("deleting %s: %w", fullpath, err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
@ -269,7 +269,7 @@ func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpa |
|
|
|
prefixBytes := store.seaweedfsDir.Pack(tuple.Tuple{string(fullpath)}) |
|
|
|
kr, err := fdb.PrefixRange(prefixBytes) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("creating prefix range for %s: %v", fullpath, err) |
|
|
|
return fmt.Errorf("creating prefix range for %s: %w", fullpath, err) |
|
|
|
} |
|
|
|
|
|
|
|
// Check if there's a transaction in context
|
|
|
|
@ -285,7 +285,7 @@ func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpa |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("deleting folder children %s: %v", fullpath, err) |
|
|
|
return fmt.Errorf("deleting folder children %s: %w", fullpath, err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
@ -296,6 +296,8 @@ func (store *FoundationDBStore) ListDirectoryEntries(ctx context.Context, dirPat |
|
|
|
} |
|
|
|
|
|
|
|
func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { |
|
|
|
// Cap limit at 1000 for optimal FoundationDB performance
|
|
|
|
// Large batches can cause transaction timeouts and increase memory pressure
|
|
|
|
if limit > 1000 { |
|
|
|
limit = 1000 |
|
|
|
} |
|
|
|
@ -304,7 +306,7 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context |
|
|
|
dirPrefixBytes := store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath)}) |
|
|
|
dirRange, err := fdb.PrefixRange(dirPrefixBytes) |
|
|
|
if err != nil { |
|
|
|
return "", fmt.Errorf("creating prefix range for %s: %v", dirPath, err) |
|
|
|
return "", fmt.Errorf("creating prefix range for %s: %w", dirPath, err) |
|
|
|
} |
|
|
|
|
|
|
|
// Determine start key and selector based on startFileName and prefix
|
|
|
|
@ -347,7 +349,7 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context |
|
|
|
sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector} |
|
|
|
kvs, rangeErr = tx.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError() |
|
|
|
if rangeErr != nil { |
|
|
|
return "", fmt.Errorf("scanning %s: %v", dirPath, rangeErr) |
|
|
|
return "", fmt.Errorf("scanning %s: %w", dirPath, rangeErr) |
|
|
|
} |
|
|
|
} else { |
|
|
|
result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { |
|
|
|
@ -359,7 +361,7 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context |
|
|
|
return kvSlice, nil |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return "", fmt.Errorf("scanning %s: %v", dirPath, err) |
|
|
|
return "", fmt.Errorf("scanning %s: %w", dirPath, err) |
|
|
|
} |
|
|
|
kvs = result.([]fdb.KeyValue) |
|
|
|
} |
|
|
|
@ -434,10 +436,10 @@ func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if err != nil || len(data) == 0 { |
|
|
|
if err != nil { |
|
|
|
glog.V(2).Infof("KvGet error for key %v: %v", key, err) |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("kv get %s: %w", string(key), err) |
|
|
|
} |
|
|
|
if len(data) == 0 { |
|
|
|
return nil, filer.ErrKvNotFound |
|
|
|
} |
|
|
|
|
|
|
|
|