diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index 6af5c725a..c18e5d34d 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -1412,20 +1412,52 @@ impl Volume { /// Read all live needles from the volume (for ReadAllNeedles streaming RPC). pub fn read_all_needles(&self) -> Result, VolumeError> { + let _guard = self.data_file_access_control.read_lock(); let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let version = self.super_block.version; + let dat_size = self.current_dat_file_size()? as i64; let mut needles = Vec::new(); - for (key, nv) in nm.iter_entries() { - if !nv.size.is_valid() { - continue; // skip deleted + let mut offset = self.super_block.block_size() as i64; + + while offset < dat_size { + let mut header = [0u8; NEEDLE_HEADER_SIZE]; + match self.read_exact_at_backend(&mut header, offset as u64) { + Ok(()) => {} + Err(VolumeError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e), + } + + let (_cookie, key, size) = Needle::parse_header(&header); + if size.0 == 0 && key.is_empty() { + break; + } + + let body_length = needle::needle_body_length(size, version); + let total_size = NEEDLE_HEADER_SIZE as i64 + body_length as i64; + + if size.is_deleted() || size.0 <= 0 { + offset += total_size; + continue; + } + + let Some(nv) = nm.get(key) else { + offset += total_size; + continue; + }; + if nv.offset.to_actual_offset() != offset { + offset += total_size; + continue; } + let mut n = Needle { id: key, ..Needle::default() }; - if let Ok(()) = self.read_needle_data_at(&mut n, nv.offset.to_actual_offset(), nv.size) - { - needles.push(n); - } + let mut read_option = ReadOption::default(); + self.read_needle_data_at_unlocked(&mut n, offset, size, &mut read_option)?; + needles.push(n); + + offset += total_size; } Ok(needles) } @@ -2833,6 +2865,47 @@ mod tests { assert!(!Path::new(&idx_path).exists()); } + #[test] + fn test_read_all_needles_uses_dat_order_for_live_offsets() { + let tmp = TempDir::new().unwrap(); + let dir = tmp.path().to_str().unwrap(); + let mut v = make_test_volume(dir); + + let mut first = Needle { + id: NeedleId(10), + cookie: Cookie(0x11223344), + data: b"first".to_vec(), + data_size: 5, + ..Needle::default() + }; + v.write_needle(&mut first, true).unwrap(); + + let mut second = Needle { + id: NeedleId(20), + cookie: Cookie(0x55667788), + data: b"second".to_vec(), + data_size: 6, + ..Needle::default() + }; + v.write_needle(&mut second, true).unwrap(); + + let mut first_overwrite = Needle { + id: NeedleId(10), + cookie: Cookie(0x11223344), + data: b"first-overwrite".to_vec(), + data_size: 15, + ..Needle::default() + }; + v.write_needle(&mut first_overwrite, true).unwrap(); + + let needles = v.read_all_needles().unwrap(); + let ids: Vec = needles.iter().map(|n| u64::from(n.id)).collect(); + let bodies: Vec<&[u8]> = needles.iter().map(|n| n.data.as_slice()).collect(); + + assert_eq!(ids, vec![20, 10]); + assert_eq!(bodies, vec![b"second".as_slice(), b"first-overwrite".as_slice()]); + } + #[test] fn test_get_append_at_ns() { let t1 = get_append_at_ns(0); diff --git a/test/volume_server/grpc/data_stream_success_test.go b/test/volume_server/grpc/data_stream_success_test.go index 1f54c1900..713b9ba45 100644 --- a/test/volume_server/grpc/data_stream_success_test.go +++ b/test/volume_server/grpc/data_stream_success_test.go @@ -3,6 +3,7 @@ package volume_server_grpc_test import ( "context" "io" + "reflect" "strings" "testing" "time" @@ -230,6 +231,71 @@ func TestReadAllNeedlesExistingThenMissingVolumeAbortsStream(t *testing.T) { } } +func TestReadAllNeedlesPreservesDatOrderAcrossOverwrite(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(86) + const firstNeedleID = uint64(444551) + const secondNeedleID = uint64(444552) + const firstCookie = uint32(0xAA22BB33) + const secondCookie = uint32(0xCC44DD55) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + client := framework.NewHTTPClient() + uploads := []struct { + fid string + body string + }{ + {fid: framework.NewFileID(volumeID, firstNeedleID, firstCookie), body: "read-all-first"}, + {fid: framework.NewFileID(volumeID, secondNeedleID, secondCookie), body: "read-all-second"}, + {fid: framework.NewFileID(volumeID, firstNeedleID, firstCookie), body: "read-all-first-overwrite"}, + } + for _, upload := range uploads { + resp := framework.UploadBytes(t, client, clusterHarness.VolumeAdminURL(), upload.fid, []byte(upload.body)) + _ = framework.ReadAllAndClose(t, resp) + if resp.StatusCode != 201 { + t.Fatalf("upload for %s expected 201, got %d", upload.fid, resp.StatusCode) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := grpcClient.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{VolumeIds: []uint32{volumeID}}) + if err != nil { + t.Fatalf("ReadAllNeedles start failed: %v", err) + } + + var orderedIDs []uint64 + var orderedBodies []string + for { + msg, recvErr := stream.Recv() + if recvErr == io.EOF { + break + } + if recvErr != nil { + t.Fatalf("ReadAllNeedles recv failed: %v", recvErr) + } + orderedIDs = append(orderedIDs, msg.GetNeedleId()) + orderedBodies = append(orderedBodies, string(msg.GetNeedleBlob())) + } + + wantIDs := []uint64{secondNeedleID, firstNeedleID} + wantBodies := []string{"read-all-second", "read-all-first-overwrite"} + if !reflect.DeepEqual(orderedIDs, wantIDs) { + t.Fatalf("ReadAllNeedles order mismatch: got %v want %v", orderedIDs, wantIDs) + } + if !reflect.DeepEqual(orderedBodies, wantBodies) { + t.Fatalf("ReadAllNeedles bodies mismatch: got %v want %v", orderedBodies, wantBodies) + } +} + func copyFileBytes(t testing.TB, grpcClient volume_server_pb.VolumeServerClient, req *volume_server_pb.CopyFileRequest) []byte { t.Helper()