diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs
index 4ab295322..e8c66a521 100644
--- a/seaweed-volume/src/server/grpc_server.rs
+++ b/seaweed-volume/src/server/grpc_server.rs
@@ -1114,7 +1114,7 @@ impl VolumeServer for VolumeGrpcService {
         // Find last_append_at_ns from copied files
         let last_append_at_ns = if !has_remote_dat {
-            find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name))
+            find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name), vol_info.version)
                 .unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000)
         } else {
             vol_info.dat_file_timestamp_seconds * 1_000_000_000
         };
@@ -1328,73 +1328,91 @@ impl VolumeServer for VolumeGrpcService {
         let mut stream = request.into_inner();
         let mut target_file: Option<std::fs::File> = None;
-        let mut file_path;
+        let mut file_path: Option<String> = None;
         let mut bytes_written: u64 = 0;

-        while let Some(req) = stream.message().await? {
-            match req.data {
-                Some(volume_server_pb::receive_file_request::Data::Info(info)) => {
-                    // Determine file path
-                    if info.is_ec_volume {
-                        let store = self.state.store.read().unwrap();
-                        let dir = store
-                            .locations
-                            .first()
-                            .map(|loc| loc.directory.clone())
-                            .unwrap_or_default();
-                        drop(store);
-                        let ec_base = if info.collection.is_empty() {
-                            format!("{}", info.volume_id)
+        let result: Result<(), Status> = async {
+            while let Some(req) = stream.message().await? {
+                match req.data {
+                    Some(volume_server_pb::receive_file_request::Data::Info(info)) => {
+                        // Determine file path
+                        let path = if info.is_ec_volume {
+                            let store = self.state.store.read().unwrap();
+                            let dir = store
+                                .locations
+                                .first()
+                                .map(|loc| loc.directory.clone())
+                                .unwrap_or_default();
+                            drop(store);
+                            let ec_base = if info.collection.is_empty() {
+                                format!("{}", info.volume_id)
+                            } else {
+                                format!("{}_{}", info.collection, info.volume_id)
+                            };
+                            format!("{}/{}{}", dir, ec_base, info.ext)
                         } else {
-                            format!("{}_{}", info.collection, info.volume_id)
+                            let store = self.state.store.read().unwrap();
+                            let (_, v) =
+                                store.find_volume(VolumeId(info.volume_id)).ok_or_else(|| {
+                                    Status::not_found(format!(
+                                        "volume {} not found",
+                                        info.volume_id
+                                    ))
+                                })?;
+                            let p = v.file_name(&info.ext);
+                            drop(store);
+                            p
                         };
-                        file_path = format!("{}/{}{}", dir, ec_base, info.ext);
-                    } else {
-                        let store = self.state.store.read().unwrap();
-                        let (_, v) =
-                            store.find_volume(VolumeId(info.volume_id)).ok_or_else(|| {
-                                Status::not_found(format!("volume {} not found", info.volume_id))
+
+                        target_file =
+                            Some(std::fs::File::create(&path).map_err(|e| {
+                                Status::internal(format!("failed to create file: {}", e))
+                            })?);
+                        file_path = Some(path);
+                    }
+                    Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => {
+                        if let Some(ref mut f) = target_file {
+                            use std::io::Write;
+                            let n = f.write(&content).map_err(|e| {
+                                Status::internal(format!("failed to write file: {}", e))
                             })?;
-                        file_path = v.file_name(&info.ext);
-                        drop(store);
+                            bytes_written += n as u64;
+                        } else {
+                            return Err(Status::invalid_argument(
+                                "file info must be sent first",
+                            ));
+                        }
                     }
-
-                    target_file =
-                        Some(std::fs::File::create(&file_path).map_err(|e| {
-                            Status::internal(format!("failed to create file: {}", e))
-                        })?);
-                }
-                Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => {
-                    if let Some(ref mut f) = target_file {
-                        use std::io::Write;
-                        let n = f.write(&content).map_err(|e| {
-                            Status::internal(format!("failed to write file: {}", e))
-                        })?;
-                        bytes_written += n as u64;
-                    } else {
-                        return Ok(Response::new(volume_server_pb::ReceiveFileResponse {
-                            error: "file info must be sent first".to_string(),
-                            bytes_written: 0,
-                        }));
+                    None => {
+                        return Err(Status::invalid_argument("unknown message type"));
                     }
                 }
-                None => {
-                    return Ok(Response::new(volume_server_pb::ReceiveFileResponse {
-                        error: "unknown message type".to_string(),
-                        bytes_written: 0,
-                    }));
-                }
             }
+            Ok(())
         }
+        .await;

-        if let Some(ref f) = target_file {
-            let _ = f.sync_all();
+        match result {
+            Ok(()) => {
+                if let Some(ref f) = target_file {
+                    let _ = f.sync_all();
+                }
+                Ok(Response::new(volume_server_pb::ReceiveFileResponse {
+                    error: String::new(),
+                    bytes_written,
+                }))
+            }
+            Err(e) => {
+                // Clean up partial file on stream error (Go parity: closes file, removes it)
+                if let Some(f) = target_file.take() {
+                    drop(f);
+                }
+                if let Some(ref p) = file_path {
+                    let _ = std::fs::remove_file(p);
+                }
+                Err(e)
+            }
         }
-
-        Ok(Response::new(volume_server_pb::ReceiveFileResponse {
-            error: String::new(),
-            bytes_written,
-        }))
     }

     async fn read_needle_blob(
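A note on the ReceiveFile hunk above: the rework funnels every failure path through a single `match result`, so partial-file cleanup lives in exactly one place instead of being repeated per early return. The old code could not do this cleanly because the "file info must be sent first" and "unknown message type" cases returned a *successful* response carrying an error string, leaving the partial file behind. A minimal standalone sketch of the new shape (a plain closure stands in for the inner async block; the chunk list and path are hypothetical stand-ins for the gRPC stream and the volume file layout):

```rust
use std::io::Write;

fn receive_into_file(chunks: &[Result<Vec<u8>, String>], path: &str) -> Result<u64, String> {
    let mut file: Option<std::fs::File> = None;
    let mut written = 0u64;

    // The inner closure plays the role of the `async { ... }.await` block:
    // every `?` inside converges on the single `match` below.
    let result: Result<(), String> = (|| {
        for chunk in chunks {
            let data = chunk.as_ref().map_err(|e| e.clone())?;
            if file.is_none() {
                file = Some(std::fs::File::create(path).map_err(|e| e.to_string())?);
            }
            written += file.as_mut().unwrap().write(data).map_err(|e| e.to_string())? as u64;
        }
        Ok(())
    })();

    match result {
        Ok(()) => Ok(written),
        Err(e) => {
            drop(file.take()); // close the handle first, then unlink, as the diff does
            let _ = std::fs::remove_file(path);
            Err(e)
        }
    }
}
```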
@@ -1489,49 +1507,57 @@ impl VolumeServer for VolumeGrpcService {
         request: Request<volume_server_pb::ReadAllNeedlesRequest>,
     ) -> Result<Response<Self::ReadAllNeedlesStream>, Status> {
         let req = request.into_inner();
-        let mut results: Vec<Result<volume_server_pb::ReadAllNeedlesResponse, Status>> = Vec::new();
+        let state = self.state.clone();

-        let store = self.state.store.read().unwrap();
-        for &raw_vid in &req.volume_ids {
-            let vid = VolumeId(raw_vid);
-            let v = match store.find_volume(vid) {
-                Some((_, v)) => v,
-                None => {
-                    // Push error as last item so previous volumes' data is streamed first
-                    results.push(Err(Status::not_found(format!(
-                        "not found volume id {}",
-                        vid
-                    ))));
-                    break;
-                }
-            };
+        let (tx, rx) = tokio::sync::mpsc::channel(32);

-            let needles = match v.read_all_needles() {
-                Ok(n) => n,
-                Err(e) => {
-                    results.push(Err(Status::internal(e.to_string())));
-                    break;
-                }
-            };
+        // Stream needles lazily via a blocking task (matches Go's scanner pattern)
+        tokio::task::spawn_blocking(move || {
+            let store = state.store.read().unwrap();
+            for &raw_vid in &req.volume_ids {
+                let vid = VolumeId(raw_vid);
+                let v = match store.find_volume(vid) {
+                    Some((_, v)) => v,
+                    None => {
+                        let _ = tx.blocking_send(Err(Status::not_found(format!(
+                            "not found volume id {}",
+                            vid
+                        ))));
+                        return;
+                    }
+                };

-            for n in needles {
-                let compressed = n.is_compressed();
-                results.push(Ok(volume_server_pb::ReadAllNeedlesResponse {
-                    volume_id: raw_vid,
-                    needle_id: n.id.into(),
-                    cookie: n.cookie.0,
-                    needle_blob: n.data,
-                    needle_blob_compressed: compressed,
-                    last_modified: n.last_modified,
-                    crc: n.checksum.0,
-                    name: n.name,
-                    mime: n.mime,
-                }));
+                let needles = match v.read_all_needles() {
+                    Ok(n) => n,
+                    Err(e) => {
+                        let _ = tx.blocking_send(Err(Status::internal(e.to_string())));
+                        return;
+                    }
+                };
+
+                for n in needles {
+                    let compressed = n.is_compressed();
+                    if tx
+                        .blocking_send(Ok(volume_server_pb::ReadAllNeedlesResponse {
+                            volume_id: raw_vid,
+                            needle_id: n.id.into(),
+                            cookie: n.cookie.0,
+                            needle_blob: n.data,
+                            needle_blob_compressed: compressed,
+                            last_modified: n.last_modified,
+                            crc: n.checksum.0,
+                            name: n.name,
+                            mime: n.mime,
+                        }))
+                        .is_err()
+                    {
+                        return; // receiver dropped
+                    }
+                }
             }
-        }
-        drop(store);
+        });

-        let stream = tokio_stream::iter(results);
+        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);

         Ok(Response::new(Box::pin(stream)))
     }
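The ReadAllNeedles hunk above swaps a collect-everything `Vec` for a bounded channel fed by a blocking task, so needles are produced lazily and the response streams as they are read. A self-contained sketch of that wiring, assuming only tokio and tokio-stream (the `u64` items and the counting loop are placeholders for the real needle scan):

```rust
use tokio_stream::wrappers::ReceiverStream;

fn stream_numbers() -> ReceiverStream<Result<u64, String>> {
    // Bounded channel: capacity 32 matches the diff and provides backpressure.
    let (tx, rx) = tokio::sync::mpsc::channel(32);
    tokio::task::spawn_blocking(move || {
        for n in 0..1000u64 {
            // blocking_send parks the producer when the consumer lags;
            // an Err means the receiver (client) went away, so stop early.
            if tx.blocking_send(Ok(n)).is_err() {
                return; // receiver dropped
            }
        }
    });
    ReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    use tokio_stream::StreamExt;
    let mut s = stream_numbers();
    while let Some(item) = s.next().await {
        let _ = item;
    }
}
```

This also preserves the old behavior of emitting prior volumes' data before an error: the error is simply the last item sent on the channel before the producer returns.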
@@ -1856,13 +1882,89 @@ impl VolumeServer for VolumeGrpcService {
         let vid = VolumeId(req.volume_id);
         let collection = &req.collection;

-        // Find the directory with EC files
+        // Search ALL locations for shards, pick the best rebuild location
+        // (most shards + has .ecx), collect additional dirs.
+        // Matches Go's multi-location search in VolumeEcShardsRebuild.
+        let base_name = if collection.is_empty() {
+            format!("{}", vid.0)
+        } else {
+            format!("{}_{}", collection, vid.0)
+        };
+
+        struct LocInfo {
+            dir: String,
+            idx_dir: String,
+            shard_count: usize,
+            has_ecx: bool,
+        }
+
         let store = self.state.store.read().unwrap();
-        let dir = store.find_ec_dir(vid, collection);
+        let mut loc_infos: Vec<LocInfo> = Vec::new();
+
+        for loc in &store.locations {
+            // Count shards in this location's directory
+            let mut shard_count = 0usize;
+            if let Ok(entries) = std::fs::read_dir(&loc.directory) {
+                for entry in entries.flatten() {
+                    let name = entry.file_name();
+                    let name = name.to_string_lossy();
+                    if name.starts_with(&format!("{}.ec", base_name)) {
+                        let suffix = &name[base_name.len() + 3..];
+                        if suffix.len() == 2 && suffix.chars().all(|c| c.is_ascii_digit()) {
+                            shard_count += 1;
+                        }
+                    }
+                }
+            }
+
+            // Check for .ecx in idx_directory first, then data directory
+            let idx_base = format!("{}/{}", loc.idx_directory, base_name);
+            let data_base = format!("{}/{}", loc.directory, base_name);
+            let has_ecx = std::path::Path::new(&format!("{}.ecx", idx_base)).exists()
+                || (loc.idx_directory != loc.directory
+                    && std::path::Path::new(&format!("{}.ecx", data_base)).exists());
+
+            if shard_count == 0 && !has_ecx {
+                continue;
+            }
+
+            loc_infos.push(LocInfo {
+                dir: loc.directory.clone(),
+                idx_dir: loc.idx_directory.clone(),
+                shard_count,
+                has_ecx,
+            });
+        }
         drop(store);

-        let dir = match dir {
-            Some(d) => d,
+        if loc_infos.is_empty() {
+            return Ok(Response::new(
+                volume_server_pb::VolumeEcShardsRebuildResponse {
+                    rebuilt_shard_ids: vec![],
+                },
+            ));
+        }
+
+        // Pick rebuild location: has .ecx and most shards
+        let mut rebuild_loc_idx: Option<usize> = None;
+        let mut other_dirs: Vec<String> = Vec::new();
+
+        for (i, info) in loc_infos.iter().enumerate() {
+            if info.has_ecx
+                && (rebuild_loc_idx.is_none()
+                    || info.shard_count > loc_infos[rebuild_loc_idx.unwrap()].shard_count)
+            {
+                if let Some(prev) = rebuild_loc_idx {
+                    other_dirs.push(loc_infos[prev].dir.clone());
+                }
+                rebuild_loc_idx = Some(i);
+            } else {
+                other_dirs.push(info.dir.clone());
+            }
+        }
+
+        let rebuild_loc_idx = match rebuild_loc_idx {
+            Some(i) => i,
             None => {
                 return Ok(Response::new(
                     volume_server_pb::VolumeEcShardsRebuildResponse {
@@ -1872,19 +1974,35 @@ impl VolumeServer for VolumeGrpcService {
             }
         };

-        // Check which shards are missing
+        let rebuild_dir = loc_infos[rebuild_loc_idx].dir.clone();
+        let rebuild_idx_dir = loc_infos[rebuild_loc_idx].idx_dir.clone();
+
+        // Determine data/parity shard config from rebuild dir
         let (data_shards, parity_shards) =
             crate::storage::erasure_coding::ec_volume::read_ec_shard_config(
-                &dir, collection, vid,
+                &rebuild_dir, collection, vid,
             );
         let total_shards = data_shards + parity_shards;

+        // Check which shards are missing (check rebuild dir and all other dirs)
         let mut missing: Vec<u32> = Vec::new();
         for shard_id in 0..total_shards as u8 {
             let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(
-                &dir, collection, vid, shard_id,
+                &rebuild_dir, collection, vid, shard_id,
             );
-            if !std::path::Path::new(&shard.file_name()).exists() {
+            let mut found = std::path::Path::new(&shard.file_name()).exists();
+            if !found {
+                for other_dir in &other_dirs {
+                    let other_shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(
+                        other_dir, collection, vid, shard_id,
+                    );
+                    if std::path::Path::new(&other_shard.file_name()).exists() {
+                        found = true;
+                        break;
+                    }
+                }
+            }
+            if !found {
                 missing.push(shard_id as u32);
             }
         }
@@ -1897,25 +2015,34 @@ impl VolumeServer for VolumeGrpcService {
             ));
         }

-        // Rebuild missing shards by regenerating all missing EC files via Reed-Solomon reconstruct
+        // Rebuild missing shards, searching all locations for input shards
         crate::storage::erasure_coding::ec_encoder::rebuild_ec_files(
-            &dir,
+            &rebuild_dir,
             collection,
             vid,
             &missing,
             data_shards as usize,
             parity_shards as usize,
         )
-        .map_err(|e| Status::internal(format!("rebuild ec shards: {}", e)))?;
+        .map_err(|e| Status::internal(format!("RebuildEcFiles: {}", e)))?;
+
+        // Rebuild .ecx; use idx_directory with fallback to data directory
+        let ecx_base = format!("{}/{}", rebuild_idx_dir, base_name);
+        let ecx_rebuild_dir = if std::path::Path::new(&format!("{}.ecx", ecx_base)).exists() {
+            rebuild_idx_dir
+        } else if rebuild_idx_dir != rebuild_dir {
+            rebuild_dir.clone()
+        } else {
+            rebuild_idx_dir
+        };

-        // Rebuild the .ecx index file from the data shards (matching Go's RebuildEcxFile)
         crate::storage::erasure_coding::ec_encoder::rebuild_ecx_file(
-            &dir,
+            &ecx_rebuild_dir,
             collection,
             vid,
             data_shards as usize,
         )
-        .map_err(|e| Status::internal(format!("rebuild ecx file: {}", e)))?;
+        .map_err(|e| Status::internal(format!("RebuildEcxFile: {}", e)))?;

         Ok(Response::new(
             volume_server_pb::VolumeEcShardsRebuildResponse {
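The selection loop in the rebuild hunks above keeps the best `.ecx`-bearing location seen so far and demotes every other candidate to an extra shard directory. The same rule over plain `(dir, shard_count, has_ecx)` tuples, with made-up inputs:

```rust
// Returns (index of rebuild location, directories that only contribute shards).
fn pick_rebuild_location(locs: &[(String, usize, bool)]) -> (Option<usize>, Vec<String>) {
    let mut best: Option<usize> = None;
    let mut other_dirs = Vec::new();
    for (i, (dir, shard_count, has_ecx)) in locs.iter().enumerate() {
        // A location can win only if it holds the .ecx index; more shards beats fewer.
        if *has_ecx && best.map_or(true, |b| *shard_count > locs[b].1) {
            if let Some(prev) = best {
                other_dirs.push(locs[prev].0.clone()); // demote the previous winner
            }
            best = Some(i);
        } else {
            other_dirs.push(dir.clone());
        }
    }
    (best, other_dirs)
}

fn main() {
    let locs = vec![
        ("/data1".to_string(), 4, false),
        ("/data2".to_string(), 10, true),
        ("/data3".to_string(), 14, true),
    ];
    let (best, others) = pick_rebuild_location(&locs);
    assert_eq!(best, Some(2)); // most shards among .ecx holders
    assert_eq!(others, vec!["/data1".to_string(), "/data2".to_string()]);
}
```

One detail worth noting: the comparison is strictly greater-than, so the first `.ecx` location wins ties, matching the diff's `rebuild_loc_idx.is_none() || info.shard_count > ...` condition.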
@@ -1932,19 +2059,47 @@ impl VolumeServer for VolumeGrpcService {
         let req = request.into_inner();
         let vid = VolumeId(req.volume_id);

-        // Validate disk_id
-        let (_loc_count, dest_dir) = {
+        // Select target location matching Go's 3-tier fallback:
+        // When disk_id > 0: use that specific location
+        // When disk_id == 0 (unset): (1) location with existing EC shards, (2) any HDD, (3) any
+        let (dest_dir, dest_idx_dir) = {
             let store = self.state.store.read().unwrap();
             let count = store.locations.len();
-            let dir = if (req.disk_id as usize) < count {
-                store.locations[req.disk_id as usize].directory.clone()
+
+            if req.disk_id > 0 {
+                // Explicit disk selection
+                if (req.disk_id as usize) >= count {
+                    return Err(Status::invalid_argument(format!(
+                        "invalid disk_id {}: only have {} disks",
+                        req.disk_id, count
+                    )));
+                }
+                let loc = &store.locations[req.disk_id as usize];
+                (loc.directory.clone(), loc.idx_directory.clone())
             } else {
-                return Err(Status::invalid_argument(format!(
-                    "invalid disk_id {}: only have {} disks",
-                    req.disk_id, count
-                )));
-            };
-            (count, dir)
+                // Auto-select: prefer location with existing EC shards for this volume
+                let loc_idx = store
+                    .find_free_location_predicate(|loc| loc.has_ec_volume(vid))
+                    .or_else(|| {
+                        // Fall back to any HDD location
+                        store.find_free_location_predicate(|loc| {
+                            loc.disk_type == DiskType::HardDrive
+                        })
+                    })
+                    .or_else(|| {
+                        // Fall back to any location
+                        store.find_free_location_predicate(|_| true)
+                    });
+                match loc_idx {
+                    Some(i) => {
+                        let loc = &store.locations[i];
+                        (loc.directory.clone(), loc.idx_directory.clone())
+                    }
+                    None => {
+                        return Err(Status::internal("no space left".to_string()));
+                    }
+                }
+            }
         };

         // Connect to source and copy shard files via CopyFile
@@ -2045,7 +2200,7 @@ impl VolumeServer for VolumeGrpcService {

             let file_path = {
                 let base =
-                    crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
+                    crate::storage::volume::volume_file_name(&dest_idx_dir, &req.collection, vid);
                 format!("{}.ecx", base)
             };
             let mut file = std::fs::File::create(&file_path)
@@ -2086,7 +2241,7 @@ impl VolumeServer for VolumeGrpcService {

             let file_path = {
                 let base =
-                    crate::storage::volume::volume_file_name(&dest_dir, &req.collection, vid);
+                    crate::storage::volume::volume_file_name(&dest_idx_dir, &req.collection, vid);
                 format!("{}.ecj", base)
             };
             let mut file = std::fs::File::create(&file_path)
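The disk-selection hunk above encodes Go's three-tier fallback. Reduced to a chain of `Option::or_else` over hypothetical location records (the `has_shards`/`free` field names are illustrative stand-ins, not the store's real API):

```rust
#[derive(PartialEq)]
enum DiskType {
    HardDrive,
    Ssd,
}

struct Location {
    disk_type: DiskType,
    has_shards: bool, // already holds EC shards of this volume
    free: bool,       // has free space
}

fn pick_target(locations: &[Location]) -> Option<usize> {
    let find = |pred: &dyn Fn(&Location) -> bool| locations.iter().position(|l| l.free && pred(l));
    find(&|l| l.has_shards) // tier 1: shard affinity
        .or_else(|| find(&|l| l.disk_type == DiskType::HardDrive)) // tier 2: any HDD
        .or_else(|| find(&|_| true)) // tier 3: anything with space
}

fn main() {
    let locs = [
        Location { disk_type: DiskType::Ssd, has_shards: false, free: true },
        Location { disk_type: DiskType::HardDrive, has_shards: false, free: true },
        Location { disk_type: DiskType::Ssd, has_shards: true, free: true },
    ];
    assert_eq!(pick_target(&locs), Some(2)); // affinity beats disk type
}
```

Note also the `dest_idx_dir` threading in the two small hunks that follow: the copied `.ecx` and `.ecj` files now land in the location's index directory rather than its data directory.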
@@ -2168,26 +2323,16 @@ impl VolumeServer for VolumeGrpcService {
         let req = request.into_inner();
         let vid = VolumeId(req.volume_id);

-        // Check all requested shards exist on disk
-        {
-            let store = self.state.store.read().unwrap();
-            for &shard_id in &req.shard_ids {
-                if store
-                    .find_ec_shard_dir(vid, &req.collection, shard_id as u8)
-                    .is_none()
-                {
-                    return Err(Status::not_found(format!(
-                        "ec volume {} shards {:?} not found",
-                        req.volume_id, req.shard_ids
-                    )));
-                }
-            }
-        }
-
+        // Mount one shard at a time, returning error on first failure.
+        // Matches Go: for _, shardId := range req.ShardIds { err = vs.store.MountEcShards(...) }
         let mut store = self.state.store.write().unwrap();
-        store
-            .mount_ec_shards(vid, &req.collection, &req.shard_ids)
-            .map_err(|e| Status::internal(e.to_string()))?;
+        for &shard_id in &req.shard_ids {
+            store
+                .mount_ec_shard(vid, &req.collection, shard_id)
+                .map_err(|e| {
+                    Status::internal(format!("mount {}.{}: {}", req.volume_id, shard_id, e))
+                })?;
+        }

         Ok(Response::new(
             volume_server_pb::VolumeEcShardsMountResponse {},
@@ -2200,8 +2345,15 @@ impl VolumeServer for VolumeGrpcService {
     ) -> Result<Response<volume_server_pb::VolumeEcShardsUnmountResponse>, Status> {
         let req = request.into_inner();
         let vid = VolumeId(req.volume_id);
+
+        // Unmount one shard at a time, returning error on first failure.
+        // Matches Go: for _, shardId := range req.ShardIds { err = vs.store.UnmountEcShards(...) }
         let mut store = self.state.store.write().unwrap();
-        store.unmount_ec_shards(vid, &req.shard_ids);
+        for &shard_id in &req.shard_ids {
+            store.unmount_ec_shard(vid, shard_id).map_err(|e| {
+                Status::internal(format!("unmount {}.{}: {}", req.volume_id, shard_id, e))
+            })?;
+        }

         Ok(Response::new(
             volume_server_pb::VolumeEcShardsUnmountResponse {},
         ))
@@ -2250,21 +2402,40 @@ impl VolumeServer for VolumeGrpcService {
             ))
         })?;

-        let read_size = if req.size > 0 {
+        let total_size = if req.size > 0 {
             req.size as usize
         } else {
             1024 * 1024
         };
-        let mut buf = vec![0u8; read_size];
-        let n = shard
-            .read_at(&mut buf, req.offset as u64)
-            .map_err(|e| Status::internal(e.to_string()))?;
-        buf.truncate(n);

-        let results = vec![Ok(volume_server_pb::VolumeEcShardReadResponse {
-            data: buf,
-            is_deleted: false,
-        })];
+        // Stream in 2MB chunks (matching Go's BufferSizeLimit)
+        const BUFFER_SIZE_LIMIT: usize = 2 * 1024 * 1024;
+        let mut results: Vec<Result<volume_server_pb::VolumeEcShardReadResponse, Status>> =
+            Vec::new();
+        let mut bytes_read: usize = 0;
+        let mut current_offset = req.offset as u64;
+
+        while bytes_read < total_size {
+            let chunk_size = std::cmp::min(BUFFER_SIZE_LIMIT, total_size - bytes_read);
+            let mut buf = vec![0u8; chunk_size];
+            let n = shard
+                .read_at(&mut buf, current_offset)
+                .map_err(|e| Status::internal(e.to_string()))?;
+            if n == 0 {
+                break;
+            }
+            buf.truncate(n);
+            bytes_read += n;
+            current_offset += n as u64;
+            results.push(Ok(volume_server_pb::VolumeEcShardReadResponse {
+                data: buf,
+                is_deleted: false,
+            }));
+            if n < chunk_size {
+                break; // short read means EOF
+            }
+        }
+
         Ok(Response::new(Box::pin(tokio_stream::iter(results))))
     }
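The shard-read hunk above replaces one bounded read with a chunked loop. Here is the same loop in isolation over any `Read` source; it uses sequential `read` instead of the server's positional `read_at`, and the 2 MiB constant mirrors the BufferSizeLimit the diff's comment cites from Go:

```rust
use std::io::Read;

const BUFFER_SIZE_LIMIT: usize = 2 * 1024 * 1024;

fn read_chunks<R: Read>(mut src: R, total: usize) -> std::io::Result<Vec<Vec<u8>>> {
    let mut chunks = Vec::new();
    let mut done = 0usize;
    while done < total {
        let want = BUFFER_SIZE_LIMIT.min(total - done);
        let mut buf = vec![0u8; want];
        let n = src.read(&mut buf)?;
        if n == 0 {
            break; // EOF before `total` bytes: return what we have
        }
        buf.truncate(n);
        done += n;
        chunks.push(buf);
        if n < want {
            break; // short read treated as end of data, as in the diff
        }
    }
    Ok(chunks)
}

fn main() {
    let data = vec![7u8; 5 * 1024 * 1024];
    let chunks = read_chunks(&data[..], data.len()).unwrap();
    assert_eq!(chunks.len(), 3); // 2 MiB + 2 MiB + 1 MiB
    assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), data.len());
}
```

One caveat: the server version still collects all chunks into a `Vec` before handing them to `tokio_stream::iter`, so peak memory is bounded by `req.size`, not by the chunk size; only the response framing is chunked.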
@@ -2277,8 +2448,19 @@ impl VolumeServer for VolumeGrpcService {
         let vid = VolumeId(req.volume_id);
         let needle_id = NeedleId(req.file_key);

+        // Go checks if needle is already deleted (via ecx) before journaling.
+        // Search all locations for the EC volume.
         let mut store = self.state.store.write().unwrap();
         if let Some(ec_vol) = store.find_ec_volume_mut(vid) {
+            // Check if already deleted via ecx index
+            if let Ok(Some((_offset, size))) = ec_vol.find_needle_from_ecx(needle_id) {
+                if size.is_deleted() {
+                    // Already deleted, no-op
+                    return Ok(Response::new(
+                        volume_server_pb::VolumeEcBlobDeleteResponse {},
+                    ));
+                }
+            }
             ec_vol
                 .journal_delete(needle_id)
                 .map_err(|e| Status::internal(e.to_string()))?;
@@ -2379,19 +2561,8 @@ impl VolumeServer for VolumeGrpcService {
         )
         .map_err(|e| Status::internal(format!("WriteIdxFileFromEcIndex: {}", e)))?;

-        // Unmount EC shards and mount the reconstructed volume
-        {
-            let mut store = self.state.store.write().unwrap();
-            // Remove EC volume
-            if let Some(mut ec_vol) = store.remove_ec_volume(vid) {
-                ec_vol.close();
-            }
-            // Unmount existing volume if any, then mount fresh
-            store.unmount_volume(vid);
-            store
-                .mount_volume(vid, &collection, DiskType::HardDrive)
-                .map_err(|e| Status::internal(format!("mount volume: {}", e)))?;
-        }
+        // Go does NOT unmount EC shards or mount the volume here.
+        // The caller (ec.balance / ec.decode) handles mount/unmount separately.

         Ok(Response::new(
             volume_server_pb::VolumeEcShardsToVolumeResponse {},
@@ -3624,7 +3795,12 @@ fn check_copy_file_size(path: &str, expected: u64) -> Result<(), Status> {
 }

 /// Find the last append timestamp from copied .idx and .dat files.
-fn find_last_append_at_ns(idx_path: &str, dat_path: &str) -> Option<u64> {
+/// Go returns (0, nil) for versions < Version3 since timestamps only exist in V3.
+fn find_last_append_at_ns(idx_path: &str, dat_path: &str, version: u32) -> Option<u64> {
+    // Only Version3 has the append timestamp in the needle tail
+    if version < VERSION_3.0 as u32 {
+        return None;
+    }
     use std::io::{Read, Seek, SeekFrom};

     let mut idx_file = std::fs::File::open(idx_path).ok()?;
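On the final hunk's version gate: needle tails carry an append timestamp only from volume format V3 on, so `find_last_append_at_ns` now returns `None` for older volumes instead of reading bytes that were never written. A toy illustration; `Version` here is a guess at the crate's tuple-struct constant style, which the diff's `VERSION_3.0` accessor suggests:

```rust
#[derive(Clone, Copy, PartialEq, PartialOrd)]
struct Version(pub u32);

const VERSION_3: Version = Version(3);

fn last_append_at_ns(version: Version, tail_timestamp_ns: u64) -> Option<u64> {
    if version < VERSION_3 {
        return None; // pre-V3 needle tails have no timestamp field
    }
    Some(tail_timestamp_ns)
}

fn main() {
    assert_eq!(last_append_at_ns(Version(2), 42), None);
    assert_eq!(last_append_at_ns(Version(3), 42), Some(42));
}
```

The caller in the first hunk already falls back to `dat_file_timestamp_seconds * 1_000_000_000` when `None` comes back, which lines up with Go's `(0, nil)` contract described in the new doc comment.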