From 69f92f46eecc2925c37cfe1bc8971d64c8693705 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 16:01:54 -0800 Subject: [PATCH] Add CI for Rust volume server with integration tests against Go master - cluster_rust.go: test framework to start Go master + Rust volume server - test/volume_server/rust/: 8 integration tests (healthz, status, ping, write/read/delete round-trip, volume lifecycle, get/set state, server status, metrics endpoint) - rust-volume-server-tests.yml: CI workflow with Rust unit tests and Go+Rust integration tests --- .../workflows/rust-volume-server-tests.yml | 123 ++++++++ test/volume_server/framework/cluster_rust.go | 288 +++++++++++++++++ test/volume_server/rust/rust_volume_test.go | 295 ++++++++++++++++++ 3 files changed, 706 insertions(+) create mode 100644 .github/workflows/rust-volume-server-tests.yml create mode 100644 test/volume_server/framework/cluster_rust.go create mode 100644 test/volume_server/rust/rust_volume_test.go diff --git a/.github/workflows/rust-volume-server-tests.yml b/.github/workflows/rust-volume-server-tests.yml new file mode 100644 index 000000000..13f0436cb --- /dev/null +++ b/.github/workflows/rust-volume-server-tests.yml @@ -0,0 +1,123 @@ +name: "Rust Volume Server Tests" + +on: + pull_request: + branches: [ master ] + paths: + - 'seaweed-volume/**' + - 'test/volume_server/**' + - '.github/workflows/rust-volume-server-tests.yml' + push: + branches: [ master, main ] + paths: + - 'seaweed-volume/**' + - 'test/volume_server/**' + - '.github/workflows/rust-volume-server-tests.yml' + +concurrency: + group: ${{ github.head_ref || github.ref }}/rust-volume-server-tests + cancel-in-progress: true + +permissions: + contents: read + +env: + GO_VERSION: '1.24' + +jobs: + rust-unit-tests: + name: Rust Unit Tests + runs-on: ubuntu-22.04 + timeout-minutes: 15 + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry and target + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + seaweed-volume/target + key: rust-${{ hashFiles('seaweed-volume/Cargo.lock') }} + restore-keys: | + rust- + + - name: Build Rust volume server + run: cd seaweed-volume && cargo build --release + + - name: Run Rust unit tests + run: cd seaweed-volume && cargo test + + rust-integration-tests: + name: Rust Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up Go ${{ env.GO_VERSION }} + uses: actions/setup-go@v6 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry and target + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + seaweed-volume/target + key: rust-${{ hashFiles('seaweed-volume/Cargo.lock') }} + restore-keys: | + rust- + + - name: Build Go weed binary + run: | + cd weed + go build -o weed . + chmod +x weed + ./weed version + + - name: Build Rust volume binary + run: cd seaweed-volume && cargo build --release + + - name: Run integration tests + env: + WEED_BINARY: ${{ github.workspace }}/weed/weed + RUST_VOLUME_BINARY: ${{ github.workspace }}/seaweed-volume/target/release/seaweed-volume + run: | + echo "Running Rust volume server integration tests..." + go test -v -count=1 -timeout=15m ./test/volume_server/rust/... + + - name: Collect logs on failure + if: failure() + run: | + mkdir -p /tmp/rust-volume-server-it-logs + find /tmp -maxdepth 1 -type d -name "seaweedfs_volume_server_it_*" -print -exec cp -r {} /tmp/rust-volume-server-it-logs/ \; || true + + - name: Archive logs on failure + if: failure() + uses: actions/upload-artifact@v7 + with: + name: rust-volume-server-integration-test-logs + path: /tmp/rust-volume-server-it-logs/ + if-no-files-found: warn + retention-days: 7 + + - name: Test summary + if: always() + run: | + echo "## Rust Volume Server Integration Test Summary" >> "$GITHUB_STEP_SUMMARY" + echo "- Suite: test/volume_server/rust" >> "$GITHUB_STEP_SUMMARY" + echo "- Command: go test -v -count=1 -timeout=15m ./test/volume_server/rust/..." >> "$GITHUB_STEP_SUMMARY" diff --git a/test/volume_server/framework/cluster_rust.go b/test/volume_server/framework/cluster_rust.go new file mode 100644 index 000000000..6d53e2525 --- /dev/null +++ b/test/volume_server/framework/cluster_rust.go @@ -0,0 +1,288 @@ +package framework + +import ( + "bytes" + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "runtime" + "strconv" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" +) + +// RustCluster wraps a Go master + Rust volume server for integration testing. +type RustCluster struct { + testingTB testing.TB + profile matrix.Profile + + weedBinary string // Go weed binary (for the master) + rustVolumeBinary string // Rust volume binary + + baseDir string + configDir string + logsDir string + keepLogs bool + + masterPort int + masterGrpcPort int + volumePort int + volumeGrpcPort int + volumePubPort int + + masterCmd *exec.Cmd + volumeCmd *exec.Cmd + + cleanupOnce sync.Once +} + +// StartRustVolumeCluster starts a Go master + Rust volume server. +func StartRustVolumeCluster(t testing.TB, profile matrix.Profile) *RustCluster { + t.Helper() + + weedBinary, err := FindOrBuildWeedBinary() + if err != nil { + t.Fatalf("resolve weed binary: %v", err) + } + + rustBinary, err := FindOrBuildRustBinary() + if err != nil { + t.Fatalf("resolve rust volume binary: %v", err) + } + + baseDir, keepLogs, err := newWorkDir() + if err != nil { + t.Fatalf("create temp test directory: %v", err) + } + + configDir := filepath.Join(baseDir, "config") + logsDir := filepath.Join(baseDir, "logs") + masterDataDir := filepath.Join(baseDir, "master") + volumeDataDir := filepath.Join(baseDir, "volume") + for _, dir := range []string{configDir, logsDir, masterDataDir, volumeDataDir} { + if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { + t.Fatalf("create %s: %v", dir, mkErr) + } + } + + if err = writeSecurityConfig(configDir, profile); err != nil { + t.Fatalf("write security config: %v", err) + } + + masterPort, masterGrpcPort, err := allocateMasterPortPair() + if err != nil { + t.Fatalf("allocate master port pair: %v", err) + } + + ports, err := allocatePorts(3) + if err != nil { + t.Fatalf("allocate ports: %v", err) + } + + rc := &RustCluster{ + testingTB: t, + profile: profile, + weedBinary: weedBinary, + rustVolumeBinary: rustBinary, + baseDir: baseDir, + configDir: configDir, + logsDir: logsDir, + keepLogs: keepLogs, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + volumePort: ports[0], + volumeGrpcPort: ports[1], + volumePubPort: ports[0], + } + if profile.SplitPublicPort { + rc.volumePubPort = ports[2] + } + + if err = rc.startMaster(masterDataDir); err != nil { + rc.Stop() + t.Fatalf("start master: %v", err) + } + // Reuse the same HTTP readiness helper via an unexported Cluster shim. + helper := &Cluster{logsDir: logsDir} + if err = helper.waitForHTTP(rc.MasterURL() + "/dir/status"); err != nil { + masterLog := helper.tailLog("master.log") + rc.Stop() + t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog) + } + + if err = rc.startRustVolume(volumeDataDir); err != nil { + masterLog := helper.tailLog("master.log") + rc.Stop() + t.Fatalf("start rust volume: %v\nmaster log tail:\n%s", err, masterLog) + } + if err = helper.waitForHTTP(rc.VolumeAdminURL() + "/healthz"); err != nil { + volumeLog := helper.tailLog("volume.log") + rc.Stop() + t.Fatalf("wait for rust volume readiness: %v\nvolume log tail:\n%s", err, volumeLog) + } + if err = helper.waitForTCP(rc.VolumeGRPCAddress()); err != nil { + volumeLog := helper.tailLog("volume.log") + rc.Stop() + t.Fatalf("wait for rust volume grpc readiness: %v\nvolume log tail:\n%s", err, volumeLog) + } + + t.Cleanup(func() { + rc.Stop() + }) + + return rc +} + +// Stop terminates all processes and cleans temporary files. +func (rc *RustCluster) Stop() { + if rc == nil { + return + } + rc.cleanupOnce.Do(func() { + stopProcess(rc.volumeCmd) + stopProcess(rc.masterCmd) + if !rc.keepLogs && !rc.testingTB.Failed() { + _ = os.RemoveAll(rc.baseDir) + } else if rc.baseDir != "" { + rc.testingTB.Logf("rust volume server integration logs kept at %s", rc.baseDir) + } + }) +} + +func (rc *RustCluster) startMaster(dataDir string) error { + logFile, err := os.Create(filepath.Join(rc.logsDir, "master.log")) + if err != nil { + return err + } + + args := []string{ + "-config_dir=" + rc.configDir, + "master", + "-ip=127.0.0.1", + "-port=" + strconv.Itoa(rc.masterPort), + "-port.grpc=" + strconv.Itoa(rc.masterGrpcPort), + "-mdir=" + dataDir, + "-peers=none", + "-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB), + "-defaultReplication=000", + } + + rc.masterCmd = exec.Command(rc.weedBinary, args...) + rc.masterCmd.Dir = rc.baseDir + rc.masterCmd.Stdout = logFile + rc.masterCmd.Stderr = logFile + return rc.masterCmd.Start() +} + +func (rc *RustCluster) startRustVolume(dataDir string) error { + logFile, err := os.Create(filepath.Join(rc.logsDir, "volume.log")) + if err != nil { + return err + } + + args := []string{ + "--port", strconv.Itoa(rc.volumePort), + "--port.grpc", strconv.Itoa(rc.volumeGrpcPort), + "--port.public", strconv.Itoa(rc.volumePubPort), + "--ip", "127.0.0.1", + "--ip.bind", "127.0.0.1", + "--dir", dataDir, + "--max", "16", + "--master", "127.0.0.1:" + strconv.Itoa(rc.masterPort), + } + + rc.volumeCmd = exec.Command(rc.rustVolumeBinary, args...) + rc.volumeCmd.Dir = rc.baseDir + rc.volumeCmd.Stdout = logFile + rc.volumeCmd.Stderr = logFile + return rc.volumeCmd.Start() +} + +// FindOrBuildRustBinary returns an executable Rust volume binary, building one when needed. +func FindOrBuildRustBinary() (string, error) { + if fromEnv := os.Getenv("RUST_VOLUME_BINARY"); fromEnv != "" { + if isExecutableFile(fromEnv) { + return fromEnv, nil + } + return "", fmt.Errorf("RUST_VOLUME_BINARY is set but not executable: %s", fromEnv) + } + + // Derive the seaweed-volume crate directory from this source file's location. + rustCrateDir := "" + if _, file, _, ok := runtime.Caller(0); ok { + repoRoot := filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..", "..")) + rustCrateDir = filepath.Join(repoRoot, "seaweed-volume") + } + if rustCrateDir == "" { + return "", fmt.Errorf("unable to detect seaweed-volume crate directory") + } + + // Check for a pre-built release binary. + releaseBin := filepath.Join(rustCrateDir, "target", "release", "seaweed-volume") + if isExecutableFile(releaseBin) { + return releaseBin, nil + } + + // Check for a pre-built debug binary. + debugBin := filepath.Join(rustCrateDir, "target", "debug", "seaweed-volume") + if isExecutableFile(debugBin) { + return debugBin, nil + } + + // Build with cargo. + cmd := exec.Command("cargo", "build", "--release") + cmd.Dir = rustCrateDir + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("build rust volume binary: %w\n%s", err, out.String()) + } + if !isExecutableFile(releaseBin) { + return "", fmt.Errorf("built rust volume binary is not executable: %s", releaseBin) + } + return releaseBin, nil +} + +// --- accessor methods (mirror Cluster) --- + +func (rc *RustCluster) MasterAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(rc.masterPort)) +} + +func (rc *RustCluster) VolumeAdminAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(rc.volumePort)) +} + +func (rc *RustCluster) VolumePublicAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(rc.volumePubPort)) +} + +func (rc *RustCluster) VolumeGRPCAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(rc.volumeGrpcPort)) +} + +// VolumeServerAddress returns SeaweedFS server address format: ip:httpPort.grpcPort +func (rc *RustCluster) VolumeServerAddress() string { + return fmt.Sprintf("%s.%d", rc.VolumeAdminAddress(), rc.volumeGrpcPort) +} + +func (rc *RustCluster) MasterURL() string { + return "http://" + rc.MasterAddress() +} + +func (rc *RustCluster) VolumeAdminURL() string { + return "http://" + rc.VolumeAdminAddress() +} + +func (rc *RustCluster) VolumePublicURL() string { + return "http://" + rc.VolumePublicAddress() +} + +func (rc *RustCluster) BaseDir() string { + return rc.baseDir +} diff --git a/test/volume_server/rust/rust_volume_test.go b/test/volume_server/rust/rust_volume_test.go new file mode 100644 index 000000000..33affe2db --- /dev/null +++ b/test/volume_server/rust/rust_volume_test.go @@ -0,0 +1,295 @@ +package volume_server_rust_test + +import ( + "context" + "encoding/json" + "net/http" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" +) + +func mustNewRequest(t testing.TB, method, url string) *http.Request { + t.Helper() + req, err := http.NewRequest(method, url, nil) + if err != nil { + t.Fatalf("create request %s %s: %v", method, url, err) + } + return req +} + +// TestRustHealthzEndpoint verifies that the Rust volume server responds to +// GET /healthz with HTTP 200. +func TestRustHealthzEndpoint(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + client := framework.NewHTTPClient() + + resp := framework.DoRequest(t, client, mustNewRequest(t, http.MethodGet, cluster.VolumeAdminURL()+"/healthz")) + _ = framework.ReadAllAndClose(t, resp) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected /healthz 200, got %d", resp.StatusCode) + } +} + +// TestRustStatusEndpoint verifies that GET /status returns 200 with a JSON +// body containing a "version" field. The Rust server uses lowercase field +// names in its axum JSON responses. +func TestRustStatusEndpoint(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + client := framework.NewHTTPClient() + + resp := framework.DoRequest(t, client, mustNewRequest(t, http.MethodGet, cluster.VolumeAdminURL()+"/status")) + body := framework.ReadAllAndClose(t, resp) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected /status 200, got %d, body: %s", resp.StatusCode, string(body)) + } + + var payload map[string]interface{} + if err := json.Unmarshal(body, &payload); err != nil { + t.Fatalf("decode /status JSON: %v", err) + } + + if _, ok := payload["version"]; !ok { + t.Fatalf("/status JSON missing \"version\" field, keys: %v", keys(payload)) + } +} + +// TestRustPingRPC verifies the gRPC Ping RPC returns non-zero timestamps. +func TestRustPingRPC(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + resp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if err != nil { + t.Fatalf("Ping RPC failed: %v", err) + } + if resp.GetStartTimeNs() == 0 { + t.Fatalf("Ping StartTimeNs should be non-zero") + } + if resp.GetStopTimeNs() == 0 { + t.Fatalf("Ping StopTimeNs should be non-zero") + } + if resp.GetStopTimeNs() < resp.GetStartTimeNs() { + t.Fatalf("Ping StopTimeNs (%d) should be >= StartTimeNs (%d)", resp.GetStopTimeNs(), resp.GetStartTimeNs()) + } +} + +// TestRustAllocateAndWriteReadDelete exercises the full needle lifecycle: +// allocate a volume via gRPC, upload bytes via HTTP POST, read them back +// via HTTP GET, delete via HTTP DELETE, then confirm GET returns 404. +func TestRustAllocateAndWriteReadDelete(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + const volumeID = uint32(1) + framework.AllocateVolume(t, grpcClient, volumeID, "") + + httpClient := framework.NewHTTPClient() + fid := framework.NewFileID(volumeID, 1001, 0xAABBCCDD) + data := []byte("rust-volume-server-integration-test-payload") + + // Upload + uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(), fid, data) + _ = framework.ReadAllAndClose(t, uploadResp) + if uploadResp.StatusCode != http.StatusCreated { + t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode) + } + + // Read back + getResp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(), fid) + getBody := framework.ReadAllAndClose(t, getResp) + if getResp.StatusCode != http.StatusOK { + t.Fatalf("read expected 200, got %d", getResp.StatusCode) + } + if string(getBody) != string(data) { + t.Fatalf("read body mismatch: got %q, want %q", string(getBody), string(data)) + } + + // Delete + deleteResp := framework.DoRequest(t, httpClient, mustNewRequest(t, http.MethodDelete, cluster.VolumeAdminURL()+"/"+fid)) + _ = framework.ReadAllAndClose(t, deleteResp) + if deleteResp.StatusCode != http.StatusAccepted && deleteResp.StatusCode != http.StatusOK { + t.Fatalf("delete expected 202 or 200, got %d", deleteResp.StatusCode) + } + + // Verify 404 after delete + gone := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(), fid) + _ = framework.ReadAllAndClose(t, gone) + if gone.StatusCode != http.StatusNotFound { + t.Fatalf("read after delete expected 404, got %d", gone.StatusCode) + } +} + +// TestRustVolumeLifecycle tests the volume admin gRPC lifecycle: +// allocate, check status, unmount, mount, delete. +func TestRustVolumeLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + const volumeID = uint32(2) + framework.AllocateVolume(t, client, volumeID, "") + + // VolumeStatus should succeed on a freshly allocated volume. + statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{VolumeId: volumeID}) + if err != nil { + t.Fatalf("VolumeStatus failed: %v", err) + } + if statusResp.GetFileCount() != 0 { + t.Fatalf("new volume should be empty, got file_count=%d", statusResp.GetFileCount()) + } + + // Unmount then remount. + if _, err = client.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{VolumeId: volumeID}); err != nil { + t.Fatalf("VolumeUnmount failed: %v", err) + } + if _, err = client.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{VolumeId: volumeID}); err != nil { + t.Fatalf("VolumeMount failed: %v", err) + } + + // Delete. + if _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{VolumeId: volumeID, OnlyEmpty: true}); err != nil { + t.Fatalf("VolumeDelete failed: %v", err) + } + + // VolumeStatus should fail after delete. + _, err = client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{VolumeId: volumeID}) + if err == nil { + t.Fatalf("VolumeStatus should fail after delete") + } +} + +// TestRustGetSetState verifies GetState returns a non-nil state and SetState +// echoes the state back. +func TestRustGetSetState(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // GetState should return non-nil state. + getResp, err := client.GetState(ctx, &volume_server_pb.GetStateRequest{}) + if err != nil { + t.Fatalf("GetState failed: %v", err) + } + if getResp.GetState() == nil { + t.Fatalf("GetState returned nil state") + } + + // SetState should echo back the state. + setResp, err := client.SetState(ctx, &volume_server_pb.SetStateRequest{ + State: &volume_server_pb.VolumeServerState{ + Version: getResp.GetState().GetVersion(), + }, + }) + if err != nil { + t.Fatalf("SetState failed: %v", err) + } + if setResp.GetState() == nil { + t.Fatalf("SetState returned nil state") + } + if setResp.GetState().GetVersion() < getResp.GetState().GetVersion() { + t.Fatalf("SetState version should not decrease: got %d, had %d", + setResp.GetState().GetVersion(), getResp.GetState().GetVersion()) + } +} + +// TestRustVolumeServerStatus verifies VolumeServerStatus returns a version +// string and at least one disk status entry. +func TestRustVolumeServerStatus(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + conn, client := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress()) + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + resp, err := client.VolumeServerStatus(ctx, &volume_server_pb.VolumeServerStatusRequest{}) + if err != nil { + t.Fatalf("VolumeServerStatus failed: %v", err) + } + if resp.GetVersion() == "" { + t.Fatalf("VolumeServerStatus returned empty version") + } + if len(resp.GetDiskStatuses()) == 0 { + t.Fatalf("VolumeServerStatus returned no disk statuses") + } +} + +// TestRustMetricsEndpoint verifies that GET /metrics returns 200 with +// Prometheus text format content. +func TestRustMetricsEndpoint(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartRustVolumeCluster(t, matrix.P1()) + client := framework.NewHTTPClient() + + resp := framework.DoRequest(t, client, mustNewRequest(t, http.MethodGet, cluster.VolumeAdminURL()+"/metrics")) + body := framework.ReadAllAndClose(t, resp) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected /metrics 200, got %d", resp.StatusCode) + } + + bodyStr := string(body) + // Prometheus text format includes lines starting with "# HELP" or "# TYPE", + // or at minimum metric names. Check for common indicators. + if !strings.Contains(bodyStr, "# ") && !strings.Contains(bodyStr, "_total") && !strings.Contains(bodyStr, "_seconds") { + t.Fatalf("/metrics response does not look like Prometheus text format, got: %.200s", bodyStr) + } +} + +// keys returns the keys of a map for diagnostic messages. +func keys(m map[string]interface{}) []string { + ks := make([]string, 0, len(m)) + for k := range m { + ks = append(ks, k) + } + return ks +}