Browse Source

Match Go ReadAllNeedles scan order

rust-volume-server
Chris Lu 4 days ago
parent
commit
8dd2899bb6
  1. 87
      seaweed-volume/src/storage/volume.rs
  2. 66
      test/volume_server/grpc/data_stream_success_test.go

87
seaweed-volume/src/storage/volume.rs

@ -1412,20 +1412,52 @@ impl Volume {
/// Read all live needles from the volume (for ReadAllNeedles streaming RPC). /// Read all live needles from the volume (for ReadAllNeedles streaming RPC).
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> { pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
let _guard = self.data_file_access_control.read_lock();
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
let version = self.super_block.version;
let dat_size = self.current_dat_file_size()? as i64;
let mut needles = Vec::new(); let mut needles = Vec::new();
for (key, nv) in nm.iter_entries() {
if !nv.size.is_valid() {
continue; // skip deleted
let mut offset = self.super_block.block_size() as i64;
while offset < dat_size {
let mut header = [0u8; NEEDLE_HEADER_SIZE];
match self.read_exact_at_backend(&mut header, offset as u64) {
Ok(()) => {}
Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e),
}
let (_cookie, key, size) = Needle::parse_header(&header);
if size.0 == 0 && key.is_empty() {
break;
}
let body_length = needle::needle_body_length(size, version);
let total_size = NEEDLE_HEADER_SIZE as i64 + body_length as i64;
if size.is_deleted() || size.0 <= 0 {
offset += total_size;
continue;
}
let Some(nv) = nm.get(key) else {
offset += total_size;
continue;
};
if nv.offset.to_actual_offset() != offset {
offset += total_size;
continue;
} }
let mut n = Needle { let mut n = Needle {
id: key, id: key,
..Needle::default() ..Needle::default()
}; };
if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size)
{
needles.push(n);
}
let mut read_option = ReadOption::default();
self.read_needle_data_at_unlocked(&mut n, offset, size, &mut read_option)?;
needles.push(n);
offset += total_size;
} }
Ok(needles) Ok(needles)
} }
@ -2833,6 +2865,47 @@ mod tests {
assert!(!Path::new(&idx_path).exists()); assert!(!Path::new(&idx_path).exists());
} }
#[test]
fn test_read_all_needles_uses_dat_order_for_live_offsets() {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
let mut first = Needle {
id: NeedleId(10),
cookie: Cookie(0x11223344),
data: b"first".to_vec(),
data_size: 5,
..Needle::default()
};
v.write_needle(&mut first, true).unwrap();
let mut second = Needle {
id: NeedleId(20),
cookie: Cookie(0x55667788),
data: b"second".to_vec(),
data_size: 6,
..Needle::default()
};
v.write_needle(&mut second, true).unwrap();
let mut first_overwrite = Needle {
id: NeedleId(10),
cookie: Cookie(0x11223344),
data: b"first-overwrite".to_vec(),
data_size: 15,
..Needle::default()
};
v.write_needle(&mut first_overwrite, true).unwrap();
let needles = v.read_all_needles().unwrap();
let ids: Vec<u64> = needles.iter().map(|n| u64::from(n.id)).collect();
let bodies: Vec<&[u8]> = needles.iter().map(|n| n.data.as_slice()).collect();
assert_eq!(ids, vec![20, 10]);
assert_eq!(bodies, vec![b"second".as_slice(), b"first-overwrite".as_slice()]);
}
#[test] #[test]
fn test_get_append_at_ns() { fn test_get_append_at_ns() {
let t1 = get_append_at_ns(0); let t1 = get_append_at_ns(0);

66
test/volume_server/grpc/data_stream_success_test.go

@ -3,6 +3,7 @@ package volume_server_grpc_test
import ( import (
"context" "context"
"io" "io"
"reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -230,6 +231,71 @@ func TestReadAllNeedlesExistingThenMissingVolumeAbortsStream(t *testing.T) {
} }
} }
func TestReadAllNeedlesPreservesDatOrderAcrossOverwrite(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
const volumeID = uint32(86)
const firstNeedleID = uint64(444551)
const secondNeedleID = uint64(444552)
const firstCookie = uint32(0xAA22BB33)
const secondCookie = uint32(0xCC44DD55)
framework.AllocateVolume(t, grpcClient, volumeID, "")
client := framework.NewHTTPClient()
uploads := []struct {
fid string
body string
}{
{fid: framework.NewFileID(volumeID, firstNeedleID, firstCookie), body: "read-all-first"},
{fid: framework.NewFileID(volumeID, secondNeedleID, secondCookie), body: "read-all-second"},
{fid: framework.NewFileID(volumeID, firstNeedleID, firstCookie), body: "read-all-first-overwrite"},
}
for _, upload := range uploads {
resp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), upload.fid, []byte(upload.body))
_ = framework.ReadAllAndClose(t, resp)
if resp.StatusCode != 201 {
t.Fatalf("upload for %s expected 201, got %d", upload.fid, resp.StatusCode)
}
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := grpcClient.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{VolumeIds: []uint32{volumeID}})
if err != nil {
t.Fatalf("ReadAllNeedles start failed: %v", err)
}
var orderedIDs []uint64
var orderedBodies []string
for {
msg, recvErr := stream.Recv()
if recvErr == io.EOF {
break
}
if recvErr != nil {
t.Fatalf("ReadAllNeedles recv failed: %v", recvErr)
}
orderedIDs = append(orderedIDs, msg.GetNeedleId())
orderedBodies = append(orderedBodies, string(msg.GetNeedleBlob()))
}
wantIDs := []uint64{secondNeedleID, firstNeedleID}
wantBodies := []string{"read-all-second", "read-all-first-overwrite"}
if !reflect.DeepEqual(orderedIDs, wantIDs) {
t.Fatalf("ReadAllNeedles order mismatch: got %v want %v", orderedIDs, wantIDs)
}
if !reflect.DeepEqual(orderedBodies, wantBodies) {
t.Fatalf("ReadAllNeedles bodies mismatch: got %v want %v", orderedBodies, wantBodies)
}
}
func copyFileBytes(t testing.TB, grpcClient volume_server_pb.VolumeServerClient, req *volume_server_pb.CopyFileRequest) []byte { func copyFileBytes(t testing.TB, grpcClient volume_server_pb.VolumeServerClient, req *volume_server_pb.CopyFileRequest) []byte {
t.Helper() t.Helper()

Loading…
Cancel
Save