diff --git a/go.mod b/go.mod index 0f1e08b21..4c1f38570 100644 --- a/go.mod +++ b/go.mod @@ -123,7 +123,7 @@ require ( github.com/Jille/raft-grpc-transport v1.6.1 github.com/ThreeDotsLabs/watermill v1.5.1 github.com/a-h/templ v0.3.943 - github.com/apple/foundationdb/bindings/go v0.0.0-20240515141816-262c6fe778ad + github.com/apple/foundationdb/bindings/go v0.0.0-20250911184653-27f7192f47c3 github.com/arangodb/go-driver v1.6.9 github.com/armon/go-metrics v0.4.1 github.com/aws/aws-sdk-go-v2 v1.40.1 diff --git a/go.sum b/go.sum index 7db73b0f9..3349d8a68 100644 --- a/go.sum +++ b/go.sum @@ -653,6 +653,8 @@ github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0I github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apple/foundationdb/bindings/go v0.0.0-20240515141816-262c6fe778ad h1:fQBkhYv86zyW95PWhzBlkgz3NoY1ue0L+8oYBaoCMbg= github.com/apple/foundationdb/bindings/go v0.0.0-20240515141816-262c6fe778ad/go.mod h1:OMVSB21p9+xQUIqlGizHPZfjK+SHws1ht+ZytVDoz9U= +github.com/apple/foundationdb/bindings/go v0.0.0-20250911184653-27f7192f47c3 h1:WZaTKNHCfcw7fWSR6/RKnCldVzvYZC+Y20Su4lffEIg= +github.com/apple/foundationdb/bindings/go v0.0.0-20250911184653-27f7192f47c3/go.mod h1:OMVSB21p9+xQUIqlGizHPZfjK+SHws1ht+ZytVDoz9U= github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc h1:LoL75er+LKDHDUfU5tRvFwxH0LjPpZN8OoG8Ll+liGU= github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc/go.mod h1:w648aMHEgFYS6xb0KVMMtZ2uMeemhiKCuD2vj6gY52A= github.com/arangodb/go-driver v1.6.9 h1:zckB+xuA16NmHUuYOX7INCJTIyIkoBQjAGqNpiyf2HQ= diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go index 720afd7bc..852ad2701 100644 --- a/weed/filer/foundationdb/foundationdb_store.go +++ b/weed/filer/foundationdb/foundationdb_store.go @@ -22,6 +22,7 @@ import ( "bytes" "context" "fmt" + "sync" "time" "github.com/apple/foundationdb/bindings/go/src/fdb" @@ -37,15 +38,146 @@ import ( const ( // FoundationDB transaction size limit is 10MB FDB_TRANSACTION_SIZE_LIMIT = 10 * 1024 * 1024 + // Safe limit for batch size (leave margin for FDB overhead) + FDB_BATCH_SIZE_LIMIT = 8 * 1024 * 1024 // Maximum number of entries to return in a single directory listing // Large batches can cause transaction timeouts and increase memory pressure MAX_DIRECTORY_LIST_LIMIT = 1000 + + // Write batching defaults + DEFAULT_BATCH_SIZE = 100 + DEFAULT_BATCH_INTERVAL = 5 * time.Millisecond ) func init() { filer.Stores = append(filer.Stores, &FoundationDBStore{}) } +// writeOp represents a pending write operation +type writeOp struct { + key fdb.Key + value []byte // nil for delete + done chan error +} + +// opSize returns the approximate size of an operation in bytes +func (op *writeOp) size() int { + return len(op.key) + len(op.value) +} + +// writeBatcher batches multiple writes into single transactions +type writeBatcher struct { + store *FoundationDBStore + ops chan *writeOp + stop chan struct{} + wg sync.WaitGroup + size int + interval time.Duration +} + +func newWriteBatcher(store *FoundationDBStore, size int, interval time.Duration) *writeBatcher { + b := &writeBatcher{ + store: store, + ops: make(chan *writeOp, size*10), + stop: make(chan struct{}), + size: size, + interval: interval, + } + b.wg.Add(1) + go b.run() + return b +} + +func (b *writeBatcher) run() { + defer b.wg.Done() + batch := make([]*writeOp, 0, b.size) + batchBytes := 0 // Track cumulative size of batch + timer := time.NewTimer(b.interval) + defer timer.Stop() + + flush := func() { + if len(batch) == 0 { + return + } + _, err := b.store.database.Transact(func(tr fdb.Transaction) (interface{}, error) { + for _, op := range batch { + if op.value != nil { + tr.Set(op.key, op.value) + } else { + tr.Clear(op.key) + } + } + return nil, nil + }) + for _, op := range batch { + if op.done != nil { + op.done <- err + close(op.done) + } + } + batch = batch[:0] + batchBytes = 0 + } + + resetTimer := func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(b.interval) + } + + for { + select { + case op := <-b.ops: + batch = append(batch, op) + batchBytes += op.size() + // Flush when batch count or size limit is reached + if len(batch) >= b.size || batchBytes >= FDB_BATCH_SIZE_LIMIT { + flush() + resetTimer() + } + case <-timer.C: + flush() + // Timer already fired, safe to reset directly + timer.Reset(b.interval) + case <-b.stop: + for { + select { + case op := <-b.ops: + batch = append(batch, op) + default: + flush() + return + } + } + } + } +} + +func (b *writeBatcher) submit(key fdb.Key, value []byte, wait bool) error { + op := &writeOp{key: key, value: value} + if wait { + op.done = make(chan error, 1) + } + select { + case b.ops <- op: + if wait { + return <-op.done + } + return nil + case <-b.stop: + return fmt.Errorf("batcher stopped") + } +} + +func (b *writeBatcher) shutdown() { + close(b.stop) + b.wg.Wait() +} + type FoundationDBStore struct { database fdb.Database seaweedfsDir directory.DirectorySubspace @@ -53,6 +185,10 @@ type FoundationDBStore struct { directoryPrefix string timeout time.Duration maxRetryDelay time.Duration + // Write batching + batcher *writeBatcher + batchSize int + batchInterval time.Duration } // Context key type for storing transactions @@ -89,6 +225,8 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre configuration.SetDefault(prefix+"timeout", "5s") configuration.SetDefault(prefix+"max_retry_delay", "1s") configuration.SetDefault(prefix+"directory_prefix", "seaweedfs") + configuration.SetDefault(prefix+"batch_size", DEFAULT_BATCH_SIZE) + configuration.SetDefault(prefix+"batch_interval", DEFAULT_BATCH_INTERVAL.String()) clusterFile := configuration.GetString(prefix + "cluster_file") apiVersion := configuration.GetInt(prefix + "api_version") @@ -108,6 +246,18 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre return fmt.Errorf("invalid max_retry_delay duration %s: %w", maxRetryDelayStr, err) } + // Parse batch configuration + store.batchSize = configuration.GetInt(prefix + "batch_size") + if store.batchSize <= 0 { + store.batchSize = DEFAULT_BATCH_SIZE + } + batchIntervalStr := configuration.GetString(prefix + "batch_interval") + store.batchInterval, err = time.ParseDuration(batchIntervalStr) + if err != nil { + glog.Warningf("invalid %sbatch_interval duration %q, using default %v: %v", prefix, batchIntervalStr, DEFAULT_BATCH_INTERVAL, err) + store.batchInterval = DEFAULT_BATCH_INTERVAL + } + return store.initialize(clusterFile, apiVersion) } @@ -138,6 +288,11 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e return fmt.Errorf("failed to create/open kv directory: %w", err) } + // Start write batcher for improved throughput + store.batcher = newWriteBatcher(store, store.batchSize, store.batchInterval) + glog.V(0).Infof("FoundationDB: write batching enabled (batch_size=%d, batch_interval=%v)", + store.batchSize, store.batchInterval) + glog.V(0).Infof("FoundationDB store initialized successfully with directory prefix: %s", store.directoryPrefix) return nil } @@ -215,7 +370,12 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En return nil } - // Execute in a new transaction if not in an existing one + // Use write batcher for better throughput + if store.batcher != nil { + return store.batcher.submit(key, value, true) + } + + // Fallback: execute in a new transaction _, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) { tr.Set(key, value) return nil, nil @@ -276,7 +436,12 @@ func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.F return nil } - // Execute in a new transaction if not in an existing one + // Use write batcher for better throughput (nil value = delete) + if store.batcher != nil { + return store.batcher.submit(key, nil, true) + } + + // Fallback: execute in a new transaction _, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) { tr.Clear(key) return nil, nil @@ -556,6 +721,11 @@ func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error } func (store *FoundationDBStore) Shutdown() { + // Stop write batcher + if store.batcher != nil { + store.batcher.shutdown() + store.batcher = nil + } // FoundationDB doesn't have an explicit close method for Database glog.V(0).Infof("FoundationDB store shutdown") }