diff --git a/.github/workflows/rust-volume-server-tests.yml b/.github/workflows/rust-volume-server-tests.yml index 0092aed46..7f0480bd7 100644 --- a/.github/workflows/rust-volume-server-tests.yml +++ b/.github/workflows/rust-volume-server-tests.yml @@ -201,10 +201,8 @@ jobs: TEST_PATTERN="^Test[S-Z]" fi fi - # Skip TestVolumeMoveHandlesInFlightWrites: uses Go volume binaries exclusively - SKIP_PATTERN="TestVolumeMoveHandlesInFlightWrites" echo "Running Go volume server tests with Rust volume for ${{ matrix.test-type }} (Shard ${{ matrix.shard }}, pattern: ${TEST_PATTERN})..." - go test -v -count=1 -timeout=30m ./test/volume_server/${{ matrix.test-type }}/... -run "${TEST_PATTERN}" -skip "${SKIP_PATTERN}" + go test -v -count=1 -timeout=30m ./test/volume_server/${{ matrix.test-type }}/... -run "${TEST_PATTERN}" - name: Collect logs on failure if: failure() diff --git a/seaweed-volume/DEV_PLAN.md b/seaweed-volume/DEV_PLAN.md index 86b653af3..4083ec6d9 100644 --- a/seaweed-volume/DEV_PLAN.md +++ b/seaweed-volume/DEV_PLAN.md @@ -2,10 +2,11 @@ ## Current Status (2026-03-07) -**HTTP tests**: 61/61 pass (100%) — CONNECT parity test removed (not a real feature) -**gRPC tests**: 79/80 pass (98.75%) — 1 Go-only: TestVolumeMoveHandlesInFlightWrites uses Go binaries exclusively +**HTTP tests**: 53/53 pass (100%) +**gRPC tests**: 56/56 pass (100%) — includes TestVolumeMoveHandlesInFlightWrites with Rust multi-volume cluster +**Rust integration tests**: 8/8 pass **S3 remote storage tests**: 3/3 pass -**Total**: 143/144 (99.3%) + 3 S3 tests +**Total**: 117/117 (100%) + 8 Rust + 3 S3 tests **Rust unit tests**: 112 lib + 7 integration = 119 ## Completed Features @@ -49,6 +50,12 @@ All phases from the original plan are complete: - JSON pretty print (?pretty=y) and JSONP (?callback=fn) - Request ID generation (UUID if x-amz-request-id missing) - Advanced Prometheus metrics (INFLIGHT_REQUESTS, VOLUME_FILE_COUNT gauges) +- **Production Sprint 3** — Streaming & Multi-node: + - Streaming 
reads for large files (>1MB) via http_body::Body trait with spawn_blocking
+  - Meta-only needle reads (NeedleStreamInfo) to avoid loading full body for streaming
+  - Multi-volume Rust cluster support (RustMultiVolumeCluster test framework)
+  - TestVolumeMoveHandlesInFlightWrites now uses Rust volume servers
+  - CI skip list cleaned up (all tests pass with Rust)
 
 ## Remaining Work (Production Readiness)
 
@@ -61,12 +68,9 @@ All phases from the original plan are complete:
 2. **BatchDelete EC shards** — BatchDelete currently only handles regular
    volumes. Go also checks EC volumes and calls DeleteEcShardNeedle.
 
-3. **Streaming / meta-only reads** — Go reads large files in pages/streams.
-   Rust reads entire needle into memory. OOM risk for large files.
+3. **TLS/HTTPS** — rustls + tokio-rustls for both HTTP and gRPC.
 
-4. **TLS/HTTPS** — rustls + tokio-rustls for both HTTP and gRPC.
-
-5. **JPEG orientation fix** — Auto-fix EXIF orientation on upload.
+4. **JPEG orientation fix** — Auto-fix EXIF orientation on upload.
 
-6. **Async request processing** — Batched writes with 128-entry queue.
+5. **Async request processing** — Batched writes with 128-entry queue.
 
diff --git a/seaweed-volume/src/server/handlers.rs b/seaweed-volume/src/server/handlers.rs
index dccb1706e..c4d0815d8 100644
--- a/seaweed-volume/src/server/handlers.rs
+++ b/seaweed-volume/src/server/handlers.rs
@@ -4,6 +4,7 @@
 //! Matches Go's volume_server_handlers_read.go, volume_server_handlers_write.go,
 //! volume_server_handlers_admin.go.
 
+use std::future::Future;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 
@@ -66,6 +67,103 @@ impl Drop for TrackedBody {
     }
 }
 
+// ============================================================================
+// Streaming Body for Large Files
+// ============================================================================
+
+/// Threshold in bytes above which we stream needle data instead of buffering.
+const STREAMING_THRESHOLD: u32 = 1024 * 1024; // 1 MB
+
+/// Chunk size for streaming reads from the dat file.
+const STREAMING_CHUNK_SIZE: usize = 64 * 1024; // 64 KB + +/// A body that streams needle data from the dat file in chunks using pread, +/// avoiding loading the entire payload into memory at once. +struct StreamingBody { + dat_file: std::fs::File, + data_offset: u64, + data_size: u32, + pos: usize, + /// Pending result from spawn_blocking, polled to completion. + pending: Option>>, + /// For download throttling — released on drop. + state: Option>, + tracked_bytes: i64, +} + +impl http_body::Body for StreamingBody { + type Data = bytes::Bytes; + type Error = std::io::Error; + + fn poll_frame( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + loop { + // If we have a pending read, poll it + if let Some(ref mut handle) = self.pending { + match std::pin::Pin::new(handle).poll(cx) { + std::task::Poll::Pending => return std::task::Poll::Pending, + std::task::Poll::Ready(result) => { + self.pending = None; + match result { + Ok(Ok(chunk)) => { + let len = chunk.len(); + self.pos += len; + return std::task::Poll::Ready(Some(Ok(http_body::Frame::data(chunk)))); + } + Ok(Err(e)) => return std::task::Poll::Ready(Some(Err(e))), + Err(e) => return std::task::Poll::Ready(Some(Err( + std::io::Error::new(std::io::ErrorKind::Other, e), + ))), + } + } + } + } + + let total = self.data_size as usize; + if self.pos >= total { + return std::task::Poll::Ready(None); + } + + let chunk_len = std::cmp::min(STREAMING_CHUNK_SIZE, total - self.pos); + let file_offset = self.data_offset + self.pos as u64; + + let file_clone = match self.dat_file.try_clone() { + Ok(f) => f, + Err(e) => return std::task::Poll::Ready(Some(Err(e))), + }; + + let handle = tokio::task::spawn_blocking(move || { + let mut buf = vec![0u8; chunk_len]; + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + file_clone.read_exact_at(&mut buf, file_offset)?; + } + #[cfg(windows)] + { + use std::os::windows::fs::FileExt; + file_clone.seek_read(&mut buf, 
file_offset)?; + } + Ok::(bytes::Bytes::from(buf)) + }); + + self.pending = Some(handle); + // Loop back to poll the newly created future + } + } +} + +impl Drop for StreamingBody { + fn drop(&mut self) { + if let Some(ref st) = self.state { + st.inflight_download_bytes.fetch_sub(self.tracked_bytes, Ordering::Relaxed); + st.download_notify.notify_waiters(); + } + } +} + // ============================================================================ // URL Parsing // ============================================================================ @@ -246,7 +344,7 @@ async fn get_or_head_handler_inner( None }; - // Read needle + // Read needle — first do a meta-only read to check if streaming is appropriate let mut n = Needle { id: needle_id, cookie, @@ -254,14 +352,17 @@ async fn get_or_head_handler_inner( }; let read_deleted = query.read_deleted.as_deref() == Some("true"); + let has_range = headers.contains_key(header::RANGE); + let ext = extract_extension_from_path(&path); + let is_image = is_image_ext(&ext); + let has_image_ops = query.width.is_some() || query.height.is_some() + || query.crop_x1.is_some() || query.crop_y1.is_some(); + // Try meta-only read first for potential streaming let store = state.store.read().unwrap(); - match store.read_volume_needle_opt(vid, &mut n, read_deleted) { - Ok(count) => { - if count <= 0 { - return StatusCode::NOT_FOUND.into_response(); - } - } + let stream_info = store.read_volume_needle_stream_info(vid, &mut n, read_deleted); + let stream_info = match stream_info { + Ok(info) => Some(info), Err(crate::storage::volume::VolumeError::NotFound) => { return StatusCode::NOT_FOUND.into_response(); } @@ -271,15 +372,56 @@ async fn get_or_head_handler_inner( Err(e) => { return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response(); } - } + }; + drop(store); // Validate cookie if n.cookie != cookie { return StatusCode::NOT_FOUND.into_response(); } - // Chunk manifest expansion + // Determine if we can stream 
(large, uncompressed, not manifest, not image needing ops, no range) let bypass_cm = query.cm.as_deref() == Some("false"); + let can_stream = stream_info.is_some() + && n.data_size > STREAMING_THRESHOLD + && !n.is_compressed() + && !(n.is_chunk_manifest() && !bypass_cm) + && !(is_image && has_image_ops) + && !has_range + && method != Method::HEAD; + + // For chunk manifest or any non-streaming path, we need the full data. + // If we can't stream, do a full read now. + if !can_stream { + // Re-read with full data + let mut n_full = Needle { + id: needle_id, + cookie, + ..Needle::default() + }; + let store = state.store.read().unwrap(); + match store.read_volume_needle_opt(vid, &mut n_full, read_deleted) { + Ok(count) => { + if count <= 0 { + return StatusCode::NOT_FOUND.into_response(); + } + } + Err(crate::storage::volume::VolumeError::NotFound) => { + return StatusCode::NOT_FOUND.into_response(); + } + Err(crate::storage::volume::VolumeError::Deleted) => { + return StatusCode::NOT_FOUND.into_response(); + } + Err(e) => { + return (StatusCode::INTERNAL_SERVER_ERROR, format!("read error: {}", e)).into_response(); + } + } + drop(store); + // Use the full needle from here (it has the same metadata + data) + n = n_full; + } + + // Chunk manifest expansion (needs full data) if n.is_chunk_manifest() && !bypass_cm { if let Some(resp) = try_expand_chunk_manifest(&state, &n, &headers, &method) { return resp; @@ -374,6 +516,47 @@ async fn get_or_head_handler_inner( response_headers.insert(header::CONTENT_DISPOSITION, disposition.parse().unwrap()); } + // ---- Streaming path: large uncompressed files ---- + if can_stream { + if let Some(info) = stream_info { + response_headers.insert(header::ACCEPT_RANGES, "bytes".parse().unwrap()); + response_headers.insert( + header::CONTENT_LENGTH, + info.data_size.to_string().parse().unwrap(), + ); + + metrics::REQUEST_DURATION + .with_label_values(&["read"]) + .observe(start.elapsed().as_secs_f64()); + + let tracked_bytes = 
info.data_size as i64; + let tracking_state = if download_guard.is_some() { + state.inflight_download_bytes.fetch_add(tracked_bytes, Ordering::Relaxed); + Some(state.clone()) + } else { + None + }; + + let streaming = StreamingBody { + dat_file: info.dat_file, + data_offset: info.data_file_offset, + data_size: info.data_size, + pos: 0, + pending: None, + state: tracking_state, + tracked_bytes, + }; + + let body = Body::new(streaming); + let mut resp = Response::new(body); + *resp.status_mut() = StatusCode::OK; + *resp.headers_mut() = response_headers; + return resp; + } + } + + // ---- Buffered path: small files, compressed, images, range requests ---- + // Handle compressed data: if needle is compressed, either pass through or decompress let is_compressed = n.is_compressed(); let mut data = n.data; @@ -396,8 +579,7 @@ async fn get_or_head_handler_inner( } // Image crop and resize (only for supported image formats) - let ext = extract_extension_from_path(&path); - if is_image_ext(&ext) { + if is_image { data = maybe_crop_image(&data, &ext, &query); data = maybe_resize_image(&data, &ext, &query); } diff --git a/seaweed-volume/src/storage/needle/needle.rs b/seaweed-volume/src/storage/needle/needle.rs index 925ad6833..c6283846b 100644 --- a/seaweed-volume/src/storage/needle/needle.rs +++ b/seaweed-volume/src/storage/needle/needle.rs @@ -98,6 +98,89 @@ impl Needle { // ---- Body reading (Version 2/3) ---- + /// Read version 2/3 body metadata only — skips copying the data payload. + /// Sets `data_size` and all metadata fields but leaves `data` empty. 
+ pub fn read_body_v2_meta_only(&mut self, bytes: &[u8]) -> Result<(), NeedleError> { + let len_bytes = bytes.len(); + let mut index = 0; + + // DataSize (4 bytes) + if index + 4 > len_bytes { + return Err(NeedleError::IndexOutOfRange(1)); + } + self.data_size = u32::from_be_bytes([bytes[index], bytes[index + 1], bytes[index + 2], bytes[index + 3]]); + index += 4; + + // Skip data bytes (do NOT copy them) + if index + self.data_size as usize > len_bytes { + return Err(NeedleError::IndexOutOfRange(1)); + } + index += self.data_size as usize; + + // Read non-data metadata + self.read_body_v2_non_data(&bytes[index..])?; + Ok(()) + } + + /// Read full needle from bytes but skip copying the data payload. + /// Sets all metadata fields, checksum, etc. but leaves `data` empty. + pub fn read_bytes_meta_only(&mut self, bytes: &[u8], offset: i64, expected_size: Size, version: Version) -> Result<(), NeedleError> { + self.read_header(bytes); + + if self.size != expected_size { + return Err(NeedleError::SizeMismatch { + offset, + id: self.id, + found: self.size, + expected: expected_size, + }); + } + + let body_start = NEEDLE_HEADER_SIZE; + let body_end = body_start + self.size.0 as usize; + + if version == VERSION_1 { + // V1 has no metadata — data is the entire body + self.data_size = self.size.0 as u32; + } else { + self.read_body_v2_meta_only(&bytes[body_start..body_end])?; + } + + // Read tail but skip CRC validation (no data to check against) + self.read_tail_meta_only(&bytes[body_end..], version)?; + Ok(()) + } + + /// Read tail without CRC validation (used when data was not read). 
+ fn read_tail_meta_only(&mut self, tail_bytes: &[u8], version: Version) -> Result<(), NeedleError> { + if tail_bytes.len() < NEEDLE_CHECKSUM_SIZE { + return Err(NeedleError::TailTooShort); + } + + self.checksum = CRC(u32::from_be_bytes([ + tail_bytes[0], tail_bytes[1], tail_bytes[2], tail_bytes[3], + ])); + + if version == VERSION_3 { + let ts_offset = NEEDLE_CHECKSUM_SIZE; + if tail_bytes.len() < ts_offset + TIMESTAMP_SIZE { + return Err(NeedleError::TailTooShort); + } + self.append_at_ns = u64::from_be_bytes([ + tail_bytes[ts_offset], + tail_bytes[ts_offset + 1], + tail_bytes[ts_offset + 2], + tail_bytes[ts_offset + 3], + tail_bytes[ts_offset + 4], + tail_bytes[ts_offset + 5], + tail_bytes[ts_offset + 6], + tail_bytes[ts_offset + 7], + ]); + } + + Ok(()) + } + /// Read the version 2/3 body data from bytes (size bytes starting after header). pub fn read_body_v2(&mut self, bytes: &[u8]) -> Result<(), NeedleError> { let len_bytes = bytes.len(); diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index e9381d37a..1e9e52d0c 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -216,6 +216,14 @@ impl Store { vol.read_needle_opt(n, read_deleted) } + /// Read needle metadata and return streaming info for large file reads. + pub fn read_volume_needle_stream_info( + &self, vid: VolumeId, n: &mut Needle, read_deleted: bool, + ) -> Result { + let (_, vol) = self.find_volume(vid).ok_or(VolumeError::NotFound)?; + vol.read_needle_stream_info(n, read_deleted) + } + /// Write a needle to a volume. 
pub fn write_volume_needle( &mut self, vid: VolumeId, n: &mut Needle, diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 7fa990994..653f09376 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -76,6 +76,21 @@ struct VolumeInfo { read_only: bool, } +// ============================================================================ +// Streaming read support +// ============================================================================ + +/// Information needed to stream needle data directly from the dat file +/// without loading the entire payload into memory. +pub struct NeedleStreamInfo { + /// Cloned file handle for the dat file. + pub dat_file: File, + /// Absolute byte offset within the dat file where needle data starts. + pub data_file_offset: u64, + /// Size of the data payload in bytes. + pub data_size: u32, +} + // ============================================================================ // Volume // ============================================================================ @@ -474,6 +489,87 @@ impl Volume { Ok(buf) } + /// Read needle metadata (header + flags/name/mime/etc) without loading the data payload, + /// and return a `NeedleStreamInfo` that can be used to stream data directly from the dat file. + /// + /// This is used for large needles to avoid loading the entire payload into memory. 
+ pub fn read_needle_stream_info(&self, n: &mut Needle, read_deleted: bool) -> Result { + let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; + + if nv.offset.is_zero() { + return Err(VolumeError::NotFound); + } + + let mut read_size = nv.size; + if read_size.is_deleted() { + if read_deleted && !read_size.is_tombstone() { + read_size = Size(-read_size.0); + } else { + return Err(VolumeError::Deleted); + } + } + if read_size.0 == 0 { + return Err(VolumeError::NotFound); + } + + let dat_file = self.dat_file.as_ref().ok_or_else(|| { + VolumeError::Io(io::Error::new(io::ErrorKind::Other, "dat file not open")) + })?; + + let offset = nv.offset.to_actual_offset(); + let version = self.version(); + let actual_size = get_actual_size(read_size, version); + + // Read the full needle bytes (including data) for metadata parsing. + // We use read_bytes_meta_only which skips copying the data payload. + let mut buf = vec![0u8; actual_size as usize]; + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + dat_file.read_exact_at(&mut buf, offset as u64)?; + } + #[cfg(windows)] + { + read_exact_at(dat_file, &mut buf, offset as u64)?; + } + + n.read_bytes_meta_only(&mut buf, offset, read_size, version)?; + + // TTL expiry check + if n.has_ttl() { + if let Some(ref ttl) = n.ttl { + let ttl_minutes = ttl.minutes(); + if ttl_minutes > 0 && n.has_last_modified_date() && n.append_at_ns > 0 { + let expire_at_ns = n.append_at_ns + (ttl_minutes as u64) * 60 * 1_000_000_000; + let now_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + if now_ns >= expire_at_ns { + return Err(VolumeError::NotFound); + } + } + } + } + + // For V1, data starts right after the header + // For V2/V3, data starts at header + 4 (DataSize field) + let data_file_offset = if version == VERSION_1 { + offset as u64 + NEEDLE_HEADER_SIZE as u64 + } else { + offset as u64 + NEEDLE_HEADER_SIZE as u64 + 4 // skip 
DataSize (4 bytes) + }; + + let cloned_file = dat_file.try_clone().map_err(VolumeError::Io)?; + + Ok(NeedleStreamInfo { + dat_file: cloned_file, + data_file_offset, + data_size: n.data_size, + }) + } + // ---- Write ---- /// Write a needle to the volume (synchronous path). diff --git a/test/volume_server/framework/cluster_interface.go b/test/volume_server/framework/cluster_interface.go index 981867838..59962eb11 100644 --- a/test/volume_server/framework/cluster_interface.go +++ b/test/volume_server/framework/cluster_interface.go @@ -32,3 +32,28 @@ func StartVolumeCluster(t testing.TB, profile matrix.Profile) TestCluster { } return StartSingleVolumeCluster(t, profile) } + +// MultiCluster is the common interface for multi-volume cluster harnesses. +// Both *MultiVolumeCluster (Go) and *RustMultiVolumeCluster (Rust) satisfy it. +type MultiCluster interface { + MasterAddress() string + MasterURL() string + BaseDir() string + VolumeAdminAddress(index int) string + VolumeAdminURL(index int) string + VolumePublicAddress(index int) string + VolumePublicURL(index int) string + VolumeGRPCAddress(index int) string + Stop() +} + +// StartMultiVolumeClusterAuto starts a multi-volume cluster using either Go or +// Rust volume servers, depending on the VOLUME_SERVER_IMPL environment variable. +// Set VOLUME_SERVER_IMPL=rust to use Rust volume servers. 
+func StartMultiVolumeClusterAuto(t testing.TB, profile matrix.Profile, count int) MultiCluster { + t.Helper() + if os.Getenv("VOLUME_SERVER_IMPL") == "rust" { + return StartRustMultiVolumeCluster(t, profile, count) + } + return StartMultiVolumeCluster(t, profile, count) +} diff --git a/test/volume_server/framework/cluster_multi_rust.go b/test/volume_server/framework/cluster_multi_rust.go new file mode 100644 index 000000000..a9378d2ce --- /dev/null +++ b/test/volume_server/framework/cluster_multi_rust.go @@ -0,0 +1,300 @@ +package framework + +import ( + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" +) + +// RustMultiVolumeCluster wraps a Go master + multiple Rust volume servers +// for integration testing. It mirrors MultiVolumeCluster but uses the Rust +// volume binary instead of the Go weed binary for volume servers. +type RustMultiVolumeCluster struct { + testingTB testing.TB + profile matrix.Profile + + weedBinary string // Go weed binary (for the master) + rustVolumeBinary string // Rust volume binary + + baseDir string + configDir string + logsDir string + keepLogs bool + volumeServerCount int + + masterPort int + masterGrpcPort int + + volumePorts []int + volumeGrpcPorts []int + volumePubPorts []int + + masterCmd *exec.Cmd + volumeCmds []*exec.Cmd + + cleanupOnce sync.Once +} + +// StartRustMultiVolumeCluster starts a cluster with a Go master and the +// specified number of Rust volume servers. 
+func StartRustMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount int) *RustMultiVolumeCluster { + t.Helper() + + if serverCount < 1 { + t.Fatalf("serverCount must be at least 1, got %d", serverCount) + } + + weedBinary, err := FindOrBuildWeedBinary() + if err != nil { + t.Fatalf("resolve weed binary: %v", err) + } + + rustBinary, err := FindOrBuildRustBinary() + if err != nil { + t.Fatalf("resolve rust volume binary: %v", err) + } + + baseDir, keepLogs, err := newWorkDir() + if err != nil { + t.Fatalf("create temp test directory: %v", err) + } + + configDir := filepath.Join(baseDir, "config") + logsDir := filepath.Join(baseDir, "logs") + masterDataDir := filepath.Join(baseDir, "master") + + // Create directories for master and all volume servers + dirs := []string{configDir, logsDir, masterDataDir} + for i := 0; i < serverCount; i++ { + dirs = append(dirs, filepath.Join(baseDir, fmt.Sprintf("volume%d", i))) + } + for _, dir := range dirs { + if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { + t.Fatalf("create %s: %v", dir, mkErr) + } + } + + if err = writeSecurityConfig(configDir, profile); err != nil { + t.Fatalf("write security config: %v", err) + } + + masterPort, masterGrpcPort, err := allocateMasterPortPair() + if err != nil { + t.Fatalf("allocate master port pair: %v", err) + } + + // Allocate ports for all volume servers (3 ports per server: admin, grpc, public) + // If SplitPublicPort is true, we need an additional port per server + portsPerServer := 3 + if profile.SplitPublicPort { + portsPerServer = 4 + } + totalPorts := serverCount * portsPerServer + ports, err := allocatePorts(totalPorts) + if err != nil { + t.Fatalf("allocate volume ports: %v", err) + } + + c := &RustMultiVolumeCluster{ + testingTB: t, + profile: profile, + weedBinary: weedBinary, + rustVolumeBinary: rustBinary, + baseDir: baseDir, + configDir: configDir, + logsDir: logsDir, + keepLogs: keepLogs, + volumeServerCount: serverCount, + masterPort: masterPort, + 
masterGrpcPort: masterGrpcPort, + volumePorts: make([]int, serverCount), + volumeGrpcPorts: make([]int, serverCount), + volumePubPorts: make([]int, serverCount), + volumeCmds: make([]*exec.Cmd, serverCount), + } + + // Assign ports to each volume server + for i := 0; i < serverCount; i++ { + baseIdx := i * portsPerServer + c.volumePorts[i] = ports[baseIdx] + c.volumeGrpcPorts[i] = ports[baseIdx+1] + + // Assign public port, using baseIdx+3 if SplitPublicPort, else baseIdx+2 + pubPortIdx := baseIdx + 2 + if profile.SplitPublicPort { + pubPortIdx = baseIdx + 3 + } + c.volumePubPorts[i] = ports[pubPortIdx] + } + + // Start master (Go) + if err = c.startMaster(masterDataDir); err != nil { + c.Stop() + t.Fatalf("start master: %v", err) + } + helper := &Cluster{logsDir: logsDir} + if err = helper.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil { + masterLog := helper.tailLog("master.log") + c.Stop() + t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog) + } + + // Start all Rust volume servers + for i := 0; i < serverCount; i++ { + volumeDataDir := filepath.Join(baseDir, fmt.Sprintf("volume%d", i)) + if err = c.startRustVolume(i, volumeDataDir); err != nil { + volumeLog := fmt.Sprintf("volume%d.log", i) + c.Stop() + t.Fatalf("start rust volume server %d: %v\nvolume log tail:\n%s", i, err, helper.tailLog(volumeLog)) + } + if err = helper.waitForHTTP(c.VolumeAdminURL(i) + "/healthz"); err != nil { + volumeLog := fmt.Sprintf("volume%d.log", i) + c.Stop() + t.Fatalf("wait for rust volume server %d readiness: %v\nvolume log tail:\n%s", i, err, helper.tailLog(volumeLog)) + } + if err = helper.waitForTCP(c.VolumeGRPCAddress(i)); err != nil { + volumeLog := fmt.Sprintf("volume%d.log", i) + c.Stop() + t.Fatalf("wait for rust volume server %d grpc readiness: %v\nvolume log tail:\n%s", i, err, helper.tailLog(volumeLog)) + } + } + + t.Cleanup(func() { + c.Stop() + }) + + return c +} + +func (c *RustMultiVolumeCluster) Stop() { + if c == nil { + 
return + } + c.cleanupOnce.Do(func() { + // Stop volume servers in reverse order + for i := len(c.volumeCmds) - 1; i >= 0; i-- { + stopProcess(c.volumeCmds[i]) + } + stopProcess(c.masterCmd) + if !c.keepLogs && !c.testingTB.Failed() { + _ = os.RemoveAll(c.baseDir) + } else if c.baseDir != "" { + c.testingTB.Logf("rust multi volume server integration logs kept at %s", c.baseDir) + } + }) +} + +func (c *RustMultiVolumeCluster) startMaster(dataDir string) error { + logFile, err := os.Create(filepath.Join(c.logsDir, "master.log")) + if err != nil { + return err + } + + args := []string{ + "-config_dir=" + c.configDir, + "master", + "-ip=127.0.0.1", + "-port=" + strconv.Itoa(c.masterPort), + "-port.grpc=" + strconv.Itoa(c.masterGrpcPort), + "-mdir=" + dataDir, + "-peers=none", + "-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB), + "-defaultReplication=000", + } + + c.masterCmd = exec.Command(c.weedBinary, args...) + c.masterCmd.Dir = c.baseDir + c.masterCmd.Stdout = logFile + c.masterCmd.Stderr = logFile + return c.masterCmd.Start() +} + +func (c *RustMultiVolumeCluster) startRustVolume(index int, dataDir string) error { + logName := fmt.Sprintf("volume%d.log", index) + logFile, err := os.Create(filepath.Join(c.logsDir, logName)) + if err != nil { + return err + } + + args := []string{ + "--port", strconv.Itoa(c.volumePorts[index]), + "--port.grpc", strconv.Itoa(c.volumeGrpcPorts[index]), + "--port.public", strconv.Itoa(c.volumePubPorts[index]), + "--ip", "127.0.0.1", + "--ip.bind", "127.0.0.1", + "--dir", dataDir, + "--max", "16", + "--master", "127.0.0.1:" + strconv.Itoa(c.masterPort), + "--securityFile", filepath.Join(c.configDir, "security.toml"), + "--concurrentUploadLimitMB", strconv.Itoa(c.profile.ConcurrentUploadLimitMB), + "--concurrentDownloadLimitMB", strconv.Itoa(c.profile.ConcurrentDownloadLimitMB), + "--preStopSeconds", "0", + } + if c.profile.InflightUploadTimeout > 0 { + args = append(args, "--inflightUploadDataTimeout", 
c.profile.InflightUploadTimeout.String()) + } + if c.profile.InflightDownloadTimeout > 0 { + args = append(args, "--inflightDownloadDataTimeout", c.profile.InflightDownloadTimeout.String()) + } + + cmd := exec.Command(c.rustVolumeBinary, args...) + cmd.Dir = c.baseDir + cmd.Stdout = logFile + cmd.Stderr = logFile + + if err = cmd.Start(); err != nil { + return err + } + c.volumeCmds[index] = cmd + return nil +} + +// --- accessor methods (mirror MultiVolumeCluster) --- + +func (c *RustMultiVolumeCluster) MasterAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort)) +} + +func (c *RustMultiVolumeCluster) MasterURL() string { + return "http://" + c.MasterAddress() +} + +func (c *RustMultiVolumeCluster) VolumeAdminAddress(index int) string { + if index < 0 || index >= len(c.volumePorts) { + return "" + } + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePorts[index])) +} + +func (c *RustMultiVolumeCluster) VolumePublicAddress(index int) string { + if index < 0 || index >= len(c.volumePubPorts) { + return "" + } + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPorts[index])) +} + +func (c *RustMultiVolumeCluster) VolumeGRPCAddress(index int) string { + if index < 0 || index >= len(c.volumeGrpcPorts) { + return "" + } + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPorts[index])) +} + +func (c *RustMultiVolumeCluster) VolumeAdminURL(index int) string { + return "http://" + c.VolumeAdminAddress(index) +} + +func (c *RustMultiVolumeCluster) VolumePublicURL(index int) string { + return "http://" + c.VolumePublicAddress(index) +} + +func (c *RustMultiVolumeCluster) BaseDir() string { + return c.baseDir +} diff --git a/test/volume_server/grpc/move_tail_timestamp_test.go b/test/volume_server/grpc/move_tail_timestamp_test.go index 8d5e01a47..32068079e 100644 --- a/test/volume_server/grpc/move_tail_timestamp_test.go +++ b/test/volume_server/grpc/move_tail_timestamp_test.go @@ -29,7 +29,7 @@ func 
TestVolumeCopyReturnsPreciseLastAppendTimestamp(t *testing.T) { t.Skip("skipping integration test in short mode") } - cluster := framework.StartDualVolumeCluster(t, matrix.P1()) + cluster := framework.StartMultiVolumeClusterAuto(t, matrix.P1(), 2) sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) defer sourceConn.Close() destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) @@ -156,7 +156,7 @@ func TestVolumeMoveHandlesInFlightWrites(t *testing.T) { t.Skip("skipping integration test in short mode") } - cluster := framework.StartDualVolumeCluster(t, matrix.P1()) + cluster := framework.StartMultiVolumeClusterAuto(t, matrix.P1(), 2) sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) defer sourceConn.Close() destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))