From 1bb40b6bc5d62f9bd85c714c2297cff656382af9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Feb 2026 17:55:29 -0800 Subject: [PATCH] test(volume_server/grpc): add ec batch delete success coverage --- test/volume_server/grpc/batch_delete_test.go | 141 +++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/test/volume_server/grpc/batch_delete_test.go b/test/volume_server/grpc/batch_delete_test.go index b02d4ea27..076617465 100644 --- a/test/volume_server/grpc/batch_delete_test.go +++ b/test/volume_server/grpc/batch_delete_test.go @@ -3,7 +3,10 @@ package volume_server_grpc_test import ( "bytes" "context" + "io" + "net" "net/http" + "strconv" "strings" "testing" "time" @@ -262,3 +265,141 @@ func TestBatchDeleteRejectsChunkManifestNeedles(t *testing.T) { t.Fatalf("chunk manifest should not be deleted by BatchDelete reject path, got %d", readResp.StatusCode) } } + +func TestBatchDeleteEcNeedleSuccess(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartSingleVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + if stopProxy := maybeStartGrpcOffsetProxy(t, cluster.VolumeAdminAddress(), cluster.VolumeGRPCAddress()); stopProxy != nil { + t.Cleanup(stopProxy) + } + + const volumeID = uint32(34) + const needleID = uint64(930001) + const cookie = uint32(0x6677BBCC) + framework.AllocateVolume(t, client, volumeID, "") + + httpClient := framework.NewHTTPClient() + fid := framework.NewFileID(volumeID, needleID, cookie) + uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(), fid, []byte("batch-delete-ec-success")) + _ = framework.ReadAllAndClose(t, uploadResp) + if uploadResp.StatusCode != http.StatusCreated { + t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, + Collection: "", + }) + if err != nil { + t.Fatalf("VolumeEcShardsGenerate failed: %v", err) + } + + _, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, + Collection: "", + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }) + if err != nil { + t.Fatalf("VolumeEcShardsMount failed: %v", err) + } + + deleteResp, err := client.BatchDelete(ctx, &volume_server_pb.BatchDeleteRequest{FileIds: []string{fid}}) + if err != nil { + t.Fatalf("BatchDelete EC needle should return response, got grpc error: %v", err) + } + if len(deleteResp.GetResults()) != 1 { + t.Fatalf("BatchDelete EC needle expected one result, got %d", len(deleteResp.GetResults())) + } + if deleteResp.GetResults()[0].GetStatus() != http.StatusAccepted { + t.Fatalf("BatchDelete EC needle expected status 202, got %d error=%q", deleteResp.GetResults()[0].GetStatus(), deleteResp.GetResults()[0].GetError()) + } + if deleteResp.GetResults()[0].GetSize() == 0 { + t.Fatalf("BatchDelete EC needle expected non-zero deleted size") + } + + deletedStream, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{ + VolumeId: volumeID, + ShardId: 0, + FileKey: needleID, + Offset: 0, + Size: 1, + }) + if err != nil { + t.Fatalf("VolumeEcShardRead deleted-check start failed: %v", err) + } + deletedMsg, err := deletedStream.Recv() + if err != nil { + t.Fatalf("VolumeEcShardRead deleted-check recv failed: %v", err) + } + if !deletedMsg.GetIsDeleted() { + t.Fatalf("VolumeEcShardRead expected IsDeleted=true after EC batch delete") + } +} + +func maybeStartGrpcOffsetProxy(t testing.TB, adminAddr, actualGrpcAddr string) func() { + t.Helper() + + host, portText, err := net.SplitHostPort(adminAddr) + if err != nil { + t.Fatalf("split admin address %q: %v", adminAddr, err) + } + port, err := strconv.Atoi(portText) + if err != nil { + t.Fatalf("parse admin port %q: %v", portText, err) + } + + expectedGrpcAddr := net.JoinHostPort(host, strconv.Itoa(port+10000)) + if expectedGrpcAddr == actualGrpcAddr { + return nil + } + + listener, err := net.Listen("tcp", expectedGrpcAddr) + if err != nil { + t.Fatalf("listen grpc-offset proxy %s -> %s: %v", expectedGrpcAddr, actualGrpcAddr, err) + } + + stopped := make(chan struct{}) + go func() { + for { + conn, acceptErr := listener.Accept() + if acceptErr != nil { + select { + case <-stopped: + return + default: + continue + } + } + go proxyBidirectional(conn, actualGrpcAddr) + } + }() + + return func() { + close(stopped) + _ = listener.Close() + } +} + +func proxyBidirectional(inbound net.Conn, targetAddr string) { + outbound, err := net.Dial("tcp", targetAddr) + if err != nil { + _ = inbound.Close() + return + } + + go func() { + _, _ = io.Copy(outbound, inbound) + _ = outbound.Close() + }() + _, _ = io.Copy(inbound, outbound) + _ = inbound.Close() +}