Browse Source

test(volume_server/grpc): add ec batch delete success coverage

codex-rust-volume-server-bootstrap
Chris Lu 4 weeks ago
parent
commit
1bb40b6bc5
  1. 141
      test/volume_server/grpc/batch_delete_test.go

141
test/volume_server/grpc/batch_delete_test.go

@ -3,7 +3,10 @@ package volume_server_grpc_test
import (
"bytes"
"context"
"io"
"net"
"net/http"
"strconv"
"strings"
"testing"
"time"
@ -262,3 +265,141 @@ func TestBatchDeleteRejectsChunkManifestNeedles(t *testing.T) {
t.Fatalf("chunk manifest should not be deleted by BatchDelete reject path, got %d", readResp.StatusCode)
}
}
func TestBatchDeleteEcNeedleSuccess(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
cluster := framework.StartSingleVolumeCluster(t, matrix.P1())
conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress())
defer conn.Close()
if stopProxy := maybeStartGrpcOffsetProxy(t, cluster.VolumeAdminAddress(), cluster.VolumeGRPCAddress()); stopProxy != nil {
t.Cleanup(stopProxy)
}
const volumeID = uint32(34)
const needleID = uint64(930001)
const cookie = uint32(0x6677BBCC)
framework.AllocateVolume(t, client, volumeID, "")
httpClient := framework.NewHTTPClient()
fid := framework.NewFileID(volumeID, needleID, cookie)
uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(), fid, []byte("batch-delete-ec-success"))
_ = framework.ReadAllAndClose(t, uploadResp)
if uploadResp.StatusCode != http.StatusCreated {
t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err := client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: volumeID,
Collection: "",
})
if err != nil {
t.Fatalf("VolumeEcShardsGenerate failed: %v", err)
}
_, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: volumeID,
Collection: "",
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
})
if err != nil {
t.Fatalf("VolumeEcShardsMount failed: %v", err)
}
deleteResp, err := client.BatchDelete(ctx, &volume_server_pb.BatchDeleteRequest{FileIds: []string{fid}})
if err != nil {
t.Fatalf("BatchDelete EC needle should return response, got grpc error: %v", err)
}
if len(deleteResp.GetResults()) != 1 {
t.Fatalf("BatchDelete EC needle expected one result, got %d", len(deleteResp.GetResults()))
}
if deleteResp.GetResults()[0].GetStatus() != http.StatusAccepted {
t.Fatalf("BatchDelete EC needle expected status 202, got %d error=%q", deleteResp.GetResults()[0].GetStatus(), deleteResp.GetResults()[0].GetError())
}
if deleteResp.GetResults()[0].GetSize() == 0 {
t.Fatalf("BatchDelete EC needle expected non-zero deleted size")
}
deletedStream, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: volumeID,
ShardId: 0,
FileKey: needleID,
Offset: 0,
Size: 1,
})
if err != nil {
t.Fatalf("VolumeEcShardRead deleted-check start failed: %v", err)
}
deletedMsg, err := deletedStream.Recv()
if err != nil {
t.Fatalf("VolumeEcShardRead deleted-check recv failed: %v", err)
}
if !deletedMsg.GetIsDeleted() {
t.Fatalf("VolumeEcShardRead expected IsDeleted=true after EC batch delete")
}
}
func maybeStartGrpcOffsetProxy(t testing.TB, adminAddr, actualGrpcAddr string) func() {
t.Helper()
host, portText, err := net.SplitHostPort(adminAddr)
if err != nil {
t.Fatalf("split admin address %q: %v", adminAddr, err)
}
port, err := strconv.Atoi(portText)
if err != nil {
t.Fatalf("parse admin port %q: %v", portText, err)
}
expectedGrpcAddr := net.JoinHostPort(host, strconv.Itoa(port+10000))
if expectedGrpcAddr == actualGrpcAddr {
return nil
}
listener, err := net.Listen("tcp", expectedGrpcAddr)
if err != nil {
t.Fatalf("listen grpc-offset proxy %s -> %s: %v", expectedGrpcAddr, actualGrpcAddr, err)
}
stopped := make(chan struct{})
go func() {
for {
conn, acceptErr := listener.Accept()
if acceptErr != nil {
select {
case <-stopped:
return
default:
continue
}
}
go proxyBidirectional(conn, actualGrpcAddr)
}
}()
return func() {
close(stopped)
_ = listener.Close()
}
}
func proxyBidirectional(inbound net.Conn, targetAddr string) {
outbound, err := net.Dial("tcp", targetAddr)
if err != nil {
_ = inbound.Close()
return
}
go func() {
_, _ = io.Copy(outbound, inbound)
_ = outbound.Close()
}()
_, _ = io.Copy(inbound, outbound)
_ = inbound.Close()
}
Loading…
Cancel
Save