From 6d7cd2b73d72768e8b9916718b7dada04f2984b8 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 28 Aug 2025 20:26:49 -0700 Subject: [PATCH] Update foundationdb_store.go --- weed/filer/foundationdb/foundationdb_store.go | 136 +++++++++--------- 1 file changed, 68 insertions(+), 68 deletions(-) diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go index 728246f03..da2ed551c 100644 --- a/weed/filer/foundationdb/foundationdb_store.go +++ b/weed/filer/foundationdb/foundationdb_store.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/apple/foundationdb/bindings/go/src/fdb" @@ -61,9 +60,23 @@ type FoundationDBStore struct { directoryPrefix string timeout time.Duration maxRetryDelay time.Duration - txMu sync.RWMutex - isInTransaction bool - currentTx fdb.Transaction +} + +// Context key type for storing transactions +type contextKey string + +const transactionKey contextKey = "fdb_transaction" + +// Helper functions for context-scoped transactions +func (store *FoundationDBStore) getTransactionFromContext(ctx context.Context) (fdb.Transaction, bool) { + if tx, ok := ctx.Value(transactionKey).(fdb.Transaction); ok { + return tx, true + } + return nil, false +} + +func (store *FoundationDBStore) setTransactionInContext(ctx context.Context, tx fdb.Transaction) context.Context { + return context.WithValue(ctx, transactionKey, tx) } func (store *FoundationDBStore) GetName() string { @@ -132,44 +145,47 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e } func (store *FoundationDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { - store.txMu.Lock() - defer store.txMu.Unlock() - - if store.isInTransaction { - return ctx, fmt.Errorf("transaction already in progress") + // Check if there's already a transaction in this context + if _, exists := store.getTransactionFromContext(ctx); exists { + return ctx, fmt.Errorf("transaction already in progress for this context") } - store.currentTx, _ = store.database.CreateTransaction() - store.isInTransaction = true + // Create a new transaction + tx, err := store.database.CreateTransaction() + if err != nil { + return ctx, fmt.Errorf("failed to create transaction: %v", err) + } - return ctx, nil + // Store the transaction in context and return the new context + newCtx := store.setTransactionInContext(ctx, tx) + return newCtx, nil } func (store *FoundationDBStore) CommitTransaction(ctx context.Context) error { - store.txMu.Lock() - defer store.txMu.Unlock() - - if !store.isInTransaction { - return fmt.Errorf("no transaction in progress") + // Get transaction from context + tx, exists := store.getTransactionFromContext(ctx) + if !exists { + return fmt.Errorf("no transaction in progress for this context") } - err := store.currentTx.Commit().Get() - store.isInTransaction = false + // Commit the transaction + err := tx.Commit().Get() + if err != nil { + return fmt.Errorf("failed to commit transaction: %v", err) + } - return err + return nil } func (store *FoundationDBStore) RollbackTransaction(ctx context.Context) error { - store.txMu.Lock() - defer store.txMu.Unlock() - - if !store.isInTransaction { - return fmt.Errorf("no transaction in progress") + // Get transaction from context + tx, exists := store.getTransactionFromContext(ctx) + if !exists { + return fmt.Errorf("no transaction in progress for this context") } - store.currentTx.Cancel() - store.isInTransaction = false - + // Cancel the transaction + tx.Cancel() return nil } @@ -189,11 +205,9 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En value = util.MaybeGzipData(value) } - store.txMu.RLock() - defer store.txMu.RUnlock() - - if store.isInTransaction { - store.currentTx.Set(key, value) + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { + tx.Set(key, value) return nil } @@ -213,12 +227,10 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { key := store.genKey(util.FullPath(fullpath).DirAndName()) - store.txMu.RLock() - defer store.txMu.RUnlock() - var data []byte - if store.isInTransaction { - data, err = store.currentTx.Get(key).Get() + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { + data, err = tx.Get(key).Get() } else { result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { return rtr.Get(key).Get() @@ -253,11 +265,9 @@ func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.Ful func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { key := store.genKey(util.FullPath(fullpath).DirAndName()) - store.txMu.RLock() - defer store.txMu.RUnlock() - - if store.isInTransaction { - store.currentTx.Clear(key) + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { + tx.Clear(key) return nil } @@ -277,12 +287,10 @@ func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.F func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { directoryPrefix := store.genDirectoryKeyPrefix(string(fullpath), "") - store.txMu.RLock() - defer store.txMu.RUnlock() - - if store.isInTransaction { + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { kr := fdb.KeyRange{Begin: directoryPrefix, End: prefixEnd(directoryPrefix)} - store.currentTx.ClearRange(kr) + tx.ClearRange(kr) return nil } @@ -316,13 +324,11 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context startKey = append(startKey, 0x00) } - store.txMu.RLock() - defer store.txMu.RUnlock() - var kvs []fdb.KeyValue - if store.isInTransaction { + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEnd(directoryPrefix)} - kvs = store.currentTx.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceOrPanic() + kvs = tx.GetRange(kr, fdb.RangeOptions{Limit: int(limit)}).GetSliceOrPanic() } else { result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { kr := fdb.KeyRange{Begin: fdb.Key(startKey), End: prefixEnd(directoryPrefix)} @@ -366,11 +372,9 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error { fdbKey := store.kvDir.Pack(tuple.Tuple{key}) - store.txMu.RLock() - defer store.txMu.RUnlock() - - if store.isInTransaction { - store.currentTx.Set(fdbKey, value) + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { + tx.Set(fdbKey, value) return nil } @@ -385,14 +389,12 @@ func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []b func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { fdbKey := store.kvDir.Pack(tuple.Tuple{key}) - store.txMu.RLock() - defer store.txMu.RUnlock() - var data []byte var err error - if store.isInTransaction { - data, err = store.currentTx.Get(fdbKey).Get() + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { + data, err = tx.Get(fdbKey).Get() } else { result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { return rtr.Get(fdbKey).Get() @@ -414,11 +416,9 @@ func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error { fdbKey := store.kvDir.Pack(tuple.Tuple{key}) - store.txMu.RLock() - defer store.txMu.RUnlock() - - if store.isInTransaction { - store.currentTx.Clear(fdbKey) + // Check if there's a transaction in context + if tx, exists := store.getTransactionFromContext(ctx); exists { + tx.Clear(fdbKey) return nil }