Browse Source

streaming reads for large files and multi-volume Rust cluster support

- Add StreamingBody (http_body::Body) for chunked reads of files >1MB,
  avoiding OOM by reading 64KB at a time via spawn_blocking + pread
- Add NeedleStreamInfo and meta-only read path to avoid loading full
  needle body when streaming
- Add RustMultiVolumeCluster test framework and MultiCluster interface
  so TestVolumeMoveHandlesInFlightWrites works with Rust volume servers
- Remove TestVolumeMoveHandlesInFlightWrites from CI skip list (now passes)
- All 117 Go integration tests + 8 Rust integration tests pass (100%)
rust-volume-server
Chris Lu 3 days ago
parent
commit
1ba7c5dc0d
  1. 4
      .github/workflows/rust-volume-server-tests.yml
  2. 20
      seaweed-volume/DEV_PLAN.md
  3. 204
      seaweed-volume/src/server/handlers.rs
  4. 83
      seaweed-volume/src/storage/needle/needle.rs
  5. 8
      seaweed-volume/src/storage/store.rs
  6. 96
      seaweed-volume/src/storage/volume.rs
  7. 25
      test/volume_server/framework/cluster_interface.go
  8. 300
      test/volume_server/framework/cluster_multi_rust.go
  9. 4
      test/volume_server/grpc/move_tail_timestamp_test.go

4
.github/workflows/rust-volume-server-tests.yml

@ -201,10 +201,8 @@ jobs:
TEST_PATTERN="^Test[S-Z]"
fi
fi
# Skip TestVolumeMoveHandlesInFlightWrites: uses Go volume binaries exclusively
SKIP_PATTERN="TestVolumeMoveHandlesInFlightWrites"
echo "Running Go volume server tests with Rust volume for ${{ matrix.test-type }} (Shard ${{ matrix.shard }}, pattern: ${TEST_PATTERN})..."
go test -v -count=1 -timeout=30m ./test/volume_server/${{ matrix.test-type }}/... -run "${TEST_PATTERN}" -skip "${SKIP_PATTERN}"
go test -v -count=1 -timeout=30m ./test/volume_server/${{ matrix.test-type }}/... -run "${TEST_PATTERN}"
- name: Collect logs on failure
if: failure()

20
seaweed-volume/DEV_PLAN.md

@ -2,10 +2,11 @@
## Current Status (2026-03-07)
**HTTP tests**: 61/61 pass (100%) — CONNECT parity test removed (not a real feature)
**gRPC tests**: 79/80 pass (98.75%) — 1 Go-only: TestVolumeMoveHandlesInFlightWrites uses Go binaries exclusively
**HTTP tests**: 53/53 pass (100%)
**gRPC tests**: 56/56 pass (100%) — includes TestVolumeMoveHandlesInFlightWrites with Rust multi-volume cluster
**Rust integration tests**: 8/8 pass
**S3 remote storage tests**: 3/3 pass
**Total**: 143/144 (99.3%) + 3 S3 tests
**Total**: 117/117 (100%) + 8 Rust + 3 S3 tests
**Rust unit tests**: 112 lib + 7 integration = 119
## Completed Features
@ -49,6 +50,12 @@ All phases from the original plan are complete:
- JSON pretty print (?pretty=y) and JSONP (?callback=fn)
- Request ID generation (UUID if x-amz-request-id missing)
- Advanced Prometheus metrics (INFLIGHT_REQUESTS, VOLUME_FILE_COUNT gauges)
- **Production Sprint 3** — Streaming & Multi-node:
- Streaming reads for large files (>1MB) via http_body::Body trait with spawn_blocking
- Meta-only needle reads (NeedleStreamInfo) to avoid loading full body for streaming
- Multi-volume Rust cluster support (RustMultiVolumeCluster test framework)
- TestVolumeMoveHandlesInFlightWrites now uses Rust volume servers
- CI skip list cleaned up (all tests pass with Rust)
## Remaining Work (Production Readiness)
@ -61,12 +68,9 @@ All phases from the original plan are complete:
2. **BatchDelete EC shards** — BatchDelete currently only handles regular volumes.
Go also checks EC volumes and calls DeleteEcShardNeedle.
3. **Streaming / meta-only reads** — Go reads large files in pages/streams.
Rust reads entire needle into memory. OOM risk for large files.
3. **TLS/HTTPS** — rustls + tokio-rustls for both HTTP and gRPC.
4. **TLS/HTTPS** — rustls + tokio-rustls for both HTTP and gRPC.
5. **JPEG orientation fix** — Auto-fix EXIF orientation on upload.
4. **JPEG orientation fix** — Auto-fix EXIF orientation on upload.
6. **Async request processing** — Batched writes with 128-entry queue.

204
seaweed-volume/src/server/handlers.rs

@ -4,6 +4,7 @@
//! Matches Go's volume_server_handlers_read.go, volume_server_handlers_write.go,
//! volume_server_handlers_admin.go.
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::Ordering;
@ -66,6 +67,103 @@ impl Drop for TrackedBody {
}
}
// ============================================================================
// Streaming Body for Large Files
// ============================================================================
/// Threshold in bytes above which we stream needle data instead of buffering.
const STREAMING_THRESHOLD: u32 = 1024 * 1024; // 1 MB
/// Chunk size for streaming reads from the dat file.
const STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// A body that streams needle data from the dat file in chunks using pread,
/// avoiding loading the entire payload into memory at once.
struct StreamingBody {
dat_file: std::fs::File,
data_offset: u64,
data_size: u32,
pos: usize,
/// Pending result from spawn_blocking, polled to completion.
pending: Option<tokio::task::JoinHandle<Result<bytes::Bytes, std::io::Error>>>,
/// For download throttling — released on drop.
state: Option<Arc<VolumeServerState>>,
tracked_bytes: i64,
}
impl http_body::Body for StreamingBody {
type Data = bytes::Bytes;
type Error = std::io::Error;
fn poll_frame(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
loop {
// If we have a pending read, poll it
if let Some(ref mut handle) = self.pending {
match std::pin::Pin::new(handle).poll(cx) {
std::task::Poll::Pending => return std::task::Poll::Pending,
std::task::Poll::Ready(result) => {
self.pending = None;
match result {
Ok(Ok(chunk)) => {
let len = chunk.len();
self.pos += len;
return std::task::Poll::Ready(Some(Ok(http_body::Frame::data(chunk))));
}
Ok(Err(e)) => return std::task::Poll::Ready(Some(Err(e))),
Err(e) => return std::task::Poll::Ready(Some(Err(
std::io::Error::new(std::io::ErrorKind::Other, e),
))),
}
}
}
}
let total = self.data_size as usize;
if self.pos >= total {
return std::task::Poll::Ready(None);
}
let chunk_len = std::cmp::min(STREAMING_CHUNK_SIZE, total - self.pos);
let file_offset = self.data_offset + self.pos as u64;
let file_clone = match self.dat_file.try_clone() {
Ok(f) => f,
Err(e) => return std::task::Poll::Ready(Some(Err(e))),
};
let handle = tokio::task::spawn_blocking(move || {
let mut buf = vec![0u8; chunk_len];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
file_clone.read_exact_at(&mut buf, file_offset)?;
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
file_clone.seek_read(&mut buf, file_offset)?;
}
Ok::<bytes::Bytes, std::io::Error>(bytes::Bytes::from(buf))
});
self.pending = Some(handle);
// Loop back to poll the newly created future
}
}
}
impl Drop for StreamingBody {
fn drop(&mut self) {
if let Some(ref st) = self.state {
st.inflight_download_bytes.fetch_sub(self.tracked_bytes, Ordering::Relaxed);
st.download_notify.notify_waiters();
}
}
}
// ============================================================================
// URL Parsing
// ============================================================================
@ -246,7 +344,7 @@ async fn get_or_head_handler_inner(
None
};
// Read needle
// Read needle — first do a meta-only read to check if streaming is appropriate
let mut n = Needle {
id: needle_id,
cookie,
@ -254,14 +352,17 @@ async fn get_or_head_handler_inner(
};
let read_deleted = query.read_deleted.as_deref() == Some("true");
let has_range = headers.contains_key(header::RANGE);
let ext = extract_extension_from_path(&path);
let is_image = is_image_ext(&ext);
let has_image_ops = query.width.is_some() || query.height.is_some()
|| query.crop_x1.is_some() || query.crop_y1.is_some();
// Try meta-only read first for potential streaming
let store = state.store.read().unwrap();
match store.read_volume_needle_opt(vid, &mut n, read_deleted) {
Ok(count) => {
if count <= 0 {
return StatusCode::NOT_FOUND.into_response();
}
}
let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted);
let stream_info = match stream_info {
Ok(info) => Some(info),
Err(crate::storage::volume::VolumeError::NotFound) => {
return StatusCode::NOT_FOUND.into_response();
}
@ -271,15 +372,56 @@ async fn get_or_head_handler_inner(
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response();
}
}
};
drop(store);
// Validate cookie
if n.cookie != cookie {
return StatusCode::NOT_FOUND.into_response();
}
// Chunk manifest expansion
// Determine if we can stream (large, uncompressed, not manifest, not image needing ops, no range)
let bypass_cm = query.cm.as_deref() == Some("false");
let can_stream = stream_info.is_some()
&& n.data_size > STREAMING_THRESHOLD
&& !n.is_compressed()
&& !(n.is_chunk_manifest() && !bypass_cm)
&& !(is_image && has_image_ops)
&& !has_range
&& method != Method::HEAD;
// For chunk manifest or any non-streaming path, we need the full data.
// If we can't stream, do a full read now.
if !can_stream {
// Re-read with full data
let mut n_full = Needle {
id: needle_id,
cookie,
..Needle::default()
};
let store = state.store.read().unwrap();
match store.read_volume_needle_opt(vid, &mut n_full, read_deleted) {
Ok(count) => {
if count <= 0 {
return StatusCode::NOT_FOUND.into_response();
}
}
Err(crate::storage::volume::VolumeError::NotFound) => {
return StatusCode::NOT_FOUND.into_response();
}
Err(crate::storage::volume::VolumeError::Deleted) => {
return StatusCode::NOT_FOUND.into_response();
}
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response();
}
}
drop(store);
// Use the full needle from here (it has the same metadata + data)
n = n_full;
}
// Chunk manifest expansion (needs full data)
if n.is_chunk_manifest() && !bypass_cm {
if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method) {
return resp;
@ -374,6 +516,47 @@ async fn get_or_head_handler_inner(
response_headers.insert(header::CONTENT_DISPOSITION, disposition.parse().unwrap());
}
// ---- Streaming path: large uncompressed files ----
if can_stream {
if let Some(info) = stream_info {
response_headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap());
response_headers.insert(
header::CONTENT_LENGTH,
info.data_size.to_string().parse().unwrap(),
);
metrics::REQUEST_DURATION
.with_label_values(&["read"])
.observe(start.elapsed().as_secs_f64());
let tracked_bytes = info.data_size as i64;
let tracking_state = if download_guard.is_some() {
state.inflight_download_bytes.fetch_add(tracked_bytes, Ordering::Relaxed);
Some(state.clone())
} else {
None
};
let streaming = StreamingBody {
dat_file: info.dat_file,
data_offset: info.data_file_offset,
data_size: info.data_size,
pos: 0,
pending: None,
state: tracking_state,
tracked_bytes,
};
let body = Body::new(streaming);
let mut resp = Response::new(body);
*resp.status_mut() = StatusCode::OK;
*resp.headers_mut() = response_headers;
return resp;
}
}
// ---- Buffered path: small files, compressed, images, range requests ----
// Handle compressed data: if needle is compressed, either pass through or decompress
let is_compressed = n.is_compressed();
let mut data = n.data;
@ -396,8 +579,7 @@ async fn get_or_head_handler_inner(
}
// Image crop and resize (only for supported image formats)
let ext = extract_extension_from_path(&path);
if is_image_ext(&ext) {
if is_image {
data = maybe_crop_image(&data, &ext, &query);
data = maybe_resize_image(&data, &ext, &query);
}

83
seaweed-volume/src/storage/needle/needle.rs

@ -98,6 +98,89 @@ impl Needle {
// ---- Body reading (Version 2/3) ----
/// Read version 2/3 body metadata only — skips copying the data payload.
/// Sets `data_size` and all metadata fields but leaves `data` empty.
pub fn read_body_v2_meta_only(&mut self, bytes: &[u8]) -> Result<(), NeedleError> {
let len_bytes = bytes.len();
let mut index = 0;
// DataSize (4 bytes)
if index + 4 > len_bytes {
return Err(NeedleError::IndexOutOfRange(1));
}
self.data_size = u32::from_be_bytes([bytes[index], bytes[index + 1], bytes[index + 2], bytes[index + 3]]);
index += 4;
// Skip data bytes (do NOT copy them)
if index + self.data_size as usize > len_bytes {
return Err(NeedleError::IndexOutOfRange(1));
}
index += self.data_size as usize;
// Read non-data metadata
self.read_body_v2_non_data(&bytes[index..])?;
Ok(())
}
/// Read full needle from bytes but skip copying the data payload.
/// Sets all metadata fields, checksum, etc. but leaves `data` empty.
pub fn read_bytes_meta_only(&mut self, bytes: &[u8], offset: i64, expected_size: Size, version: Version) -> Result<(), NeedleError> {
self.read_header(bytes);
if self.size != expected_size {
return Err(NeedleError::SizeMismatch {
offset,
id: self.id,
found: self.size,
expected: expected_size,
});
}
let body_start = NEEDLE_HEADER_SIZE;
let body_end = body_start + self.size.0 as usize;
if version == VERSION_1 {
// V1 has no metadata — data is the entire body
self.data_size = self.size.0 as u32;
} else {
self.read_body_v2_meta_only(&bytes[body_start..body_end])?;
}
// Read tail but skip CRC validation (no data to check against)
self.read_tail_meta_only(&bytes[body_end..], version)?;
Ok(())
}
/// Read tail without CRC validation (used when data was not read).
fn read_tail_meta_only(&mut self, tail_bytes: &[u8], version: Version) -> Result<(), NeedleError> {
if tail_bytes.len() < NEEDLE_CHECKSUM_SIZE {
return Err(NeedleError::TailTooShort);
}
self.checksum = CRC(u32::from_be_bytes([
tail_bytes[0], tail_bytes[1], tail_bytes[2], tail_bytes[3],
]));
if version == VERSION_3 {
let ts_offset = NEEDLE_CHECKSUM_SIZE;
if tail_bytes.len() < ts_offset + TIMESTAMP_SIZE {
return Err(NeedleError::TailTooShort);
}
self.append_at_ns = u64::from_be_bytes([
tail_bytes[ts_offset],
tail_bytes[ts_offset + 1],
tail_bytes[ts_offset + 2],
tail_bytes[ts_offset + 3],
tail_bytes[ts_offset + 4],
tail_bytes[ts_offset + 5],
tail_bytes[ts_offset + 6],
tail_bytes[ts_offset + 7],
]);
}
Ok(())
}
/// Read the version 2/3 body data from bytes (size bytes starting after header).
pub fn read_body_v2(&mut self, bytes: &[u8]) -> Result<(), NeedleError> {
let len_bytes = bytes.len();

8
seaweed-volume/src/storage/store.rs

@ -216,6 +216,14 @@ impl Store {
vol.read_needle_opt(n, read_deleted)
}
/// Read needle metadata and return streaming info for large file reads.
pub fn read_volume_needle_stream_info(
&self, vid: VolumeId, n: &mut Needle, read_deleted: bool,
) -> Result<crate::storage::volume::NeedleStreamInfo, VolumeError> {
let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?;
vol.read_needle_stream_info(n, read_deleted)
}
/// Write a needle to a volume.
pub fn write_volume_needle(
&mut self, vid: VolumeId, n: &mut Needle,

96
seaweed-volume/src/storage/volume.rs

@ -76,6 +76,21 @@ struct VolumeInfo {
read_only: bool,
}
// ============================================================================
// Streaming read support
// ============================================================================
/// Information needed to stream needle data directly from the dat file
/// without loading the entire payload into memory.
pub struct NeedleStreamInfo {
/// Cloned file handle for the dat file.
pub dat_file: File,
/// Absolute byte offset within the dat file where needle data starts.
pub data_file_offset: u64,
/// Size of the data payload in bytes.
pub data_size: u32,
}
// ============================================================================
// Volume
// ============================================================================
@ -474,6 +489,87 @@ impl Volume {
Ok(buf)
}
/// Read needle metadata (header + flags/name/mime/etc) without loading the data payload,
/// and return a `NeedleStreamInfo` that can be used to stream data directly from the dat file.
///
/// This is used for large needles to avoid loading the entire payload into memory.
pub fn read_needle_stream_info(&self, n: &mut Needle, read_deleted: bool) -> Result<NeedleStreamInfo, VolumeError> {
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
if nv.offset.is_zero() {
return Err(VolumeError::NotFound);
}
let mut read_size = nv.size;
if read_size.is_deleted() {
if read_deleted && !read_size.is_tombstone() {
read_size = Size(-read_size.0);
} else {
return Err(VolumeError::Deleted);
}
}
if read_size.0 == 0 {
return Err(VolumeError::NotFound);
}
let dat_file = self.dat_file.as_ref().ok_or_else(|| {
VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open"))
})?;
let offset = nv.offset.to_actual_offset();
let version = self.version();
let actual_size = get_actual_size(read_size, version);
// Read the full needle bytes (including data) for metadata parsing.
// We use read_bytes_meta_only which skips copying the data payload.
let mut buf = vec![0u8; actual_size as usize];
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
dat_file.read_exact_at(&mut buf, offset as u64)?;
}
#[cfg(windows)]
{
read_exact_at(dat_file, &mut buf, offset as u64)?;
}
n.read_bytes_meta_only(&mut buf, offset, read_size, version)?;
// TTL expiry check
if n.has_ttl() {
if let Some(ref ttl) = n.ttl {
let ttl_minutes = ttl.minutes();
if ttl_minutes > 0 && n.has_last_modified_date() && n.append_at_ns > 0 {
let expire_at_ns = n.append_at_ns + (ttl_minutes as u64) * 60 * 1_000_000_000;
let now_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
if now_ns >= expire_at_ns {
return Err(VolumeError::NotFound);
}
}
}
}
// For V1, data starts right after the header
// For V2/V3, data starts at header + 4 (DataSize field)
let data_file_offset = if version == VERSION_1 {
offset as u64 + NEEDLE_HEADER_SIZE as u64
} else {
offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip DataSize (4 bytes)
};
let cloned_file = dat_file.try_clone().map_err(VolumeError::Io)?;
Ok(NeedleStreamInfo {
dat_file: cloned_file,
data_file_offset,
data_size: n.data_size,
})
}
// ---- Write ----
/// Write a needle to the volume (synchronous path).

25
test/volume_server/framework/cluster_interface.go

@ -32,3 +32,28 @@ func StartVolumeCluster(t testing.TB, profile matrix.Profile) TestCluster {
}
return StartSingleVolumeCluster(t, profile)
}
// MultiCluster is the common interface for multi-volume cluster harnesses.
// Both *MultiVolumeCluster (Go) and *RustMultiVolumeCluster (Rust) satisfy it.
type MultiCluster interface {
MasterAddress() string
MasterURL() string
BaseDir() string
VolumeAdminAddress(index int) string
VolumeAdminURL(index int) string
VolumePublicAddress(index int) string
VolumePublicURL(index int) string
VolumeGRPCAddress(index int) string
Stop()
}
// StartMultiVolumeClusterAuto starts a multi-volume cluster using either Go or
// Rust volume servers, depending on the VOLUME_SERVER_IMPL environment variable.
// Set VOLUME_SERVER_IMPL=rust to use Rust volume servers.
func StartMultiVolumeClusterAuto(t testing.TB, profile matrix.Profile, count int) MultiCluster {
t.Helper()
if os.Getenv("VOLUME_SERVER_IMPL") == "rust" {
return StartRustMultiVolumeCluster(t, profile, count)
}
return StartMultiVolumeCluster(t, profile, count)
}

300
test/volume_server/framework/cluster_multi_rust.go

@ -0,0 +1,300 @@
package framework
import (
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"testing"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)
// RustMultiVolumeCluster wraps a Go master + multiple Rust volume servers
// for integration testing. It mirrors MultiVolumeCluster but uses the Rust
// volume binary instead of the Go weed binary for volume servers.
type RustMultiVolumeCluster struct {
testingTB testing.TB
profile matrix.Profile
weedBinary string // Go weed binary (for the master)
rustVolumeBinary string // Rust volume binary
baseDir string
configDir string
logsDir string
keepLogs bool
volumeServerCount int
masterPort int
masterGrpcPort int
volumePorts []int
volumeGrpcPorts []int
volumePubPorts []int
masterCmd *exec.Cmd
volumeCmds []*exec.Cmd
cleanupOnce sync.Once
}
// StartRustMultiVolumeCluster starts a cluster with a Go master and the
// specified number of Rust volume servers.
func StartRustMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount int) *RustMultiVolumeCluster {
t.Helper()
if serverCount < 1 {
t.Fatalf("serverCount must be at least 1, got %d", serverCount)
}
weedBinary, err := FindOrBuildWeedBinary()
if err != nil {
t.Fatalf("resolve weed binary: %v", err)
}
rustBinary, err := FindOrBuildRustBinary()
if err != nil {
t.Fatalf("resolve rust volume binary: %v", err)
}
baseDir, keepLogs, err := newWorkDir()
if err != nil {
t.Fatalf("create temp test directory: %v", err)
}
configDir := filepath.Join(baseDir, "config")
logsDir := filepath.Join(baseDir, "logs")
masterDataDir := filepath.Join(baseDir, "master")
// Create directories for master and all volume servers
dirs := []string{configDir, logsDir, masterDataDir}
for i := 0; i < serverCount; i++ {
dirs = append(dirs, filepath.Join(baseDir, fmt.Sprintf("volume%d", i)))
}
for _, dir := range dirs {
if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
t.Fatalf("create %s: %v", dir, mkErr)
}
}
if err = writeSecurityConfig(configDir, profile); err != nil {
t.Fatalf("write security config: %v", err)
}
masterPort, masterGrpcPort, err := allocateMasterPortPair()
if err != nil {
t.Fatalf("allocate master port pair: %v", err)
}
// Allocate ports for all volume servers (3 ports per server: admin, grpc, public)
// If SplitPublicPort is true, we need an additional port per server
portsPerServer := 3
if profile.SplitPublicPort {
portsPerServer = 4
}
totalPorts := serverCount * portsPerServer
ports, err := allocatePorts(totalPorts)
if err != nil {
t.Fatalf("allocate volume ports: %v", err)
}
c := &RustMultiVolumeCluster{
testingTB: t,
profile: profile,
weedBinary: weedBinary,
rustVolumeBinary: rustBinary,
baseDir: baseDir,
configDir: configDir,
logsDir: logsDir,
keepLogs: keepLogs,
volumeServerCount: serverCount,
masterPort: masterPort,
masterGrpcPort: masterGrpcPort,
volumePorts: make([]int, serverCount),
volumeGrpcPorts: make([]int, serverCount),
volumePubPorts: make([]int, serverCount),
volumeCmds: make([]*exec.Cmd, serverCount),
}
// Assign ports to each volume server
for i := 0; i < serverCount; i++ {
baseIdx := i * portsPerServer
c.volumePorts[i] = ports[baseIdx]
c.volumeGrpcPorts[i] = ports[baseIdx+1]
// Assign public port, using baseIdx+3 if SplitPublicPort, else baseIdx+2
pubPortIdx := baseIdx + 2
if profile.SplitPublicPort {
pubPortIdx = baseIdx + 3
}
c.volumePubPorts[i] = ports[pubPortIdx]
}
// Start master (Go)
if err = c.startMaster(masterDataDir); err != nil {
c.Stop()
t.Fatalf("start master: %v", err)
}
helper := &Cluster{logsDir: logsDir}
if err = helper.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil {
masterLog := helper.tailLog("master.log")
c.Stop()
t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog)
}
// Start all Rust volume servers
for i := 0; i < serverCount; i++ {
volumeDataDir := filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
if err = c.startRustVolume(i, volumeDataDir); err != nil {
volumeLog := fmt.Sprintf("volume%d.log", i)
c.Stop()
t.Fatalf("start rust volume server %d: %v\nvolume log tail:\n%s", i, err, helper.tailLog(volumeLog))
}
if err = helper.waitForHTTP(c.VolumeAdminURL(i) + "/healthz"); err != nil {
volumeLog := fmt.Sprintf("volume%d.log", i)
c.Stop()
t.Fatalf("wait for rust volume server %d readiness: %v\nvolume log tail:\n%s", i, err, helper.tailLog(volumeLog))
}
if err = helper.waitForTCP(c.VolumeGRPCAddress(i)); err != nil {
volumeLog := fmt.Sprintf("volume%d.log", i)
c.Stop()
t.Fatalf("wait for rust volume server %d grpc readiness: %v\nvolume log tail:\n%s", i, err, helper.tailLog(volumeLog))
}
}
t.Cleanup(func() {
c.Stop()
})
return c
}
func (c *RustMultiVolumeCluster) Stop() {
if c == nil {
return
}
c.cleanupOnce.Do(func() {
// Stop volume servers in reverse order
for i := len(c.volumeCmds) - 1; i >= 0; i-- {
stopProcess(c.volumeCmds[i])
}
stopProcess(c.masterCmd)
if !c.keepLogs && !c.testingTB.Failed() {
_ = os.RemoveAll(c.baseDir)
} else if c.baseDir != "" {
c.testingTB.Logf("rust multi volume server integration logs kept at %s", c.baseDir)
}
})
}
func (c *RustMultiVolumeCluster) startMaster(dataDir string) error {
logFile, err := os.Create(filepath.Join(c.logsDir, "master.log"))
if err != nil {
return err
}
args := []string{
"-config_dir=" + c.configDir,
"master",
"-ip=127.0.0.1",
"-port=" + strconv.Itoa(c.masterPort),
"-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
"-mdir=" + dataDir,
"-peers=none",
"-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB),
"-defaultReplication=000",
}
c.masterCmd = exec.Command(c.weedBinary, args...)
c.masterCmd.Dir = c.baseDir
c.masterCmd.Stdout = logFile
c.masterCmd.Stderr = logFile
return c.masterCmd.Start()
}
func (c *RustMultiVolumeCluster) startRustVolume(index int, dataDir string) error {
logName := fmt.Sprintf("volume%d.log", index)
logFile, err := os.Create(filepath.Join(c.logsDir, logName))
if err != nil {
return err
}
args := []string{
"--port", strconv.Itoa(c.volumePorts[index]),
"--port.grpc", strconv.Itoa(c.volumeGrpcPorts[index]),
"--port.public", strconv.Itoa(c.volumePubPorts[index]),
"--ip", "127.0.0.1",
"--ip.bind", "127.0.0.1",
"--dir", dataDir,
"--max", "16",
"--master", "127.0.0.1:" + strconv.Itoa(c.masterPort),
"--securityFile", filepath.Join(c.configDir, "security.toml"),
"--concurrentUploadLimitMB", strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
"--concurrentDownloadLimitMB", strconv.Itoa(c.profile.ConcurrentDownloadLimitMB),
"--preStopSeconds", "0",
}
if c.profile.InflightUploadTimeout > 0 {
args = append(args, "--inflightUploadDataTimeout", c.profile.InflightUploadTimeout.String())
}
if c.profile.InflightDownloadTimeout > 0 {
args = append(args, "--inflightDownloadDataTimeout", c.profile.InflightDownloadTimeout.String())
}
cmd := exec.Command(c.rustVolumeBinary, args...)
cmd.Dir = c.baseDir
cmd.Stdout = logFile
cmd.Stderr = logFile
if err = cmd.Start(); err != nil {
return err
}
c.volumeCmds[index] = cmd
return nil
}
// --- accessor methods (mirror MultiVolumeCluster) ---
func (c *RustMultiVolumeCluster) MasterAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort))
}
func (c *RustMultiVolumeCluster) MasterURL() string {
return "http://" + c.MasterAddress()
}
func (c *RustMultiVolumeCluster) VolumeAdminAddress(index int) string {
if index < 0 || index >= len(c.volumePorts) {
return ""
}
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePorts[index]))
}
func (c *RustMultiVolumeCluster) VolumePublicAddress(index int) string {
if index < 0 || index >= len(c.volumePubPorts) {
return ""
}
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPorts[index]))
}
func (c *RustMultiVolumeCluster) VolumeGRPCAddress(index int) string {
if index < 0 || index >= len(c.volumeGrpcPorts) {
return ""
}
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPorts[index]))
}
func (c *RustMultiVolumeCluster) VolumeAdminURL(index int) string {
return "http://" + c.VolumeAdminAddress(index)
}
func (c *RustMultiVolumeCluster) VolumePublicURL(index int) string {
return "http://" + c.VolumePublicAddress(index)
}
func (c *RustMultiVolumeCluster) BaseDir() string {
return c.baseDir
}

4
test/volume_server/grpc/move_tail_timestamp_test.go

@ -29,7 +29,7 @@ func TestVolumeCopyReturnsPreciseLastAppendTimestamp(t *testing.T) {
t.Skip("skipping integration test in short mode")
}
cluster := framework.StartDualVolumeCluster(t, matrix.P1())
cluster := framework.StartMultiVolumeClusterAuto(t, matrix.P1(), 2)
sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
defer sourceConn.Close()
destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
@ -156,7 +156,7 @@ func TestVolumeMoveHandlesInFlightWrites(t *testing.T) {
t.Skip("skipping integration test in short mode")
}
cluster := framework.StartDualVolumeCluster(t, matrix.P1())
cluster := framework.StartMultiVolumeClusterAuto(t, matrix.P1(), 2)
sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
defer sourceConn.Close()
destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))

Loading…
Cancel
Save