@@ -29,6 +29,10 @@ pub struct WriteRequest {
/// Maximum number of write requests to batch together.
const MAX_BATCH_SIZE: usize = 128;

/// Maximum bytes to accumulate per batch before breaking (matches Go's 4MB limit).
/// This prevents large writes from accumulating unbounded latency.
const MAX_BATCH_BYTES: usize = 4 * 1024 * 1024;
/// Handle for submitting write requests to the background worker.
/// Handle for submitting write requests to the background worker.
#[ derive(Clone) ]
#[ derive(Clone) ]
pub struct WriteQueue {
pub struct WriteQueue {
@@ -97,13 +101,18 @@ impl WriteQueueWorker {
}
}
} ;
} ;
// Drain as many additional requests as available, up to MAX_BATCH_SIZE.
// Drain as many additional requests as available, up to MAX_BATCH_SIZE
// or MAX_BATCH_BYTES (matches Go: 128 requests or 4MB, whichever comes first).
let mut batch = Vec ::with_capacity ( MAX_BATCH_SIZE ) ;
let mut batch = Vec ::with_capacity ( MAX_BATCH_SIZE ) ;
let mut batch_bytes : usize = first . needle . data . len ( ) ;
batch . push ( first ) ;
batch . push ( first ) ;
while batch . len ( ) < MAX_BATCH_SIZE {
while batch . len ( ) < MAX_BATCH_SIZE & & batch_bytes < MAX_BATCH_BYTES {
match self . rx . try_recv ( ) {
match self . rx . try_recv ( ) {
Ok ( req ) = > batch . push ( req ) ,
Ok ( req ) = > {
batch_bytes + = req . needle . data . len ( ) ;
batch . push ( req ) ;
}
Err ( _ ) = > break ,
Err ( _ ) = > break ,
}
}
}
}