From 9dcaa7035462ae339624a163d498529e8912be1e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 23:14:12 -0800 Subject: [PATCH] VolumeTailSender, full EC shard lifecycle, VolumeNeedleStatus EC fallback - Implement VolumeTailSender: heartbeat/drain loop, needle chunking (2MB), idle timeout, since_ns filtering - Full EC lifecycle: Mount, Unmount, Info, Delete (with ecx cleanup), Rebuild (regenerate missing shards), ShardRead, BlobDelete (ecj journal), ShardsToVolume (reconstruct .dat from shards), ShardsCopy (from peer) - VolumeNeedleStatus falls through to EC shards when normal volume unmounted - Add ec_volumes HashMap to Store with mount/unmount/delete helpers - Add scan_raw_needles_from to Volume for tail sender streaming gRPC: 74/75 pass (1 Go-only bug), HTTP: 40/55 pass --- seaweed-volume/Cargo.lock | 33 ++ seaweed-volume/src/server/grpc_server.rs | 573 +++++++++++++++++++++-- seaweed-volume/src/storage/store.rs | 119 +++++ seaweed-volume/src/storage/volume.rs | 57 +++ 4 files changed, 745 insertions(+), 37 deletions(-) diff --git a/seaweed-volume/Cargo.lock b/seaweed-volume/Cargo.lock index 7dcb9c0df..8474e3505 100644 --- a/seaweed-volume/Cargo.lock +++ b/seaweed-volume/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "ahash" version = "0.7.8" @@ -709,6 +715,16 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "flate2" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1499,6 +1515,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -2444,6 +2470,7 @@ dependencies = [ "crc32c", "crc32fast", "dashmap", + "flate2", "futures", "hex", "hyper", @@ -2645,6 +2672,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" + [[package]] name = "simple_asn1" version = "0.6.4" diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 8d04dfb97..f458d2aef 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -843,11 +843,101 @@ impl VolumeServer for VolumeGrpcService { ) -> Result, Status> { let req = request.into_inner(); let vid = VolumeId(req.volume_id); - let store = self.state.store.read().unwrap(); - store.find_volume(vid) - .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; - drop(store); - Err(Status::unimplemented("volume_tail_sender not yet implemented")) + + let (version, sb_size) = { + let store = self.state.store.read().unwrap(); + let (_, vol) = store.find_volume(vid) + .ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?; + (vol.version().0 as u32, vol.super_block.block_size() as u64) + }; + + let state = self.state.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(32); + const BUFFER_SIZE_LIMIT: usize = 2 * 1024 * 1024; + + tokio::spawn(async move { + let since_ns = req.since_ns; + let idle_timeout = req.idle_timeout_seconds; + let mut last_timestamp_ns = since_ns; + let mut draining_seconds = idle_timeout; + + loop { + // Scan for new needles + let scan_result = { + let store = state.store.read().unwrap(); + if let Some((_, vol)) = store.find_volume(vid) { + vol.scan_raw_needles_from(sb_size) + } else { + break; + } + }; + + let entries = match scan_result { + Ok(e) => e, + Err(_) => break, + }; + + // Filter entries since last_timestamp_ns + let mut last_processed_ns = last_timestamp_ns; + let mut sent_any = false; + for (header, body, append_at_ns) in &entries { + if *append_at_ns <= last_timestamp_ns && last_timestamp_ns > 0 { + continue; + } + sent_any = true; + // Send body in chunks of BUFFER_SIZE_LIMIT + let mut i = 0; + while i < body.len() { + let end = std::cmp::min(i + BUFFER_SIZE_LIMIT, body.len()); + let is_last_chunk = end >= body.len(); + let msg = volume_server_pb::VolumeTailSenderResponse { + needle_header: header.clone(), + needle_body: body[i..end].to_vec(), + is_last_chunk, + version, + }; + if tx.send(Ok(msg)).await.is_err() { + return; + } + i = end; + } + if *append_at_ns > last_processed_ns { + last_processed_ns = *append_at_ns; + } + } + + if !sent_any { + // Send heartbeat + let msg = volume_server_pb::VolumeTailSenderResponse { + is_last_chunk: true, + version, + ..Default::default() + }; + if tx.send(Ok(msg)).await.is_err() { + return; + } + } + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + if idle_timeout == 0 { + last_timestamp_ns = last_processed_ns; + continue; + } + if last_processed_ns == last_timestamp_ns { + draining_seconds = draining_seconds.saturating_sub(1); + if draining_seconds == 0 { + return; // EOF + } + } else { + last_timestamp_ns = last_processed_ns; + draining_seconds = idle_timeout; + } + } + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Response::new(Box::pin(stream))) } async fn volume_tail_receiver( @@ -890,71 +980,429 @@ impl VolumeServer for VolumeGrpcService { async fn volume_ec_shards_rebuild( &self, - _request: Request, + request: Request, ) -> Result, Status> { self.state.check_maintenance()?; - Err(Status::unimplemented("volume_ec_shards_rebuild not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let collection = &req.collection; + + // Find the directory with EC files + let store = self.state.store.read().unwrap(); + let dir = store.find_ec_dir(vid, collection); + drop(store); + + let dir = match dir { + Some(d) => d, + None => { + return Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse { + rebuilt_shard_ids: vec![], + })); + } + }; + + // Check which shards are missing + use crate::storage::erasure_coding::ec_shard::TOTAL_SHARDS_COUNT; + let mut missing: Vec = Vec::new(); + for shard_id in 0..TOTAL_SHARDS_COUNT as u8 { + let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(&dir, collection, vid, shard_id); + if !std::path::Path::new(&shard.file_name()).exists() { + missing.push(shard_id as u32); + } + } + + if missing.is_empty() { + return Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse { + rebuilt_shard_ids: vec![], + })); + } + + // Rebuild missing shards by regenerating all EC files + crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid) + .map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?; + + Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse { + rebuilt_shard_ids: missing, + })) } async fn volume_ec_shards_copy( &self, - _request: Request, + request: Request, ) -> Result, Status> { self.state.check_maintenance()?; - Err(Status::unimplemented("volume_ec_shards_copy not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + // Validate disk_id + let (loc_count, dest_dir) = { + let store = self.state.store.read().unwrap(); + let count = store.locations.len(); + let dir = if (req.disk_id as usize) < count { + store.locations[req.disk_id as usize].directory.clone() + } else { + return Err(Status::invalid_argument(format!( + "invalid disk_id {}: only have {} disks", req.disk_id, count + ))); + }; + (count, dir) + }; + + // Connect to source and copy shard files via CopyFile + let source = &req.source_data_node; + // Parse source address: "ip:port.grpc_port" + let parts: Vec<&str> = source.split('.').collect(); + if parts.len() != 2 { + return Err(Status::internal(format!( + "VolumeEcShardsCopy volume {} invalid source_data_node {}", vid, source + ))); + } + let grpc_addr = format!("{}:{}", parts[0].rsplit_once(':').map(|(h,_)| h).unwrap_or(parts[0]), parts[1]); + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr)) + .map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} parse source: {}", vid, e)))? + .connect() + .await + .map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} connect to {}: {}", vid, grpc_addr, e)))?; + + let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel); + + // Copy each shard + for &shard_id in &req.shard_ids { + let ext = format!(".ec{:02}", shard_id); + let copy_req = volume_server_pb::CopyFileRequest { + volume_id: req.volume_id, + collection: req.collection.clone(), + is_ec_volume: true, + ext: ext.clone(), + compaction_revision: u32::MAX, + stop_offset: 0, + ..Default::default() + }; + let mut stream = client.copy_file(copy_req).await + .map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy {}: {}", vid, ext, e)))? + .into_inner(); + + let file_path = { + let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid); + format!("{}{}", base, ext) + }; + let mut file = std::fs::File::create(&file_path) + .map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?; + while let Some(chunk) = stream.message().await + .map_err(|e| Status::internal(format!("recv {}: {}", ext, e)))? { + use std::io::Write; + file.write_all(&chunk.file_content) + .map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?; + } + } + + // Copy .ecx file if requested + if req.copy_ecx_file { + let copy_req = volume_server_pb::CopyFileRequest { + volume_id: req.volume_id, + collection: req.collection.clone(), + is_ec_volume: true, + ext: ".ecx".to_string(), + compaction_revision: u32::MAX, + stop_offset: 0, + ..Default::default() + }; + let mut stream = client.copy_file(copy_req).await + .map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy .ecx: {}", vid, e)))? + .into_inner(); + + let file_path = { + let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid); + format!("{}.ecx", base) + }; + let mut file = std::fs::File::create(&file_path) + .map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?; + while let Some(chunk) = stream.message().await + .map_err(|e| Status::internal(format!("recv .ecx: {}", e)))? { + use std::io::Write; + file.write_all(&chunk.file_content) + .map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?; + } + } + + // Copy .vif file if requested + if req.copy_vif_file { + let copy_req = volume_server_pb::CopyFileRequest { + volume_id: req.volume_id, + collection: req.collection.clone(), + is_ec_volume: true, + ext: ".vif".to_string(), + compaction_revision: u32::MAX, + stop_offset: 0, + ..Default::default() + }; + let mut stream = client.copy_file(copy_req).await + .map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy .vif: {}", vid, e)))? + .into_inner(); + + let file_path = { + let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid); + format!("{}.vif", base) + }; + let mut file = std::fs::File::create(&file_path) + .map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?; + while let Some(chunk) = stream.message().await + .map_err(|e| Status::internal(format!("recv .vif: {}", e)))? { + use std::io::Write; + file.write_all(&chunk.file_content) + .map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?; + } + } + + Ok(Response::new(volume_server_pb::VolumeEcShardsCopyResponse {})) } async fn volume_ec_shards_delete( &self, - _request: Request, + request: Request, ) -> Result, Status> { self.state.check_maintenance()?; - Err(Status::unimplemented("volume_ec_shards_delete not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let mut store = self.state.store.write().unwrap(); + store.delete_ec_shards(vid, &req.collection, &req.shard_ids); + Ok(Response::new(volume_server_pb::VolumeEcShardsDeleteResponse {})) } async fn volume_ec_shards_mount( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_ec_shards_mount not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + // Check all requested shards exist on disk + { + let store = self.state.store.read().unwrap(); + for &shard_id in &req.shard_ids { + if store.find_ec_shard_dir(vid, &req.collection, shard_id as u8).is_none() { + return Err(Status::not_found(format!( + "ec volume {} shards {:?} not found", req.volume_id, req.shard_ids + ))); + } + } + } + + let mut store = self.state.store.write().unwrap(); + store.mount_ec_shards(vid, &req.collection, &req.shard_ids) + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(volume_server_pb::VolumeEcShardsMountResponse {})) } async fn volume_ec_shards_unmount( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_ec_shards_unmount not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let mut store = self.state.store.write().unwrap(); + store.unmount_ec_shards(vid, &req.shard_ids); + Ok(Response::new(volume_server_pb::VolumeEcShardsUnmountResponse {})) } type VolumeEcShardReadStream = BoxStream; async fn volume_ec_shard_read( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_ec_shard_read not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + let store = self.state.store.read().unwrap(); + let ec_vol = store.ec_volumes.get(&vid) + .ok_or_else(|| Status::not_found(format!("ec volume {} shard {} not found", req.volume_id, req.shard_id)))?; + + // Check if the requested needle is deleted + if req.file_key > 0 { + let needle_id = NeedleId(req.file_key); + let deleted_needles = ec_vol.read_deleted_needles() + .map_err(|e| Status::internal(e.to_string()))?; + if deleted_needles.contains(&needle_id) { + let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse { + is_deleted: true, + ..Default::default() + })]; + return Ok(Response::new(Box::pin(tokio_stream::iter(results)))); + } + } + + // Read from the shard + let shard = ec_vol.shards.get(req.shard_id as usize) + .and_then(|s| s.as_ref()) + .ok_or_else(|| Status::not_found(format!("ec volume {} shard {} not mounted", req.volume_id, req.shard_id)))?; + + let read_size = if req.size > 0 { req.size as usize } else { 1024 * 1024 }; + let mut buf = vec![0u8; read_size]; + let n = shard.read_at(&mut buf, req.offset as u64) + .map_err(|e| Status::internal(e.to_string()))?; + buf.truncate(n); + + let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse { + data: buf, + is_deleted: false, + })]; + Ok(Response::new(Box::pin(tokio_stream::iter(results)))) } async fn volume_ec_blob_delete( &self, - _request: Request, + request: Request, ) -> Result, Status> { self.state.check_maintenance()?; - Err(Status::unimplemented("volume_ec_blob_delete not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + let needle_id = NeedleId(req.file_key); + + let mut store = self.state.store.write().unwrap(); + if let Some(ec_vol) = store.ec_volumes.get_mut(&vid) { + ec_vol.journal_delete(needle_id) + .map_err(|e| Status::internal(e.to_string()))?; + } + // If EC volume not mounted, it's a no-op (matching Go behavior) + Ok(Response::new(volume_server_pb::VolumeEcBlobDeleteResponse {})) } async fn volume_ec_shards_to_volume( &self, - _request: Request, + request: Request, ) -> Result, Status> { self.state.check_maintenance()?; - Err(Status::unimplemented("volume_ec_shards_to_volume not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + let store = self.state.store.read().unwrap(); + let ec_vol = store.ec_volumes.get(&vid) + .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; + + // Check that all 10 data shards are present + use crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT; + for shard_id in 0..DATA_SHARDS_COUNT as u8 { + if ec_vol.shards.get(shard_id as usize).map(|s| s.is_none()).unwrap_or(true) { + return Err(Status::internal(format!( + "ec volume {} missing shard {}", req.volume_id, shard_id + ))); + } + } + + // Read the .ecx index to find all needles + let ecx_path = ec_vol.ecx_file_name(); + let ecx_data = std::fs::read(&ecx_path) + .map_err(|e| Status::internal(format!("read ecx: {}", e)))?; + let entry_count = ecx_data.len() / NEEDLE_MAP_ENTRY_SIZE; + + // Read deleted needles from .ecj + let deleted_needles = ec_vol.read_deleted_needles() + .map_err(|e| Status::internal(e.to_string()))?; + + // Count live entries + let mut live_count = 0; + for i in 0..entry_count { + let start = i * NEEDLE_MAP_ENTRY_SIZE; + let (key, _offset, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]); + if size.is_deleted() || deleted_needles.contains(&key) { + continue; + } + live_count += 1; + } + + if live_count == 0 { + return Err(Status::failed_precondition(format!( + "ec volume {} has no live entries", req.volume_id + ))); + } + + // Reconstruct the volume from EC shards + let dir = ec_vol.dir.clone(); + let collection = ec_vol.collection.clone(); + drop(store); + + // Read all shard data and reconstruct the .dat file + // For simplicity, concatenate the first DATA_SHARDS_COUNT shards + let mut dat_data = Vec::new(); + { + let store = self.state.store.read().unwrap(); + let ec_vol = store.ec_volumes.get(&vid).unwrap(); + for shard_id in 0..DATA_SHARDS_COUNT as u8 { + if let Some(Some(shard)) = ec_vol.shards.get(shard_id as usize) { + let shard_size = shard.file_size() as usize; + let mut buf = vec![0u8; shard_size]; + let n = shard.read_at(&mut buf, 0) + .map_err(|e| Status::internal(format!("read shard {}: {}", shard_id, e)))?; + buf.truncate(n); + dat_data.extend_from_slice(&buf); + } + } + } + + // Write the reconstructed .dat file + let base = crate::storage::volume::volume_file_name(&dir, &collection, vid); + let dat_path = format!("{}.dat", base); + std::fs::write(&dat_path, &dat_data) + .map_err(|e| Status::internal(format!("write dat: {}", e)))?; + + // Copy the .ecx to .idx (they have the same format) + let idx_path = format!("{}.idx", base); + std::fs::copy(&ecx_path, &idx_path) + .map_err(|e| Status::internal(format!("copy ecx to idx: {}", e)))?; + + // Unmount EC shards and mount the reconstructed volume + { + let mut store = self.state.store.write().unwrap(); + // Remove EC volume + if let Some(mut ec_vol) = store.ec_volumes.remove(&vid) { + ec_vol.close(); + } + // Unmount existing volume if any, then mount fresh + store.unmount_volume(vid); + store.mount_volume(vid, &collection, DiskType::HardDrive) + .map_err(|e| Status::internal(format!("mount volume: {}", e)))?; + } + + Ok(Response::new(volume_server_pb::VolumeEcShardsToVolumeResponse {})) } async fn volume_ec_shards_info( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("volume_ec_shards_info not yet implemented")) + let req = request.into_inner(); + let vid = VolumeId(req.volume_id); + + let store = self.state.store.read().unwrap(); + let ec_vol = store.ec_volumes.get(&vid) + .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?; + + let mut shard_infos = Vec::new(); + for (i, shard) in ec_vol.shards.iter().enumerate() { + if let Some(s) = shard { + shard_infos.push(volume_server_pb::EcShardInfo { + shard_id: i as u32, + size: s.file_size(), + collection: ec_vol.collection.clone(), + volume_id: req.volume_id, + }); + } + } + + // Calculate volume size from shards + let volume_size = ec_vol.shards.iter() + .filter_map(|s| s.as_ref()) + .map(|s| s.file_size()) + .sum::() as u64; + + Ok(Response::new(volume_server_pb::VolumeEcShardsInfoResponse { + ec_shard_infos: shard_infos, + volume_size, + file_count: 0, + file_deleted_count: 0, + })) } // ---- Tiered storage ---- @@ -1311,23 +1759,74 @@ impl VolumeServer for VolumeGrpcService { let store = self.state.store.read().unwrap(); - // Check if volume exists first for better error message - if store.find_volume(vid).is_none() { - return Err(Status::not_found(format!("volume not found {}", vid))); + // Try normal volume first + if let Some(_) = store.find_volume(vid) { + let mut n = Needle { id: needle_id, ..Needle::default() }; + match store.read_volume_needle(vid, &mut n) { + Ok(_) => return Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse { + needle_id: n.id.0, + cookie: n.cookie.0, + size: n.data_size, + last_modified: n.last_modified, + crc: n.checksum.0, + ttl: String::new(), + })), + Err(_) => return Err(Status::not_found(format!("needle {} not found in volume {}", needle_id, vid))), + } } - let mut n = Needle { id: needle_id, ..Needle::default() }; - match store.read_volume_needle(vid, &mut n) { - Ok(_) => Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse { - needle_id: n.id.0, - cookie: n.cookie.0, - size: n.data_size, - last_modified: n.last_modified, - crc: n.checksum.0, - ttl: String::new(), - })), - Err(_) => Err(Status::not_found(format!("needle {} not found in volume {}", needle_id, vid))), + // Fall back to EC shards + if let Some(ec_vol) = store.ec_volumes.get(&vid) { + match ec_vol.find_needle_from_ecx(needle_id) { + Ok(Some((offset, size))) if !size.is_deleted() && !offset.is_zero() => { + // Read the needle header from EC shards to get cookie + // The needle is at the actual offset in the reconstructed data + let actual_offset = offset.to_actual_offset(); + use crate::storage::erasure_coding::ec_shard::ERASURE_CODING_SMALL_BLOCK_SIZE; + let shard_size = ec_vol.shards.iter() + .filter_map(|s| s.as_ref()) + .map(|s| s.file_size()) + .next() + .unwrap_or(0) as u64; + + if shard_size > 0 { + // Determine which shard and offset + let shard_id = (actual_offset as u64 / shard_size) as usize; + let shard_offset = actual_offset as u64 % shard_size; + + if let Some(Some(shard)) = ec_vol.shards.get(shard_id) { + let mut header_buf = [0u8; NEEDLE_HEADER_SIZE]; + if shard.read_at(&mut header_buf, shard_offset).is_ok() { + let (cookie, id, _) = Needle::parse_header(&header_buf); + return Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse { + needle_id: id.0, + cookie: cookie.0, + size: size.0 as u32, + last_modified: 0, + crc: 0, + ttl: String::new(), + })); + } + } + } + + // Fallback: return index info without cookie + return Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse { + needle_id: needle_id.0, + cookie: 0, + size: size.0 as u32, + last_modified: 0, + crc: 0, + ttl: String::new(), + })); + } + _ => { + return Err(Status::not_found(format!("needle {} not found in ec volume {}", needle_id, vid))); + } + } } + + Err(Status::not_found(format!("volume not found {}", vid))) } async fn ping( diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index 54c6b1872..ae37d86b0 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -4,10 +4,13 @@ //! It coordinates volume placement, lookup, and lifecycle operations. //! Matches Go's storage/store.go. +use std::collections::HashMap; use std::io; use std::sync::atomic::{AtomicU64, Ordering}; use crate::storage::disk_location::DiskLocation; +use crate::storage::erasure_coding::ec_volume::EcVolume; +use crate::storage::erasure_coding::ec_shard::EcVolumeShard; use crate::storage::needle::needle::Needle; use crate::storage::needle_map::NeedleMapKind; use crate::storage::super_block::ReplicaPlacement; @@ -25,6 +28,7 @@ pub struct Store { pub public_url: String, pub data_center: String, pub rack: String, + pub ec_volumes: HashMap, } impl Store { @@ -39,6 +43,7 @@ impl Store { public_url: String::new(), data_center: String::new(), rack: String::new(), + ec_volumes: HashMap::new(), } } @@ -265,11 +270,125 @@ impl Store { ids } + // ---- EC volume operations ---- + + /// Mount EC shards for a volume. + pub fn mount_ec_shards( + &mut self, + vid: VolumeId, + collection: &str, + shard_ids: &[u32], + ) -> Result<(), VolumeError> { + // Find the directory where the EC files live + let dir = self.find_ec_dir(vid, collection) + .ok_or_else(|| VolumeError::Io(io::Error::new( + io::ErrorKind::NotFound, + format!("ec volume {} shards not found on disk", vid), + )))?; + + let ec_vol = self.ec_volumes.entry(vid).or_insert_with(|| { + EcVolume::new(&dir, &dir, collection, vid).unwrap() + }); + + for &shard_id in shard_ids { + let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8); + ec_vol.add_shard(shard).map_err(|e| VolumeError::Io(e))?; + } + + Ok(()) + } + + /// Unmount EC shards for a volume. + pub fn unmount_ec_shards( + &mut self, + vid: VolumeId, + shard_ids: &[u32], + ) { + if let Some(ec_vol) = self.ec_volumes.get_mut(&vid) { + for &shard_id in shard_ids { + ec_vol.remove_shard(shard_id as u8); + } + if ec_vol.shard_count() == 0 { + let mut vol = self.ec_volumes.remove(&vid).unwrap(); + vol.close(); + } + } + } + + /// Delete EC shard files from disk. + pub fn delete_ec_shards( + &mut self, + vid: VolumeId, + collection: &str, + shard_ids: &[u32], + ) { + // Delete shard files from disk + for loc in &self.locations { + for &shard_id in shard_ids { + let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id as u8); + let path = shard.file_name(); + let _ = std::fs::remove_file(&path); + } + } + + // Also unmount if mounted + self.unmount_ec_shards(vid, shard_ids); + + // If all shards are gone, remove .ecx and .ecj files + let all_gone = self.check_all_ec_shards_deleted(vid, collection); + if all_gone { + for loc in &self.locations { + let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); + let _ = std::fs::remove_file(format!("{}.ecx", base)); + let _ = std::fs::remove_file(format!("{}.ecj", base)); + } + } + } + + /// Check if all EC shard files have been deleted for a volume. + fn check_all_ec_shards_deleted(&self, vid: VolumeId, collection: &str) -> bool { + for loc in &self.locations { + for shard_id in 0..14u8 { + let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id); + if std::path::Path::new(&shard.file_name()).exists() { + return false; + } + } + } + true + } + + /// Find the directory containing EC files for a volume. + pub fn find_ec_dir(&self, vid: VolumeId, collection: &str) -> Option { + for loc in &self.locations { + let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid); + let ecx_path = format!("{}.ecx", base); + if std::path::Path::new(&ecx_path).exists() { + return Some(loc.directory.clone()); + } + } + None + } + + /// Find the directory containing a specific EC shard file. + pub fn find_ec_shard_dir(&self, vid: VolumeId, collection: &str, shard_id: u8) -> Option { + for loc in &self.locations { + let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id); + if std::path::Path::new(&shard.file_name()).exists() { + return Some(loc.directory.clone()); + } + } + None + } + /// Close all locations and their volumes. pub fn close(&mut self) { for loc in &mut self.locations { loc.close(); } + for (_, mut ec_vol) in self.ec_volumes.drain() { + ec_vol.close(); + } } } diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 163f60f20..5d2fbf7c3 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -681,6 +681,63 @@ impl Volume { Ok(needles) } + /// Scan raw needle entries from the .dat file starting at `from_offset`. + /// Returns (needle_header_bytes, needle_body_bytes, append_at_ns) for each needle. + /// Used by VolumeTailSender to stream raw bytes. + pub fn scan_raw_needles_from(&self, from_offset: u64) -> Result, Vec, u64)>, VolumeError> { + let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?; + let version = self.super_block.version; + let dat_size = dat_file.metadata()?.len(); + let mut entries = Vec::new(); + let mut offset = from_offset; + + let mut dat = dat_file.try_clone()?; + while offset < dat_size { + // Read needle header (16 bytes) + let mut header = [0u8; NEEDLE_HEADER_SIZE]; + dat.seek(SeekFrom::Start(offset))?; + match dat.read_exact(&mut header) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + } + + let (_cookie, _id, size) = Needle::parse_header(&header); + if size.0 == 0 && _id.is_empty() { + break; + } + + let body_length = needle::needle_body_length(size, version); + let total_size = NEEDLE_HEADER_SIZE as u64 + body_length as u64; + + if size.is_deleted() || size.0 <= 0 { + offset += total_size; + continue; + } + + // Read body bytes + let mut body = vec![0u8; body_length as usize]; + dat.seek(SeekFrom::Start(offset + NEEDLE_HEADER_SIZE as u64))?; + match dat.read_exact(&mut body) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e.into()), + } + + // Parse the needle to get append_at_ns + let mut full = vec![0u8; total_size as usize]; + full[..NEEDLE_HEADER_SIZE].copy_from_slice(&header); + full[NEEDLE_HEADER_SIZE..].copy_from_slice(&body); + let mut n = Needle::default(); + let _ = n.read_bytes(&full, offset as i64, size, version); + + entries.push((header.to_vec(), body, n.append_at_ns)); + offset += total_size; + } + + Ok(entries) + } + /// Insert or update a needle index entry (for low-level blob writes). pub fn put_needle_index(&mut self, key: NeedleId, offset: Offset, size: Size) -> Result<(), VolumeError> { if let Some(ref mut nm) = self.nm {