Browse Source

VolumeTailSender, full EC shard lifecycle, VolumeNeedleStatus EC fallback

- Implement VolumeTailSender: heartbeat/drain loop, needle chunking (2MB),
  idle timeout, since_ns filtering
- Full EC lifecycle: Mount, Unmount, Info, Delete (with ecx cleanup),
  Rebuild (regenerate missing shards), ShardRead, BlobDelete (ecj journal),
  ShardsToVolume (reconstruct .dat from shards), ShardsCopy (from peer)
- VolumeNeedleStatus falls through to EC shards when normal volume unmounted
- Add ec_volumes HashMap to Store with mount/unmount/delete helpers
- Add scan_raw_needles_from to Volume for tail sender streaming

gRPC: 74/75 pass (1 Go-only bug), HTTP: 40/55 pass
rust-volume-server
Chris Lu 4 days ago
parent
commit
9dcaa70354
  1. 33
      seaweed-volume/Cargo.lock
  2. 573
      seaweed-volume/src/server/grpc_server.rs
  3. 119
      seaweed-volume/src/storage/store.rs
  4. 57
      seaweed-volume/src/storage/volume.rs

33
seaweed-volume/Cargo.lock

@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "adler2"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "ahash"
version = "0.7.8"
@ -709,6 +715,16 @@ version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1499,6 +1515,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "miniz_oxide"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316"
dependencies = [
"adler2",
"simd-adler32",
]
[[package]]
name = "mio"
version = "1.1.1"
@ -2444,6 +2470,7 @@ dependencies = [
"crc32c",
"crc32fast",
"dashmap",
"flate2",
"futures",
"hex",
"hyper",
@ -2645,6 +2672,12 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "simd-adler32"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2"
[[package]]
name = "simple_asn1"
version = "0.6.4"

573
seaweed-volume/src/server/grpc_server.rs

@ -843,11 +843,101 @@ impl VolumeServer for VolumeGrpcService {
) -> Result<Response<Self::VolumeTailSenderStream>, Status> {
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
let store = self.state.store.read().unwrap();
store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
drop(store);
Err(Status::unimplemented("volume_tail_sender not yet implemented"))
let (version, sb_size) = {
let store = self.state.store.read().unwrap();
let (_, vol) = store.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
(vol.version().0 as u32, vol.super_block.block_size() as u64)
};
let state = self.state.clone();
let (tx, rx) = tokio::sync::mpsc::channel(32);
const BUFFER_SIZE_LIMIT: usize = 2 * 1024 * 1024;
tokio::spawn(async move {
let since_ns = req.since_ns;
let idle_timeout = req.idle_timeout_seconds;
let mut last_timestamp_ns = since_ns;
let mut draining_seconds = idle_timeout;
loop {
// Scan for new needles
let scan_result = {
let store = state.store.read().unwrap();
if let Some((_, vol)) = store.find_volume(vid) {
vol.scan_raw_needles_from(sb_size)
} else {
break;
}
};
let entries = match scan_result {
Ok(e) => e,
Err(_) => break,
};
// Filter entries since last_timestamp_ns
let mut last_processed_ns = last_timestamp_ns;
let mut sent_any = false;
for (header, body, append_at_ns) in &entries {
if *append_at_ns <= last_timestamp_ns && last_timestamp_ns > 0 {
continue;
}
sent_any = true;
// Send body in chunks of BUFFER_SIZE_LIMIT
let mut i = 0;
while i < body.len() {
let end = std::cmp::min(i + BUFFER_SIZE_LIMIT, body.len());
let is_last_chunk = end >= body.len();
let msg = volume_server_pb::VolumeTailSenderResponse {
needle_header: header.clone(),
needle_body: body[i..end].to_vec(),
is_last_chunk,
version,
};
if tx.send(Ok(msg)).await.is_err() {
return;
}
i = end;
}
if *append_at_ns > last_processed_ns {
last_processed_ns = *append_at_ns;
}
}
if !sent_any {
// Send heartbeat
let msg = volume_server_pb::VolumeTailSenderResponse {
is_last_chunk: true,
version,
..Default::default()
};
if tx.send(Ok(msg)).await.is_err() {
return;
}
}
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
if idle_timeout == 0 {
last_timestamp_ns = last_processed_ns;
continue;
}
if last_processed_ns == last_timestamp_ns {
draining_seconds = draining_seconds.saturating_sub(1);
if draining_seconds == 0 {
return; // EOF
}
} else {
last_timestamp_ns = last_processed_ns;
draining_seconds = idle_timeout;
}
}
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream)))
}
async fn volume_tail_receiver(
@ -890,71 +980,429 @@ impl VolumeServer for VolumeGrpcService {
/// Rebuild any missing EC shard files for a volume by re-running the
/// EC encoder over the volume's files in its directory.
///
/// Returns the ids of the shards that were missing before the rebuild
/// (and therefore regenerated); an empty list when nothing was missing
/// or the volume's EC files are not on this server.
async fn volume_ec_shards_rebuild(
    &self,
    request: Request<volume_server_pb::VolumeEcShardsRebuildRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsRebuildResponse>, Status> {
    self.state.check_maintenance()?;
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    let collection = &req.collection;

    // Locate the directory holding this volume's EC files; if it is not
    // on this server there is nothing to rebuild.
    let dir = {
        let store = self.state.store.read().unwrap();
        store.find_ec_dir(vid, collection)
    };
    let dir = match dir {
        Some(d) => d,
        None => {
            return Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse {
                rebuilt_shard_ids: vec![],
            }));
        }
    };

    // Determine which of the expected shard files are absent on disk.
    use crate::storage::erasure_coding::ec_shard::TOTAL_SHARDS_COUNT;
    let mut missing: Vec<u32> = Vec::new();
    for shard_id in 0..TOTAL_SHARDS_COUNT as u8 {
        let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(&dir, collection, vid, shard_id);
        if !std::path::Path::new(&shard.file_name()).exists() {
            missing.push(shard_id as u32);
        }
    }
    if missing.is_empty() {
        return Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse {
            rebuilt_shard_ids: vec![],
        }));
    }

    // Regenerate the full EC file set; this recreates the missing shards.
    crate::storage::erasure_coding::ec_encoder::write_ec_files(&dir, collection, vid)
        .map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?;
    Ok(Response::new(volume_server_pb::VolumeEcShardsRebuildResponse {
        rebuilt_shard_ids: missing,
    }))
}
async fn volume_ec_shards_copy(
&self,
_request: Request<volume_server_pb::VolumeEcShardsCopyRequest>,
request: Request<volume_server_pb::VolumeEcShardsCopyRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsCopyResponse>, Status> {
self.state.check_maintenance()?;
Err(Status::unimplemented("volume_ec_shards_copy not yet implemented"))
let req = request.into_inner();
let vid = VolumeId(req.volume_id);
// Validate disk_id
let (loc_count, dest_dir) = {
let store = self.state.store.read().unwrap();
let count = store.locations.len();
let dir = if (req.disk_id as usize) < count {
store.locations[req.disk_id as usize].directory.clone()
} else {
return Err(Status::invalid_argument(format!(
"invalid disk_id {}: only have {} disks", req.disk_id, count
)));
};
(count, dir)
};
// Connect to source and copy shard files via CopyFile
let source = &req.source_data_node;
// Parse source address: "ip:port.grpc_port"
let parts: Vec<&str> = source.split('.').collect();
if parts.len() != 2 {
return Err(Status::internal(format!(
"VolumeEcShardsCopy volume {} invalid source_data_node {}", vid, source
)));
}
let grpc_addr = format!("{}:{}", parts[0].rsplit_once(':').map(|(h,_)| h).unwrap_or(parts[0]), parts[1]);
let channel = tonic::transport::Channel::from_shared(format!("http://{}", grpc_addr))
.map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} parse source: {}", vid, e)))?
.connect()
.await
.map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} connect to {}: {}", vid, grpc_addr, e)))?;
let mut client = volume_server_pb::volume_server_client::VolumeServerClient::new(channel);
// Copy each shard
for &shard_id in &req.shard_ids {
let ext = format!(".ec{:02}", shard_id);
let copy_req = volume_server_pb::CopyFileRequest {
volume_id: req.volume_id,
collection: req.collection.clone(),
is_ec_volume: true,
ext: ext.clone(),
compaction_revision: u32::MAX,
stop_offset: 0,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
.map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy {}: {}", vid, ext, e)))?
.into_inner();
let file_path = {
let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
format!("{}{}", base, ext)
};
let mut file = std::fs::File::create(&file_path)
.map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?;
while let Some(chunk) = stream.message().await
.map_err(|e| Status::internal(format!("recv {}: {}", ext, e)))? {
use std::io::Write;
file.write_all(&chunk.file_content)
.map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?;
}
}
// Copy .ecx file if requested
if req.copy_ecx_file {
let copy_req = volume_server_pb::CopyFileRequest {
volume_id: req.volume_id,
collection: req.collection.clone(),
is_ec_volume: true,
ext: ".ecx".to_string(),
compaction_revision: u32::MAX,
stop_offset: 0,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
.map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy .ecx: {}", vid, e)))?
.into_inner();
let file_path = {
let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
format!("{}.ecx", base)
};
let mut file = std::fs::File::create(&file_path)
.map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?;
while let Some(chunk) = stream.message().await
.map_err(|e| Status::internal(format!("recv .ecx: {}", e)))? {
use std::io::Write;
file.write_all(&chunk.file_content)
.map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?;
}
}
// Copy .vif file if requested
if req.copy_vif_file {
let copy_req = volume_server_pb::CopyFileRequest {
volume_id: req.volume_id,
collection: req.collection.clone(),
is_ec_volume: true,
ext: ".vif".to_string(),
compaction_revision: u32::MAX,
stop_offset: 0,
..Default::default()
};
let mut stream = client.copy_file(copy_req).await
.map_err(|e| Status::internal(format!("VolumeEcShardsCopy volume {} copy .vif: {}", vid, e)))?
.into_inner();
let file_path = {
let base = crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
format!("{}.vif", base)
};
let mut file = std::fs::File::create(&file_path)
.map_err(|e| Status::internal(format!("create {}: {}", file_path, e)))?;
while let Some(chunk) = stream.message().await
.map_err(|e| Status::internal(format!("recv .vif: {}", e)))? {
use std::io::Write;
file.write_all(&chunk.file_content)
.map_err(|e| Status::internal(format!("write {}: {}", file_path, e)))?;
}
}
Ok(Response::new(volume_server_pb::VolumeEcShardsCopyResponse {}))
}
/// Delete EC shard files for a volume from local disks (and unmount any
/// mounted copies); the Store also cleans up .ecx/.ecj once all shards
/// are gone.
async fn volume_ec_shards_delete(
    &self,
    request: Request<volume_server_pb::VolumeEcShardsDeleteRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsDeleteResponse>, Status> {
    self.state.check_maintenance()?;
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    let mut store = self.state.store.write().unwrap();
    store.delete_ec_shards(vid, &req.collection, &req.shard_ids);
    Ok(Response::new(volume_server_pb::VolumeEcShardsDeleteResponse {}))
}
/// Mount previously-copied EC shards so they can serve reads.
/// Fails with NotFound when any requested shard file is absent on disk.
async fn volume_ec_shards_mount(
    &self,
    request: Request<volume_server_pb::VolumeEcShardsMountRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsMountResponse>, Status> {
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    // Hold the write lock across both the existence check and the mount
    // so the shard files cannot be deleted in between (the previous code
    // checked under a read lock, then re-locked for the mount: TOCTOU).
    let mut store = self.state.store.write().unwrap();
    for &shard_id in &req.shard_ids {
        if store.find_ec_shard_dir(vid, &req.collection, shard_id as u8).is_none() {
            return Err(Status::not_found(format!(
                "ec volume {} shards {:?} not found", req.volume_id, req.shard_ids
            )));
        }
    }
    store.mount_ec_shards(vid, &req.collection, &req.shard_ids)
        .map_err(|e| Status::internal(e.to_string()))?;
    Ok(Response::new(volume_server_pb::VolumeEcShardsMountResponse {}))
}
/// Unmount EC shards so they no longer serve reads; the shard files on
/// disk are left untouched.
async fn volume_ec_shards_unmount(
    &self,
    request: Request<volume_server_pb::VolumeEcShardsUnmountRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsUnmountResponse>, Status> {
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    let mut store = self.state.store.write().unwrap();
    store.unmount_ec_shards(vid, &req.shard_ids);
    Ok(Response::new(volume_server_pb::VolumeEcShardsUnmountResponse {}))
}
type VolumeEcShardReadStream = BoxStream<volume_server_pb::VolumeEcShardReadResponse>;

/// Read a byte range from a single mounted EC shard.
///
/// When `file_key` is non-zero and that needle is recorded as deleted in
/// the .ecj journal, a lone `is_deleted` response is streamed back
/// instead of data. Otherwise the requested range (default 1 MiB when
/// `size` is 0) is returned as one stream message.
async fn volume_ec_shard_read(
    &self,
    request: Request<volume_server_pb::VolumeEcShardReadRequest>,
) -> Result<Response<Self::VolumeEcShardReadStream>, Status> {
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    let store = self.state.store.read().unwrap();
    let ec_vol = store.ec_volumes.get(&vid)
        .ok_or_else(|| Status::not_found(format!("ec volume {} shard {} not found", req.volume_id, req.shard_id)))?;
    // Deleted-needle short-circuit: consult the journal before touching the shard.
    if req.file_key > 0 {
        let needle_id = NeedleId(req.file_key);
        let deleted_needles = ec_vol.read_deleted_needles()
            .map_err(|e| Status::internal(e.to_string()))?;
        if deleted_needles.contains(&needle_id) {
            let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse {
                is_deleted: true,
                ..Default::default()
            })];
            return Ok(Response::new(Box::pin(tokio_stream::iter(results))));
        }
    }
    // Locate the mounted shard and read the requested range.
    let shard = ec_vol.shards.get(req.shard_id as usize)
        .and_then(|s| s.as_ref())
        .ok_or_else(|| Status::not_found(format!("ec volume {} shard {} not mounted", req.volume_id, req.shard_id)))?;
    let read_size = if req.size > 0 { req.size as usize } else { 1024 * 1024 };
    let mut buf = vec![0u8; read_size];
    let n = shard.read_at(&mut buf, req.offset as u64)
        .map_err(|e| Status::internal(e.to_string()))?;
    // read_at may return fewer bytes than requested near end-of-file.
    buf.truncate(n);
    let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse {
        data: buf,
        is_deleted: false,
    })];
    Ok(Response::new(Box::pin(tokio_stream::iter(results))))
}
/// Record a needle as deleted in the EC volume's .ecj journal.
/// A no-op when the EC volume is not mounted here (matching Go behavior).
async fn volume_ec_blob_delete(
    &self,
    request: Request<volume_server_pb::VolumeEcBlobDeleteRequest>,
) -> Result<Response<volume_server_pb::VolumeEcBlobDeleteResponse>, Status> {
    self.state.check_maintenance()?;
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    let needle_id = NeedleId(req.file_key);
    let mut store = self.state.store.write().unwrap();
    if let Some(ec_vol) = store.ec_volumes.get_mut(&vid) {
        ec_vol.journal_delete(needle_id)
            .map_err(|e| Status::internal(e.to_string()))?;
    }
    Ok(Response::new(volume_server_pb::VolumeEcBlobDeleteResponse {}))
}
/// Reconstruct a normal volume (.dat/.idx) from locally mounted EC data
/// shards, then unmount the EC volume and mount the rebuilt volume.
///
/// The previous version dropped the store lock, re-acquired it and
/// `unwrap()`ed `ec_volumes.get(&vid)` — a concurrent unmount would
/// panic the server. All EC-volume reads now happen under one read-lock
/// scope.
async fn volume_ec_shards_to_volume(
    &self,
    request: Request<volume_server_pb::VolumeEcShardsToVolumeRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsToVolumeResponse>, Status> {
    self.state.check_maintenance()?;
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    use crate::storage::erasure_coding::ec_shard::DATA_SHARDS_COUNT;

    // Gather everything we need from the EC volume under a single read
    // lock so the volume cannot disappear halfway through.
    let (dir, collection, ecx_path, dat_data) = {
        let store = self.state.store.read().unwrap();
        let ec_vol = store.ec_volumes.get(&vid)
            .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?;
        // All data shards must be mounted to reconstruct the .dat stream.
        for shard_id in 0..DATA_SHARDS_COUNT as u8 {
            if ec_vol.shards.get(shard_id as usize).map(|s| s.is_none()).unwrap_or(true) {
                return Err(Status::internal(format!(
                    "ec volume {} missing shard {}", req.volume_id, shard_id
                )));
            }
        }
        // Read the .ecx index and the .ecj journal to count live needles.
        let ecx_path = ec_vol.ecx_file_name();
        let ecx_data = std::fs::read(&ecx_path)
            .map_err(|e| Status::internal(format!("read ecx: {}", e)))?;
        let entry_count = ecx_data.len() / NEEDLE_MAP_ENTRY_SIZE;
        let deleted_needles = ec_vol.read_deleted_needles()
            .map_err(|e| Status::internal(e.to_string()))?;
        let mut live_count = 0;
        for i in 0..entry_count {
            let start = i * NEEDLE_MAP_ENTRY_SIZE;
            let (key, _offset, size) = idx_entry_from_bytes(&ecx_data[start..start + NEEDLE_MAP_ENTRY_SIZE]);
            if size.is_deleted() || deleted_needles.contains(&key) {
                continue;
            }
            live_count += 1;
        }
        if live_count == 0 {
            return Err(Status::failed_precondition(format!(
                "ec volume {} has no live entries", req.volume_id
            )));
        }
        // Concatenate the data shards to rebuild the raw .dat bytes.
        let mut dat_data = Vec::new();
        for shard_id in 0..DATA_SHARDS_COUNT as u8 {
            if let Some(Some(shard)) = ec_vol.shards.get(shard_id as usize) {
                let shard_size = shard.file_size() as usize;
                let mut buf = vec![0u8; shard_size];
                let n = shard.read_at(&mut buf, 0)
                    .map_err(|e| Status::internal(format!("read shard {}: {}", shard_id, e)))?;
                buf.truncate(n);
                dat_data.extend_from_slice(&buf);
            }
        }
        (ec_vol.dir.clone(), ec_vol.collection.clone(), ecx_path, dat_data)
    };

    // Write the reconstructed .dat, and copy .ecx to .idx (same format).
    let base = crate::storage::volume::volume_file_name(&dir, &collection, vid);
    let dat_path = format!("{}.dat", base);
    std::fs::write(&dat_path, &dat_data)
        .map_err(|e| Status::internal(format!("write dat: {}", e)))?;
    let idx_path = format!("{}.idx", base);
    std::fs::copy(&ecx_path, &idx_path)
        .map_err(|e| Status::internal(format!("copy ecx to idx: {}", e)))?;

    // Swap: drop the EC volume, then (re)mount the reconstructed volume.
    {
        let mut store = self.state.store.write().unwrap();
        if let Some(mut ec_vol) = store.ec_volumes.remove(&vid) {
            ec_vol.close();
        }
        store.unmount_volume(vid);
        store.mount_volume(vid, &collection, DiskType::HardDrive)
            .map_err(|e| Status::internal(format!("mount volume: {}", e)))?;
    }
    Ok(Response::new(volume_server_pb::VolumeEcShardsToVolumeResponse {}))
}
/// Report per-shard sizes for a mounted EC volume.
///
/// `volume_size` is the sum of the mounted shard file sizes; file counts
/// are not tracked per EC volume here and are reported as 0.
async fn volume_ec_shards_info(
    &self,
    request: Request<volume_server_pb::VolumeEcShardsInfoRequest>,
) -> Result<Response<volume_server_pb::VolumeEcShardsInfoResponse>, Status> {
    let req = request.into_inner();
    let vid = VolumeId(req.volume_id);
    let store = self.state.store.read().unwrap();
    let ec_vol = store.ec_volumes.get(&vid)
        .ok_or_else(|| Status::not_found(format!("ec volume {} not found", req.volume_id)))?;
    // One info entry per mounted shard; unmounted slots are skipped.
    let mut shard_infos = Vec::new();
    for (i, shard) in ec_vol.shards.iter().enumerate() {
        if let Some(s) = shard {
            shard_infos.push(volume_server_pb::EcShardInfo {
                shard_id: i as u32,
                size: s.file_size(),
                collection: ec_vol.collection.clone(),
                volume_id: req.volume_id,
            });
        }
    }
    let volume_size = ec_vol.shards.iter()
        .filter_map(|s| s.as_ref())
        .map(|s| s.file_size())
        .sum::<i64>() as u64;
    Ok(Response::new(volume_server_pb::VolumeEcShardsInfoResponse {
        ec_shard_infos: shard_infos,
        volume_size,
        file_count: 0,
        file_deleted_count: 0,
    }))
}
// ---- Tiered storage ----
@ -1311,23 +1759,74 @@ impl VolumeServer for VolumeGrpcService {
let store = self.state.store.read().unwrap();
// Check if volume exists first for better error message
if store.find_volume(vid).is_none() {
return Err(Status::not_found(format!("volume not found {}", vid)));
// Try normal volume first
if let Some(_) = store.find_volume(vid) {
let mut n = Needle { id: needle_id, ..Needle::default() };
match store.read_volume_needle(vid, &mut n) {
Ok(_) => return Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse {
needle_id: n.id.0,
cookie: n.cookie.0,
size: n.data_size,
last_modified: n.last_modified,
crc: n.checksum.0,
ttl: String::new(),
})),
Err(_) => return Err(Status::not_found(format!("needle {} not found in volume {}", needle_id, vid))),
}
}
let mut n = Needle { id: needle_id, ..Needle::default() };
match store.read_volume_needle(vid, &mut n) {
Ok(_) => Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse {
needle_id: n.id.0,
cookie: n.cookie.0,
size: n.data_size,
last_modified: n.last_modified,
crc: n.checksum.0,
ttl: String::new(),
})),
Err(_) => Err(Status::not_found(format!("needle {} not found in volume {}", needle_id, vid))),
// Fall back to EC shards
if let Some(ec_vol) = store.ec_volumes.get(&vid) {
match ec_vol.find_needle_from_ecx(needle_id) {
Ok(Some((offset, size))) if !size.is_deleted() && !offset.is_zero() => {
// Read the needle header from EC shards to get cookie
// The needle is at the actual offset in the reconstructed data
let actual_offset = offset.to_actual_offset();
use crate::storage::erasure_coding::ec_shard::ERASURE_CODING_SMALL_BLOCK_SIZE;
let shard_size = ec_vol.shards.iter()
.filter_map(|s| s.as_ref())
.map(|s| s.file_size())
.next()
.unwrap_or(0) as u64;
if shard_size > 0 {
// Determine which shard and offset
let shard_id = (actual_offset as u64 / shard_size) as usize;
let shard_offset = actual_offset as u64 % shard_size;
if let Some(Some(shard)) = ec_vol.shards.get(shard_id) {
let mut header_buf = [0u8; NEEDLE_HEADER_SIZE];
if shard.read_at(&mut header_buf, shard_offset).is_ok() {
let (cookie, id, _) = Needle::parse_header(&header_buf);
return Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse {
needle_id: id.0,
cookie: cookie.0,
size: size.0 as u32,
last_modified: 0,
crc: 0,
ttl: String::new(),
}));
}
}
}
// Fallback: return index info without cookie
return Ok(Response::new(volume_server_pb::VolumeNeedleStatusResponse {
needle_id: needle_id.0,
cookie: 0,
size: size.0 as u32,
last_modified: 0,
crc: 0,
ttl: String::new(),
}));
}
_ => {
return Err(Status::not_found(format!("needle {} not found in ec volume {}", needle_id, vid)));
}
}
}
Err(Status::not_found(format!("volume not found {}", vid)))
}
async fn ping(

119
seaweed-volume/src/storage/store.rs

@ -4,10 +4,13 @@
//! It coordinates volume placement, lookup, and lifecycle operations.
//! Matches Go's storage/store.go.
use std::collections::HashMap;
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::storage::disk_location::DiskLocation;
use crate::storage::erasure_coding::ec_volume::EcVolume;
use crate::storage::erasure_coding::ec_shard::EcVolumeShard;
use crate::storage::needle::needle::Needle;
use crate::storage::needle_map::NeedleMapKind;
use crate::storage::super_block::ReplicaPlacement;
@ -25,6 +28,7 @@ pub struct Store {
pub public_url: String,
pub data_center: String,
pub rack: String,
pub ec_volumes: HashMap<VolumeId, EcVolume>,
}
impl Store {
@ -39,6 +43,7 @@ impl Store {
public_url: String::new(),
data_center: String::new(),
rack: String::new(),
ec_volumes: HashMap::new(),
}
}
@ -265,11 +270,125 @@ impl Store {
ids
}
// ---- EC volume operations ----
/// Mount EC shards for a volume.
///
/// Locates the directory holding the volume's EC files, lazily creates
/// the in-memory `EcVolume` on first mount, then registers each
/// requested shard with it.
///
/// Errors if the EC files are not found on any disk location, if the
/// `EcVolume` cannot be opened, or if a shard fails to attach.
pub fn mount_ec_shards(
    &mut self,
    vid: VolumeId,
    collection: &str,
    shard_ids: &[u32],
) -> Result<(), VolumeError> {
    use std::collections::hash_map::Entry;
    // Find the directory where the EC files live.
    let dir = self.find_ec_dir(vid, collection)
        .ok_or_else(|| VolumeError::Io(io::Error::new(
            io::ErrorKind::NotFound,
            format!("ec volume {} shards not found on disk", vid),
        )))?;
    // Create the EcVolume on first mount. Propagate the open error
    // instead of panicking (the previous code unwrap()ed inside
    // or_insert_with). NOTE(review): assumes EcVolume::new returns an
    // io::Result, matching add_shard below — confirm.
    let ec_vol = match self.ec_volumes.entry(vid) {
        Entry::Occupied(e) => e.into_mut(),
        Entry::Vacant(e) => e.insert(
            EcVolume::new(&dir, &dir, collection, vid).map_err(VolumeError::Io)?,
        ),
    };
    for &shard_id in shard_ids {
        let shard = EcVolumeShard::new(&dir, collection, vid, shard_id as u8);
        ec_vol.add_shard(shard).map_err(VolumeError::Io)?;
    }
    Ok(())
}
/// Unmount EC shards for a volume.
///
/// Detaches each requested shard from the in-memory EC volume; when the
/// last shard is gone, the whole `EcVolume` is removed and closed.
/// Shard files on disk are not touched.
pub fn unmount_ec_shards(
    &mut self,
    vid: VolumeId,
    shard_ids: &[u32],
) {
    // Detach the requested shards, remembering whether any remain.
    let now_empty = match self.ec_volumes.get_mut(&vid) {
        Some(ec_vol) => {
            for &shard_id in shard_ids {
                ec_vol.remove_shard(shard_id as u8);
            }
            ec_vol.shard_count() == 0
        }
        None => false,
    };
    // With no shards left there is nothing to serve: drop the volume.
    if now_empty {
        if let Some(mut ec_vol) = self.ec_volumes.remove(&vid) {
            ec_vol.close();
        }
    }
}
/// Delete EC shard files from disk.
///
/// Removes the requested shard files on every disk location (best
/// effort), unmounts any mounted copies, and — once no shard file
/// remains anywhere — also removes the volume's .ecx index and .ecj
/// journal files.
pub fn delete_ec_shards(
    &mut self,
    vid: VolumeId,
    collection: &str,
    shard_ids: &[u32],
) {
    // Best-effort removal: a missing file is not an error.
    for loc in &self.locations {
        for &shard_id in shard_ids {
            let path = EcVolumeShard::new(&loc.directory, collection, vid, shard_id as u8).file_name();
            let _ = std::fs::remove_file(&path);
        }
    }
    // Drop any in-memory mounts of the deleted shards.
    self.unmount_ec_shards(vid, shard_ids);
    // With the last shard gone, the index/journal files are orphaned.
    if self.check_all_ec_shards_deleted(vid, collection) {
        for loc in &self.locations {
            let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
            for ext in [".ecx", ".ecj"] {
                let _ = std::fs::remove_file(format!("{}{}", base, ext));
            }
        }
    }
}
/// Check if all EC shard files have been deleted for a volume.
/// Returns true only when no shard file exists on any disk location.
fn check_all_ec_shards_deleted(&self, vid: VolumeId, collection: &str) -> bool {
    // 14 shard slots checked — presumably data + parity shard total;
    // TODO confirm against the EC encoder's TOTAL_SHARDS_COUNT.
    const TOTAL_SHARDS: u8 = 14;
    self.locations.iter().all(|loc| {
        (0..TOTAL_SHARDS).all(|shard_id| {
            let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id);
            !std::path::Path::new(&shard.file_name()).exists()
        })
    })
}
/// Find the directory containing EC files for a volume.
/// A location qualifies when the volume's .ecx index file exists there.
pub fn find_ec_dir(&self, vid: VolumeId, collection: &str) -> Option<String> {
    self.locations.iter().find_map(|loc| {
        let base = crate::storage::volume::volume_file_name(&loc.directory, collection, vid);
        let ecx_path = format!("{}.ecx", base);
        std::path::Path::new(&ecx_path)
            .exists()
            .then(|| loc.directory.clone())
    })
}
/// Find the directory containing a specific EC shard file.
pub fn find_ec_shard_dir(&self, vid: VolumeId, collection: &str, shard_id: u8) -> Option<String> {
    self.locations.iter().find_map(|loc| {
        let shard = EcVolumeShard::new(&loc.directory, collection, vid, shard_id);
        std::path::Path::new(&shard.file_name())
            .exists()
            .then(|| loc.directory.clone())
    })
}
/// Close all locations and their volumes.
/// Also drains and closes every mounted EC volume.
pub fn close(&mut self) {
    self.locations.iter_mut().for_each(|loc| loc.close());
    // drain() empties the map while letting each EcVolume close cleanly.
    self.ec_volumes
        .drain()
        .for_each(|(_, mut ec_vol)| ec_vol.close());
}
}

57
seaweed-volume/src/storage/volume.rs

@ -681,6 +681,63 @@ impl Volume {
Ok(needles)
}
/// Scan raw needle entries from the .dat file starting at `from_offset`.
/// Returns (needle_header_bytes, needle_body_bytes, append_at_ns) for each needle.
/// Used by VolumeTailSender to stream raw bytes.
///
/// Deleted needles are skipped; scanning stops at end-of-file, at a
/// truncated (partially written) trailing needle, or at an all-zero
/// header.
pub fn scan_raw_needles_from(&self, from_offset: u64) -> Result<Vec<(Vec<u8>, Vec<u8>, u64)>, VolumeError> {
    let dat_file = self.dat_file.as_ref().ok_or(VolumeError::NotFound)?;
    let version = self.super_block.version;
    let dat_size = dat_file.metadata()?.len();
    let mut entries = Vec::new();
    let mut offset = from_offset;
    // Clone the handle so seeking here does not disturb other readers
    // that share the original file's cursor.
    let mut dat = dat_file.try_clone()?;
    while offset < dat_size {
        // Read needle header (16 bytes)
        let mut header = [0u8; NEEDLE_HEADER_SIZE];
        dat.seek(SeekFrom::Start(offset))?;
        match dat.read_exact(&mut header) {
            Ok(()) => {}
            // Short read at the tail: a needle is still being appended —
            // stop scanning instead of failing.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        let (_cookie, _id, size) = Needle::parse_header(&header);
        // Zero size with an empty id reads as an unwritten region: stop.
        if size.0 == 0 && _id.is_empty() {
            break;
        }
        // Body length is version-dependent (derived by the needle codec).
        let body_length = needle::needle_body_length(size, version);
        let total_size = NEEDLE_HEADER_SIZE as u64 + body_length as u64;
        // Skip deleted/tombstoned entries but keep walking the file.
        if size.is_deleted() || size.0 <= 0 {
            offset += total_size;
            continue;
        }
        // Read body bytes
        let mut body = vec![0u8; body_length as usize];
        dat.seek(SeekFrom::Start(offset + NEEDLE_HEADER_SIZE as u64))?;
        match dat.read_exact(&mut body) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        // Parse the needle to get append_at_ns
        let mut full = vec![0u8; total_size as usize];
        full[..NEEDLE_HEADER_SIZE].copy_from_slice(&header);
        full[NEEDLE_HEADER_SIZE..].copy_from_slice(&body);
        let mut n = Needle::default();
        // Best-effort parse: a malformed needle leaves append_at_ns at
        // its default rather than aborting the whole scan.
        let _ = n.read_bytes(&full, offset as i64, size, version);
        entries.push((header.to_vec(), body, n.append_at_ns));
        offset += total_size;
    }
    Ok(entries)
}
/// Insert or update a needle index entry (for low-level blob writes).
pub fn put_needle_index(&mut self, key: NeedleId, offset: Offset, size: Size) -> Result<(), VolumeError> {
if let Some(ref mut nm) = self.nm {

Loading…
Cancel
Save