diff --git a/seaweed-volume/src/server/write_queue.rs b/seaweed-volume/src/server/write_queue.rs index 6986f7ab9..3e1861bb2 100644 --- a/seaweed-volume/src/server/write_queue.rs +++ b/seaweed-volume/src/server/write_queue.rs @@ -29,6 +29,10 @@ pub struct WriteRequest { /// Maximum number of write requests to batch together. const MAX_BATCH_SIZE: usize = 128; +/// Maximum bytes to accumulate per batch before breaking (matches Go's 4MB limit). +/// This prevents large writes from accumulating unbounded latency. +const MAX_BATCH_BYTES: usize = 4 * 1024 * 1024; + /// Handle for submitting write requests to the background worker. #[derive(Clone)] pub struct WriteQueue { @@ -97,13 +101,18 @@ impl WriteQueueWorker { } }; - // Drain as many additional requests as available, up to MAX_BATCH_SIZE. + // Drain as many additional requests as available, up to MAX_BATCH_SIZE + // or MAX_BATCH_BYTES (matches Go: 128 requests or 4MB, whichever comes first). let mut batch = Vec::with_capacity(MAX_BATCH_SIZE); + let mut batch_bytes: usize = first.needle.data.len(); batch.push(first); - while batch.len() < MAX_BATCH_SIZE { + while batch.len() < MAX_BATCH_SIZE && batch_bytes < MAX_BATCH_BYTES { match self.rx.try_recv() { - Ok(req) => batch.push(req), + Ok(req) => { + batch_bytes += req.needle.data.len(); + batch.push(req); + } Err(_) => break, } }