From 5ce2f4e868e4a3c034b2697849456af2fdd91360 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 20:18:52 -0800 Subject: [PATCH] add README and load test harness - seaweed-volume/README.md: build, run, flags, features, testing, architecture - test/volume_server/loadtest: throughput, latency percentiles, sustained p99 tests --- seaweed-volume/README.md | 140 +++++ test/volume_server/loadtest/loadtest_test.go | 628 +++++++++++++++++++ 2 files changed, 768 insertions(+) create mode 100644 seaweed-volume/README.md create mode 100644 test/volume_server/loadtest/loadtest_test.go diff --git a/seaweed-volume/README.md b/seaweed-volume/README.md new file mode 100644 index 000000000..4367a0722 --- /dev/null +++ b/seaweed-volume/README.md @@ -0,0 +1,140 @@ +# SeaweedFS Volume Server (Rust) + +A drop-in replacement for the [SeaweedFS](https://github.com/seaweedfs/seaweedfs) Go volume server, rewritten in Rust. It uses binary-compatible storage formats (`.dat`, `.idx`, `.vif`) and speaks the same HTTP and gRPC protocols, so it works with an unmodified Go master server. + +## Building + +Requires Rust 1.75+ (2021 edition). + +```bash +cd seaweed-volume +cargo build --release +``` + +The binary is produced at `target/release/seaweed-volume`. + +## Running + +Start a Go master server first, then point the Rust volume server at it: + +```bash +# Minimal +seaweed-volume --port 8080 --master localhost:9333 --dir /data/vol1 --max 7 + +# Multiple data directories +seaweed-volume --port 8080 --master localhost:9333 \ + --dir /mnt/ssd1,/mnt/ssd2 --max 100,100 --disk ssd + +# With datacenter/rack topology +seaweed-volume --port 8080 --master localhost:9333 --dir /data/vol1 --max 7 \ + --dataCenter dc1 --rack rack1 + +# With JWT authentication +seaweed-volume --port 8080 --master localhost:9333 --dir /data/vol1 --max 7 \ + --securityFile /etc/seaweedfs/security.toml + +# With TLS (configured in security.toml via [https.volume] and [grpc.volume] sections) +seaweed-volume --port 8080 --master localhost:9333 --dir /data/vol1 --max 7 \ + --securityFile /etc/seaweedfs/security.toml +``` + +### Common flags + +| Flag | Default | Description | +|------|---------|-------------| +| `--port` | `8080` | HTTP listen port | +| `--port.grpc` | `port+10000` | gRPC listen port | +| `--master` | `localhost:9333` | Comma-separated master server addresses | +| `--dir` | `/tmp` | Comma-separated data directories | +| `--max` | `8` | Max volumes per directory (comma-separated) | +| `--ip` | auto-detect | Server IP / identifier | +| `--ip.bind` | same as `--ip` | Bind address | +| `--dataCenter` | | Datacenter name | +| `--rack` | | Rack name | +| `--disk` | | Disk type tag: `hdd`, `ssd`, or custom | +| `--index` | `memory` | Needle map type: `memory`, `leveldb`, `leveldbMedium`, `leveldbLarge` | +| `--readMode` | `proxy` | Non-local read mode: `local`, `proxy`, `redirect` | +| `--fileSizeLimitMB` | `256` | Max upload file size | +| `--minFreeSpace` | `1` (percent) | Min free disk space before marking volumes read-only | +| `--securityFile` | | Path to `security.toml` for JWT keys and TLS certs | +| `--metricsPort` | `0` (disabled) | Prometheus metrics endpoint port | +| `--whiteList` | | Comma-separated IPs with write permission | +| `--preStopSeconds` | `10` | Graceful drain period before shutdown | +| `--compactionMBps` | `0` (unlimited) | Compaction I/O rate limit | +| `--pprof` | `false` | Enable pprof HTTP handlers | + +Set `RUST_LOG=debug` (or `trace`, `info`, `warn`) for log level control. +Set `SEAWEED_WRITE_QUEUE=1` to enable batched async write processing. + +## Features + +- **Binary compatible** -- reads and writes the same `.dat`/`.idx`/`.vif` files as the Go server; seamless migration with no data conversion. +- **HTTP + gRPC** -- full implementation of the volume server HTTP API and all gRPC RPCs including streaming operations (copy, tail, incremental copy, vacuum). +- **Master heartbeat** -- bidirectional streaming heartbeat with the Go master server; volume and EC shard registration, leader failover, graceful shutdown deregistration. +- **JWT authentication** -- signing key configuration via `security.toml` with token source precedence (query > header > cookie), file_id claims validation, and separate read/write keys. +- **TLS** -- HTTPS for the HTTP API and mTLS for gRPC, configured through `security.toml`. +- **Erasure coding** -- Reed-Solomon EC shard management: mount/unmount, read, rebuild, copy, delete, and shard-to-volume reconstruction. +- **S3 remote storage** -- `FetchAndWriteNeedle` reads from any S3-compatible backend (AWS, MinIO, Wasabi, Backblaze, etc.) and writes locally. Supports `VolumeTierMoveDatToRemote`/`FromRemote` for tiered storage. +- **Needle map backends** -- in-memory HashMap, LevelDB (via `rusty-leveldb`), or redb (pure Rust disk-backed) needle maps. +- **Image processing** -- on-the-fly resize/crop, JPEG EXIF orientation auto-fix, WebP support. +- **Streaming reads** -- large files (>1MB) are streamed via `spawn_blocking` to avoid blocking the async runtime. +- **Auto-compression** -- compressible file types (text, JSON, CSS, JS, SVG, etc.) are gzip-compressed on upload. +- **Prometheus metrics** -- counters, histograms, and gauges exported at a dedicated metrics port; optional push gateway support. +- **Graceful shutdown** -- SIGINT/SIGTERM handling with configurable `preStopSeconds` drain period. + +## Testing + +### Rust unit tests + +```bash +cd seaweed-volume +cargo test +``` + +### Go integration tests + +The Go test suite can target either the Go or Rust volume server via the `VOLUME_SERVER_IMPL` environment variable: + +```bash +# Run all HTTP + gRPC integration tests against the Rust server +VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 1200s \ + ./test/volume_server/grpc/... ./test/volume_server/http/... + +# Run a single test +VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 60s \ + -run "TestName" ./test/volume_server/http/... + +# Run S3 remote storage tests +VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 180s \ + -run "TestFetchAndWriteNeedle" ./test/volume_server/grpc/... +``` + +## Load testing + +A load test harness is available at `test/volume_server/loadtest/`. See that directory for usage instructions and scenarios. + +## Architecture + +The server runs three listeners concurrently: + +- **HTTP** (Axum 0.7) -- admin and public routers for file upload/download, status, and stats endpoints. +- **gRPC** (Tonic 0.12) -- all `VolumeServer` RPCs from the SeaweedFS protobuf definition. +- **Metrics** (optional) -- Prometheus scrape endpoint on a separate port. + +Key source modules: + +| Path | Description | +|------|-------------| +| `src/main.rs` | Entry point, server startup, signal handling | +| `src/config.rs` | CLI parsing and configuration resolution | +| `src/server/volume_server.rs` | HTTP router setup and middleware | +| `src/server/handlers.rs` | HTTP request handlers (read, write, delete, status) | +| `src/server/grpc_server.rs` | gRPC service implementation | +| `src/server/heartbeat.rs` | Master heartbeat loop | +| `src/storage/volume.rs` | Volume read/write/delete logic | +| `src/storage/needle.rs` | Needle (file entry) serialization | +| `src/storage/store.rs` | Multi-volume store management | +| `src/security.rs` | JWT validation and IP whitelist guard | +| `src/remote_storage/` | S3 remote storage backend | + +See [DEV_PLAN.md](DEV_PLAN.md) for the full development history and feature checklist. diff --git a/test/volume_server/loadtest/loadtest_test.go b/test/volume_server/loadtest/loadtest_test.go new file mode 100644 index 000000000..d3d0d0fc1 --- /dev/null +++ b/test/volume_server/loadtest/loadtest_test.go @@ -0,0 +1,628 @@ +package loadtest + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "net/http" + "os" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" +) + +// Run with: +// go test -v -count=1 -timeout 300s -run BenchmarkVolumeServer ./test/volume_server/loadtest/... +// VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 300s -run BenchmarkVolumeServer ./test/volume_server/loadtest/... +// +// Compare results: +// go test -count=1 -timeout 300s -run BenchmarkVolumeServer -bench . ./test/volume_server/loadtest/... | tee go.txt +// VOLUME_SERVER_IMPL=rust go test -count=1 -timeout 300s -run BenchmarkVolumeServer -bench . ./test/volume_server/loadtest/... | tee rust.txt + +// Step-by-step payload sizes: 1KB → 4KB → 16KB → 64KB → 256KB → 1MB → 4MB → 8MB +var payloadSteps = []struct { + name string + size int +}{ + {"1KB", 1 << 10}, + {"4KB", 4 << 10}, + {"16KB", 16 << 10}, + {"64KB", 64 << 10}, + {"256KB", 256 << 10}, + {"1MB", 1 << 20}, + {"4MB", 4 << 20}, + {"8MB", 8 << 20}, +} + +func implName() string { + if os.Getenv("VOLUME_SERVER_IMPL") == "rust" { + return "rust" + } + return "go" +} + +// setupCluster starts a volume cluster and returns the admin URL and cleanup. +func setupCluster(tb testing.TB) (adminURL string, grpcAddr string, cleanup func()) { + tb.Helper() + cluster := framework.StartVolumeCluster(tb, matrix.P1()) + return cluster.VolumeAdminURL(), cluster.VolumeGRPCAddress(), cluster.Stop +} + +// allocateVolume allocates a volume via gRPC and returns its ID. +func allocateVolume(tb testing.TB, grpcAddr string, volumeID uint32) { + tb.Helper() + conn, client := framework.DialVolumeServer(tb, grpcAddr) + defer conn.Close() + framework.AllocateVolume(tb, client, volumeID, "loadtest") +} + +func makePayload(size int) []byte { + data := make([]byte, size) + rand.Read(data) + return data +} + +// uploadFile uploads data and returns the file ID used. +func uploadFile(client *http.Client, adminURL string, volumeID uint32, key uint64, cookie uint32, data []byte) error { + fid := framework.NewFileID(volumeID, key, cookie) + url := fmt.Sprintf("%s/%s", adminURL, fid) + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(data)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/octet-stream") + resp, err := client.Do(req) + if err != nil { + return err + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if resp.StatusCode >= 400 { + return fmt.Errorf("upload %s: status %d", fid, resp.StatusCode) + } + return nil +} + +// downloadFile reads a file and discards the body. +func downloadFile(client *http.Client, adminURL string, volumeID uint32, key uint64, cookie uint32) error { + fid := framework.NewFileID(volumeID, key, cookie) + url := fmt.Sprintf("%s/%s", adminURL, fid) + resp, err := client.Get(url) + if err != nil { + return err + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if resp.StatusCode >= 400 { + return fmt.Errorf("download %s: status %d", fid, resp.StatusCode) + } + return nil +} + +// deleteFile deletes a file. +func deleteFile(client *http.Client, adminURL string, volumeID uint32, key uint64, cookie uint32) error { + fid := framework.NewFileID(volumeID, key, cookie) + url := fmt.Sprintf("%s/%s", adminURL, fid) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return err + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return nil +} + +// --- Throughput load tests (not Go benchmarks, manual timing for comparison) --- + +// TestBenchmarkVolumeServer runs a suite of load tests printing ops/sec and latency. +func TestBenchmarkVolumeServer(t *testing.T) { + if testing.Short() { + t.Skip("skipping load test in short mode") + } + + impl := implName() + adminURL, grpcAddr, cleanup := setupCluster(t) + defer cleanup() + + const volumeID = uint32(10) + allocateVolume(t, grpcAddr, volumeID) + + httpClient := &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 128, + MaxConnsPerHost: 128, + }, + } + + // opsForSize returns fewer ops for larger payloads to keep test time reasonable. + opsForSize := func(size, concurrency int) int { + switch { + case size >= 4<<20: + if concurrency > 1 { + return 64 + } + return 30 + case size >= 1<<20: + if concurrency > 1 { + return 200 + } + return 100 + case size >= 64<<10: + if concurrency > 1 { + return 500 + } + return 300 + default: + if concurrency > 1 { + return 1000 + } + return 500 + } + } + + // Step-by-step upload: 1KB → 4KB → 16KB → 64KB → 256KB → 1MB → 4MB → 8MB + for _, ps := range payloadSteps { + for _, mode := range []struct { + label string + concurrency int + }{ + {"seq", 1}, + {"c16", 16}, + } { + name := fmt.Sprintf("Upload/%s/%s", ps.name, mode.label) + numOps := opsForSize(ps.size, mode.concurrency) + t.Run(fmt.Sprintf("%s/%s", impl, name), func(t *testing.T) { + payload := makePayload(ps.size) + runThroughputTest(t, impl, name, httpClient, adminURL, volumeID, + payload, numOps, mode.concurrency, false, false) + }) + } + } + + // Step-by-step download: 1KB → 4KB → 16KB → 64KB → 256KB → 1MB → 4MB → 8MB + for _, ps := range payloadSteps { + for _, mode := range []struct { + label string + concurrency int + }{ + {"seq", 1}, + {"c16", 16}, + } { + name := fmt.Sprintf("Download/%s/%s", ps.name, mode.label) + numOps := opsForSize(ps.size, mode.concurrency) + t.Run(fmt.Sprintf("%s/%s", impl, name), func(t *testing.T) { + payload := makePayload(ps.size) + runThroughputTest(t, impl, name, httpClient, adminURL, volumeID, + payload, numOps, mode.concurrency, true, false) + }) + } + } + + // Mixed read/write at each size + for _, ps := range payloadSteps { + name := fmt.Sprintf("Mixed/%s/c16", ps.name) + numOps := opsForSize(ps.size, 16) + t.Run(fmt.Sprintf("%s/%s", impl, name), func(t *testing.T) { + payload := makePayload(ps.size) + runThroughputTest(t, impl, name, httpClient, adminURL, volumeID, + payload, numOps, 16, false, true) + }) + } + + // Delete test + t.Run(fmt.Sprintf("%s/Delete/1KB/c16", impl), func(t *testing.T) { + payload := makePayload(1 << 10) + numOps := 1000 + baseKey := uint64(900000) + + for i := 0; i < numOps; i++ { + if err := uploadFile(httpClient, adminURL, volumeID, baseKey+uint64(i), 1, payload); err != nil { + t.Fatalf("pre-upload for delete %d: %v", i, err) + } + } + + var ops atomic.Int64 + var totalLatency atomic.Int64 + + start := time.Now() + var wg sync.WaitGroup + concurrency := 16 + opsPerWorker := numOps / concurrency + + for w := 0; w < concurrency; w++ { + workerBase := baseKey + uint64(w*opsPerWorker) + wg.Add(1) + go func(wb uint64) { + defer wg.Done() + for i := 0; i < opsPerWorker; i++ { + opStart := time.Now() + deleteFile(httpClient, adminURL, volumeID, wb+uint64(i), 1) + totalLatency.Add(time.Since(opStart).Nanoseconds()) + ops.Add(1) + } + }(workerBase) + } + wg.Wait() + elapsed := time.Since(start) + + totalOps := ops.Load() + avgLatencyUs := float64(totalLatency.Load()) / float64(totalOps) / 1000.0 + opsPerSec := float64(totalOps) / elapsed.Seconds() + + t.Logf("RESULT impl=%-4s test=%-22s ops=%-6d errors=%-4d elapsed=%-10s ops/s=%-10.1f avg_lat=%-10.0fus", + impl, "Delete/1KB/c16", totalOps, 0, elapsed.Round(time.Millisecond), opsPerSec, avgLatencyUs) + }) +} + +// runThroughputTest is the shared core for throughput tests. +// keyOffset separates key ranges so concurrent tests in the same volume don't collide. +// keyCounter provides globally unique key ranges. Starts at 1 because key=0 is invalid. +var keyCounter atomic.Uint64 + +func init() { + keyCounter.Store(1) +} + +func runThroughputTest( + t *testing.T, impl, name string, + httpClient *http.Client, adminURL string, volumeID uint32, + payload []byte, numOps, concurrency int, + isDownload, isMixed bool, +) { + t.Helper() + + // Each call gets a unique key range + baseKey := keyCounter.Add(uint64(numOps*2)) - uint64(numOps*2) + + // Pre-upload for download / mixed + if isDownload || isMixed { + for i := 0; i < numOps; i++ { + if err := uploadFile(httpClient, adminURL, volumeID, baseKey+uint64(i), 1, payload); err != nil { + t.Fatalf("pre-upload %d: %v", i, err) + } + } + } + + uploadBase := baseKey + if !isDownload && !isMixed { + uploadBase = baseKey + uint64(numOps) // fresh range for uploads + } + + var ops atomic.Int64 + var errors atomic.Int64 + var totalLatency atomic.Int64 + + start := time.Now() + + var wg sync.WaitGroup + opsPerWorker := numOps / concurrency + remainder := numOps % concurrency + + for w := 0; w < concurrency; w++ { + n := opsPerWorker + if w < remainder { + n++ + } + var workerBase uint64 + if w < remainder { + workerBase = uploadBase + uint64(w*(opsPerWorker+1)) + } else { + workerBase = uploadBase + uint64(remainder*(opsPerWorker+1)) + uint64((w-remainder)*opsPerWorker) + } + + wg.Add(1) + go func(wb uint64, count int) { + defer wg.Done() + for i := 0; i < count; i++ { + key := wb + uint64(i) + opStart := time.Now() + var err error + + if isMixed { + if i%2 == 0 { + err = uploadFile(httpClient, adminURL, volumeID, key, 1, payload) + } else { + err = downloadFile(httpClient, adminURL, volumeID, key, 1) + } + } else if isDownload { + err = downloadFile(httpClient, adminURL, volumeID, key, 1) + } else { + err = uploadFile(httpClient, adminURL, volumeID, key, 1, payload) + } + + totalLatency.Add(time.Since(opStart).Nanoseconds()) + ops.Add(1) + if err != nil { + errors.Add(1) + } + } + }(workerBase, n) + } + + wg.Wait() + elapsed := time.Since(start) + + totalOps := ops.Load() + totalErrs := errors.Load() + avgLatencyUs := float64(totalLatency.Load()) / float64(totalOps) / 1000.0 + opsPerSec := float64(totalOps) / elapsed.Seconds() + throughputMBs := opsPerSec * float64(len(payload)) / (1024 * 1024) + + t.Logf("RESULT impl=%-4s test=%-22s ops=%-6d errors=%-4d elapsed=%-10s ops/s=%-10.1f avg_lat=%-10.0fus throughput=%.2f MB/s", + impl, name, totalOps, totalErrs, elapsed.Round(time.Millisecond), opsPerSec, avgLatencyUs, throughputMBs) +} + +// TestLatencyPercentiles measures p50/p95/p99 latencies for upload and download at each size. +func TestLatencyPercentiles(t *testing.T) { + if testing.Short() { + t.Skip("skipping load test in short mode") + } + + impl := implName() + adminURL, grpcAddr, cleanup := setupCluster(t) + defer cleanup() + + const volumeID = uint32(20) + allocateVolume(t, grpcAddr, volumeID) + + httpClient := &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 64, + MaxConnsPerHost: 64, + }, + } + + latOpsForSize := func(size int) int { + switch { + case size >= 4<<20: + return 30 + case size >= 1<<20: + return 100 + default: + return 300 + } + } + + for _, ps := range payloadSteps { + for _, dl := range []struct { + prefix string + isDownload bool + }{ + {"Upload", false}, + {"Download", true}, + } { + name := fmt.Sprintf("%s/%s", dl.prefix, ps.name) + numOps := latOpsForSize(ps.size) + + t.Run(fmt.Sprintf("%s/%s", impl, name), func(t *testing.T) { + payload := makePayload(ps.size) + baseKey := keyCounter.Add(uint64(numOps * 2)) + + if dl.isDownload { + for i := 0; i < numOps; i++ { + if err := uploadFile(httpClient, adminURL, volumeID, baseKey+uint64(i), 2, payload); err != nil { + t.Fatalf("pre-upload: %v", err) + } + } + } + + uploadBase := baseKey + if !dl.isDownload { + uploadBase = baseKey + uint64(numOps) + } + + latencies := make([]time.Duration, numOps) + for i := 0; i < numOps; i++ { + key := uploadBase + uint64(i) + start := time.Now() + if dl.isDownload { + downloadFile(httpClient, adminURL, volumeID, key, 2) + } else { + uploadFile(httpClient, adminURL, volumeID, key, 2, payload) + } + latencies[i] = time.Since(start) + } + + sortDurations(latencies) + + p50 := latencies[len(latencies)*50/100] + p95 := latencies[len(latencies)*95/100] + p99 := latencies[len(latencies)*99/100] + min := latencies[0] + max := latencies[len(latencies)-1] + + t.Logf("RESULT impl=%-4s test=%-20s n=%-4d min=%-10s p50=%-10s p95=%-10s p99=%-10s max=%-10s", + impl, name, numOps, min.Round(time.Microsecond), p50.Round(time.Microsecond), p95.Round(time.Microsecond), p99.Round(time.Microsecond), max.Round(time.Microsecond)) + }) + } + } +} + +func sortDurations(d []time.Duration) { + sort.Slice(d, func(i, j int) bool { return d[i] < d[j] }) +} + +// TestSustainedP99 runs high-concurrency load for a sustained period (default 60s, +// override with LOADTEST_DURATION=120s) and reports p50/p95/p99/p999 latencies. +// This reveals tail latency differences that short tests miss (GC pauses, lock contention, etc). +// +// Run: +// go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/... +// VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/... +// LOADTEST_DURATION=120s VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/... +func TestSustainedP99(t *testing.T) { + if testing.Short() { + t.Skip("skipping sustained load test in short mode") + } + + duration := 60 * time.Second + if d := os.Getenv("LOADTEST_DURATION"); d != "" { + parsed, err := time.ParseDuration(d) + if err == nil && parsed > 0 { + duration = parsed + } + } + + impl := implName() + adminURL, grpcAddr, cleanup := setupCluster(t) + defer cleanup() + + httpClient := &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 128, + MaxConnsPerHost: 128, + }, + } + + type scenario struct { + name string + size int + concurrency int + isDownload bool + } + + scenarios := []scenario{ + {"Upload/1KB/c16", 1 << 10, 16, false}, + {"Upload/64KB/c16", 64 << 10, 16, false}, + {"Download/1KB/c16", 1 << 10, 16, true}, + {"Download/64KB/c16", 64 << 10, 16, true}, + } + + var nextVolID atomic.Uint32 + nextVolID.Store(30) + + for _, sc := range scenarios { + t.Run(fmt.Sprintf("%s/%s", impl, sc.name), func(t *testing.T) { + // Each scenario gets its own volume to avoid filling up + volumeID := nextVolID.Add(1) - 1 + allocateVolume(t, grpcAddr, volumeID) + + payload := makePayload(sc.size) + + // Pre-upload a pool of files for download tests + poolSize := 500 + baseKey := keyCounter.Add(uint64(poolSize*2)) - uint64(poolSize*2) + + if sc.isDownload { + t.Logf("Pre-uploading %d files for download test...", poolSize) + for i := 0; i < poolSize; i++ { + if err := uploadFile(httpClient, adminURL, volumeID, baseKey+uint64(i), 3, payload); err != nil { + t.Fatalf("pre-upload %d: %v", i, err) + } + } + } + + // Collect latencies from all workers + type latencyBucket struct { + mu sync.Mutex + latencies []time.Duration + } + bucket := &latencyBucket{ + latencies: make([]time.Duration, 0, 100000), + } + + var totalOps atomic.Int64 + var totalErrors atomic.Int64 + + deadline := time.Now().Add(duration) + start := time.Now() + + // For uploads, pre-seed the pool so subsequent writes are overwrites (no volume fill) + if !sc.isDownload { + t.Logf("Pre-seeding %d files for upload overwrite test...", poolSize) + for i := 0; i < poolSize; i++ { + if err := uploadFile(httpClient, adminURL, volumeID, baseKey+uint64(i), 3, payload); err != nil { + t.Fatalf("pre-seed %d: %v", i, err) + } + } + } + + var wg sync.WaitGroup + for w := 0; w < sc.concurrency; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + localLats := make([]time.Duration, 0, 8192) + + var i uint64 + for time.Now().Before(deadline) { + // Cycle through the pool to avoid filling up the volume + key := baseKey + uint64(int(i)%poolSize) + + opStart := time.Now() + var err error + if sc.isDownload { + err = downloadFile(httpClient, adminURL, volumeID, key, 3) + } else { + err = uploadFile(httpClient, adminURL, volumeID, key, 3, payload) + } + lat := time.Since(opStart) + + localLats = append(localLats, lat) + totalOps.Add(1) + if err != nil { + totalErrors.Add(1) + } + i++ + + // Flush local buffer periodically + if len(localLats) >= 8192 { + bucket.mu.Lock() + bucket.latencies = append(bucket.latencies, localLats...) + bucket.mu.Unlock() + localLats = localLats[:0] + } + } + // Final flush + if len(localLats) > 0 { + bucket.mu.Lock() + bucket.latencies = append(bucket.latencies, localLats...) + bucket.mu.Unlock() + } + }(w) + } + + wg.Wait() + elapsed := time.Since(start) + + lats := bucket.latencies + n := len(lats) + ops := totalOps.Load() + errs := totalErrors.Load() + opsPerSec := float64(ops) / elapsed.Seconds() + + sortDurations(lats) + + pct := func(p float64) time.Duration { + idx := int(float64(n) * p / 100.0) + if idx >= n { + idx = n - 1 + } + return lats[idx] + } + + t.Logf("RESULT impl=%-4s test=%-22s duration=%-6s ops=%-8d errors=%-4d ops/s=%-10.1f", + impl, sc.name, elapsed.Round(time.Second), ops, errs, opsPerSec) + t.Logf(" p50=%-10s p90=%-10s p95=%-10s p99=%-10s p999=%-10s max=%-10s", + pct(50).Round(time.Microsecond), + pct(90).Round(time.Microsecond), + pct(95).Round(time.Microsecond), + pct(99).Round(time.Microsecond), + pct(99.9).Round(time.Microsecond), + lats[n-1].Round(time.Microsecond)) + }) + } +}