
grpc: notify master after volume lifecycle changes

Branch: rust-volume-server
Chris Lu committed 3 days ago
Commit: 573a42eb93

Changed files:
  1. seaweed-volume/src/server/grpc_server.rs (119 changes)
  2. test/volume_server/http/replication_lifecycle_test.go (63 changes)
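
Each volume-lifecycle RPC handler (allocate, mount, unmount, delete, configure) now fires `self.state.volume_state_notify.notify_one()` after a successful state change, and `unmount_volume` reports whether it actually unmounted anything so the idempotent no-op path stays silent. The new Go test exercises the user-visible effect: a replicated upload succeeds immediately after `AllocateVolume`, without waiting for the next periodic heartbeat. Below is a minimal sketch of the consumer side, assuming `volume_state_notify` is a shared `tokio::sync::Notify` awaited by the master-heartbeat task; the field type and the loop itself are not shown in this commit and are assumptions.

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;

// Hypothetical heartbeat task: wakes on either the periodic tick or a
// lifecycle notification, so the master learns about allocate/mount/
// unmount/delete/configure changes without waiting out a full interval.
async fn heartbeat_loop(notify: Arc<Notify>) {
    let mut tick = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = tick.tick() => {}
            _ = notify.notified() => {}
        }
        send_heartbeat().await; // placeholder for the real Heartbeat RPC
    }
}

async fn send_heartbeat() { /* build and send the Heartbeat message */ }
```

Because `Notify::notify_one` stores a permit when no task is currently waiting, a notification that arrives while a heartbeat is in flight is not lost; the next `notified().await` returns immediately.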

seaweed-volume/src/server/grpc_server.rs (119 changes)

@@ -509,6 +509,7 @@ impl VolumeServer for VolumeGrpcService {
version,
)
.map_err(|e| Status::internal(e.to_string()))?;
+self.state.volume_state_notify.notify_one();
Ok(Response::new(volume_server_pb::AllocateVolumeResponse {}))
}
@@ -630,6 +631,7 @@ impl VolumeServer for VolumeGrpcService {
store
.mount_volume_by_id(vid)
.map_err(|e| Status::internal(e.to_string()))?;
+self.state.volume_state_notify.notify_one();
Ok(Response::new(volume_server_pb::VolumeMountResponse {}))
}
@@ -641,7 +643,9 @@ impl VolumeServer for VolumeGrpcService {
let vid = VolumeId(request.into_inner().volume_id);
let mut store = self.state.store.write().unwrap();
// Go returns nil when volume is not found (idempotent unmount)
-store.unmount_volume(vid);
+if store.unmount_volume(vid) {
+self.state.volume_state_notify.notify_one();
+}
Ok(Response::new(volume_server_pb::VolumeUnmountResponse {}))
}
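
The conditional above relies on `unmount_volume` now returning whether a volume was actually removed. A self-contained sketch of that contract follows; the real `Store` internals are not part of this diff, so the types and field names here are hypothetical.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct VolumeId(u32);
struct Volume; // stand-in for the real volume handle

struct Store {
    volumes: HashMap<VolumeId, Volume>,
}

impl Store {
    // Returns true only if a mounted volume was removed, so the caller can
    // notify the master on real state changes while keeping unmount
    // idempotent (mirroring Go, which returns nil when the volume is absent).
    fn unmount_volume(&mut self, vid: VolumeId) -> bool {
        self.volumes.remove(&vid).is_some()
    }
}
```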
@@ -664,6 +668,7 @@ impl VolumeServer for VolumeGrpcService {
store
.delete_volume(vid, req.only_empty)
.map_err(|e| Status::internal(e.to_string()))?;
+self.state.volume_state_notify.notify_one();
Ok(Response::new(volume_server_pb::VolumeDeleteResponse {}))
}
@@ -807,6 +812,7 @@ impl VolumeServer for VolumeGrpcService {
error: format!("volume configure mount {}: {}", vid, e),
}));
}
+self.state.volume_state_notify.notify_one();
Ok(Response::new(volume_server_pb::VolumeConfigureResponse {
error: String::new(),
@@ -1018,16 +1024,11 @@ impl VolumeServer for VolumeGrpcService {
{
Ok(resp) => {
if resp.volume_preallocate {
-preallocate_size =
-resp.volume_size_limit_m_b as i64 * 1024 * 1024;
+preallocate_size = resp.volume_size_limit_m_b as i64 * 1024 * 1024;
}
}
Err(e) => {
-tracing::warn!(
-"get master {} configuration: {}",
-state.master_url,
-e
-);
+tracing::warn!("get master {} configuration: {}", state.master_url, e);
}
}
@@ -1040,10 +1041,7 @@ impl VolumeServer for VolumeGrpcService {
))
})?;
file.set_len(preallocate_size as u64).map_err(|e| {
-Status::internal(format!(
-"preallocate volume file {}: {}",
-dat_path, e
-))
+Status::internal(format!("preallocate volume file {}: {}", dat_path, e))
})?;
}
}
@@ -1136,8 +1134,12 @@ impl VolumeServer for VolumeGrpcService {
// Find last_append_at_ns from copied files
let last_append_at_ns = if !has_remote_dat {
-find_last_append_at_ns(&idx_path, &format!("{}.dat", data_base_name), vol_info.version)
-.unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000)
+find_last_append_at_ns(
+&idx_path,
+&format!("{}.dat", data_base_name),
+vol_info.version,
+)
+.unwrap_or(vol_info.dat_file_timestamp_seconds * 1_000_000_000)
} else {
vol_info.dat_file_timestamp_seconds * 1_000_000_000
};
@@ -1404,10 +1406,9 @@ impl VolumeServer for VolumeGrpcService {
p
};
-target_file =
-Some(std::fs::File::create(&path).map_err(|e| {
-Status::internal(format!("failed to create file: {}", e))
-})?);
+target_file = Some(std::fs::File::create(&path).map_err(|e| {
+Status::internal(format!("failed to create file: {}", e))
+})?);
file_path = Some(path);
}
Some(volume_server_pb::receive_file_request::Data::FileContent(content)) => {
@@ -1481,9 +1482,12 @@ impl VolumeServer for VolumeGrpcService {
.find_volume(vid)
.ok_or_else(|| Status::not_found(format!("not found volume id {}", vid)))?;
-let blob = vol
-.read_needle_blob(offset, size)
-.map_err(|e| Status::internal(format!("read needle blob offset {} size {}: {}", offset, size.0, e)))?;
+let blob = vol.read_needle_blob(offset, size).map_err(|e| {
+Status::internal(format!(
+"read needle blob offset {} size {}: {}",
+offset, size.0, e
+))
+})?;
Ok(Response::new(volume_server_pb::ReadNeedleBlobResponse {
needle_blob: blob,
@@ -1896,9 +1900,7 @@ impl VolumeServer for VolumeGrpcService {
// Check existing .vif for EC shard config (matching Go's MaybeLoadVolumeInfo)
let (data_shards, parity_shards) =
-crate::storage::erasure_coding::ec_volume::read_ec_shard_config(
-&dir, collection, vid,
-);
+crate::storage::erasure_coding::ec_volume::read_ec_shard_config(&dir, collection, vid);
if let Err(e) = crate::storage::erasure_coding::ec_encoder::write_ec_files(
&dir,
@@ -2051,7 +2053,9 @@ impl VolumeServer for VolumeGrpcService {
// Determine data/parity shard config from rebuild dir
let (data_shards, parity_shards) =
crate::storage::erasure_coding::ec_volume::read_ec_shard_config(
-&rebuild_dir, collection, vid,
+&rebuild_dir,
+collection,
+vid,
);
let total_shards = data_shards + parity_shards;
@@ -2059,7 +2063,10 @@ impl VolumeServer for VolumeGrpcService {
let mut missing: Vec<u32> = Vec::new();
for shard_id in 0..total_shards as u8 {
let shard = crate::storage::erasure_coding::ec_shard::EcVolumeShard::new(
-&rebuild_dir, collection, vid, shard_id,
+&rebuild_dir,
+collection,
+vid,
+shard_id,
);
let mut found = std::path::Path::new(&shard.file_name()).exists();
if !found {
@@ -2687,8 +2694,9 @@ impl VolumeServer for VolumeGrpcService {
}
// Walk .ecx index to compute file counts and total size (matching Go's WalkIndex)
-let (file_count, file_deleted_count, volume_size) =
-ec_vol.walk_ecx_stats().map_err(|e| Status::internal(e.to_string()))?;
+let (file_count, file_deleted_count, volume_size) = ec_vol
+.walk_ecx_stats()
+.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(
volume_server_pb::VolumeEcShardsInfoResponse {
@@ -3158,9 +3166,7 @@ impl VolumeServer for VolumeGrpcService {
match local_handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(Status::internal(e)),
-Err(e) => {
-return Err(Status::internal(format!("local write task failed: {}", e)))
-}
+Err(e) => return Err(Status::internal(format!("local write task failed: {}", e))),
}
let e_tag = n.etag();
@@ -3170,9 +3176,7 @@ impl VolumeServer for VolumeGrpcService {
match handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(Status::internal(e)),
-Err(e) => {
-return Err(Status::internal(format!("replication task failed: {}", e)))
-}
+Err(e) => return Err(Status::internal(format!("replication task failed: {}", e))),
}
}
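
The two hunks above collapse the same three-way pattern for joining spawned write tasks. A compacted sketch of that pattern, assuming the handles are `tokio::task::JoinHandle<Result<(), String>>` as the match arms suggest; the helper itself is hypothetical and not part of this commit.

```rust
use tonic::Status;

// Distinguishes an application error (Ok(Err(e))) from a panicked or
// cancelled task (Err(e)), surfacing both as gRPC internal errors.
async fn join_write_task(
    handle: tokio::task::JoinHandle<Result<(), String>>,
    what: &str,
) -> Result<(), Status> {
    match handle.await {
        Ok(Ok(())) => Ok(()),
        Ok(Err(e)) => Err(Status::internal(e)),
        Err(e) => Err(Status::internal(format!("{} task failed: {}", what, e))),
    }
}
```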
@@ -3285,9 +3289,9 @@ impl VolumeServer for VolumeGrpcService {
let mut details: Vec<String> = Vec::new();
for vid in &vids {
-let ecv = store.find_ec_volume(*vid).ok_or_else(|| {
-Status::not_found(format!("EC volume id {} not found", vid.0))
-})?;
+let ecv = store
+.find_ec_volume(*vid)
+.ok_or_else(|| Status::not_found(format!("EC volume id {} not found", vid.0)))?;
let collection = ecv.collection.clone();
match mode {
@@ -3319,7 +3323,9 @@ impl VolumeServer for VolumeGrpcService {
total_files += files;
let (data_shards, parity_shards) =
crate::storage::erasure_coding::ec_volume::read_ec_shard_config(
-&dir, &collection, *vid,
+&dir,
+&collection,
+*vid,
);
match crate::storage::erasure_coding::ec_encoder::verify_ec_shards(
@@ -3373,8 +3379,7 @@ impl VolumeServer for VolumeGrpcService {
let mut stripes: Vec<Result<volume_server_pb::QueriedStripe, Status>> = Vec::new();
for fid_str in &req.from_file_ids {
-let file_id =
-needle::FileId::parse(fid_str).map_err(|e| Status::internal(e))?;
+let file_id = needle::FileId::parse(fid_str).map_err(|e| Status::internal(e))?;
let mut n = Needle {
id: file_id.key,
@@ -3391,7 +3396,10 @@ impl VolumeServer for VolumeGrpcService {
// Cookie mismatch: log and return empty stream (matching Go behavior where err is nil)
if n.cookie != original_cookie {
-tracing::info!("volume query failed to read fid cookie {}: cookie mismatch", fid_str);
+tracing::info!(
+"volume query failed to read fid cookie {}: cookie mismatch",
+fid_str
+);
let stream = tokio_stream::iter(stripes);
return Ok(Response::new(Box::pin(stream)));
}
@@ -3544,12 +3552,7 @@ impl VolumeServer for VolumeGrpcService {
},
));
}
-Err(_) => {
-return Err(Status::not_found(format!(
-"needle not found {}",
-needle_id
-)))
-}
+Err(_) => return Err(Status::not_found(format!("needle not found {}", needle_id))),
}
}
@@ -3573,10 +3576,7 @@ impl VolumeServer for VolumeGrpcService {
));
}
Ok(None) => {
-return Err(Status::not_found(format!(
-"needle not found {}",
-needle_id
-)));
+return Err(Status::not_found(format!("needle not found {}", needle_id)));
}
Err(e) => {
return Err(Status::internal(format!(
@@ -4131,8 +4131,7 @@ mod tests {
(format!("http://{}", addr), shutdown_tx)
}
-fn make_remote_only_service()
--> (
+fn make_remote_only_service() -> (
VolumeGrpcService,
TempDir,
tokio::sync::oneshot::Sender<()>,
@@ -4273,7 +4272,13 @@
cli_white_list: vec![],
});
-(VolumeGrpcService { state }, tmp, shutdown_tx, dat_bytes, super_block_size)
+(
+VolumeGrpcService { state },
+tmp,
+shutdown_tx,
+dat_bytes,
+super_block_size,
+)
}
fn make_local_service_with_volume(
@@ -4417,10 +4422,8 @@
.unwrap();
let vif_path = tmp.path().join("ttl_1.vif");
-let vif: crate::storage::volume::VifVolumeInfo = serde_json::from_str(
-&std::fs::read_to_string(vif_path).unwrap(),
-)
-.unwrap();
+let vif: crate::storage::volume::VifVolumeInfo =
+serde_json::from_str(&std::fs::read_to_string(vif_path).unwrap()).unwrap();
assert!(vif.expire_at_sec >= before + ttl.to_seconds());
assert!(vif.expire_at_sec <= before + ttl.to_seconds() + 5);
}

test/volume_server/http/replication_lifecycle_test.go (63 changes)

@@ -0,0 +1,63 @@
+package volume_server_http_test
+
+import (
+    "context"
+    "net/http"
+    "testing"
+    "time"
+
+    "github.com/seaweedfs/seaweedfs/test/volume_server/framework"
+    "github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
+    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+)
+
+func TestReplicatedUploadSucceedsImmediatelyAfterAllocate(t *testing.T) {
+    if testing.Short() {
+        t.Skip("skipping integration test in short mode")
+    }
+
+    clusterHarness := framework.StartDualVolumeCluster(t, matrix.P1())
+    conn0, grpc0 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress(0))
+    defer conn0.Close()
+    conn1, grpc1 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress(1))
+    defer conn1.Close()
+
+    const volumeID = uint32(115)
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    defer cancel()
+
+    req := &volume_server_pb.AllocateVolumeRequest{
+        VolumeId:    volumeID,
+        Replication: "001",
+        Version:     uint32(needle.GetCurrentVersion()),
+    }
+
+    if _, err := grpc0.AllocateVolume(ctx, req); err != nil {
+        t.Fatalf("allocate replicated volume on node0: %v", err)
+    }
+    if _, err := grpc1.AllocateVolume(ctx, req); err != nil {
+        t.Fatalf("allocate replicated volume on node1: %v", err)
+    }
+
+    client := framework.NewHTTPClient()
+    fid := framework.NewFileID(volumeID, 881001, 0x0B0C0D0E)
+    payload := []byte("replicated-upload-after-allocate")
+    uploadResp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(0), fid, payload)
+    _ = framework.ReadAllAndClose(t, uploadResp)
+    if uploadResp.StatusCode != http.StatusCreated {
+        t.Fatalf("replicated upload expected 201, got %d", uploadResp.StatusCode)
+    }
+
+    replicaReadURL := clusterHarness.VolumeAdminURL(1) + "/" + fid
+    var replicaBody []byte
+    if !waitForHTTPStatus(t, client, replicaReadURL, http.StatusOK, 10*time.Second, func(resp *http.Response) {
+        replicaBody = framework.ReadAllAndClose(t, resp)
+    }) {
+        t.Fatalf("replica did not become readable within deadline")
+    }
+
+    if string(replicaBody) != string(payload) {
+        t.Fatalf("replica body mismatch: got %q want %q", string(replicaBody), string(payload))
+    }
+}