From c39d18c1da7150a3c5b1bdf7d1b07a92dc43d806 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 13 Aug 2025 21:59:54 -0700
Subject: [PATCH] setup benchmarking

---
 .../docker-compose-ec-test.yml                |  40 +-
 .../filer_benchmark/README.md                 | 144 +++++++
 .../admin_integration/filer_benchmark/main.go | 363 ++++++++++++++++++
 3 files changed, 540 insertions(+), 7 deletions(-)
 create mode 100644 docker/admin_integration/filer_benchmark/README.md
 create mode 100644 docker/admin_integration/filer_benchmark/main.go

diff --git a/docker/admin_integration/docker-compose-ec-test.yml b/docker/admin_integration/docker-compose-ec-test.yml
index 69858f07b..a2756f6a0 100644
--- a/docker/admin_integration/docker-compose-ec-test.yml
+++ b/docker/admin_integration/docker-compose-ec-test.yml
@@ -98,16 +98,42 @@ services:
     networks:
       - seaweed_net
 
-  filer:
+  filer1:
     image: chrislusf/seaweedfs:local
     ports:
       - "8888:8888"
       - "18888:18888"
-    command: "filer -master=master:9333 -ip=filer"
+    command: "filer -master=master:9333 -ip=filer1"
     depends_on:
       - master
     volumes:
-      - ./data/filer:/data
+      - ./data/filer1:/data
+    networks:
+      - seaweed_net
+
+  filer2:
+    image: chrislusf/seaweedfs:local
+    ports:
+      - "8889:8888"
+      - "18889:18888"
+    command: "filer -master=master:9333 -ip=filer2"
+    depends_on:
+      - master
+    volumes:
+      - ./data/filer2:/data
+    networks:
+      - seaweed_net
+
+  filer3:
+    image: chrislusf/seaweedfs:local
+    ports:
+      - "8890:8888"
+      - "18890:18888"
+    command: "filer -master=master:9333 -ip=filer3"
+    depends_on:
+      - master
+    volumes:
+      - ./data/filer3:/data
     networks:
       - seaweed_net
 
@@ -119,7 +145,7 @@ services:
     command: "-v=2 admin -port=23646 -masters=master:9333 -dataDir=/data"
     depends_on:
       - master
-      - filer
+      - filer1
     volumes:
       - ./data/admin:/data
     networks:
@@ -180,7 +206,7 @@ services:
       "
     depends_on:
       - master
-      - filer
+      - filer1
       - admin
     networks:
       - seaweed_net
@@ -210,7 +236,7 @@ services:
     depends_on:
       - master
       - admin
-      - filer
+      - filer1
     networks:
       - seaweed_net
 
@@ -229,7 +255,7 @@ services:
     depends_on:
      - master
       - admin
-      - filer
+      - filer1
     volumes:
       - .:/testing
     working_dir: /testing
diff --git a/docker/admin_integration/filer_benchmark/README.md b/docker/admin_integration/filer_benchmark/README.md
new file mode 100644
index 000000000..94c8e6ffe
--- /dev/null
+++ b/docker/admin_integration/filer_benchmark/README.md
@@ -0,0 +1,144 @@
+# Filer Benchmark Tool
+
+A simple Go program to benchmark SeaweedFS filer performance and detect race conditions under concurrent file operations.
+
+## Overview
+
+This tool starts a configurable pool of workers (300 by default, each running several threads) that concurrently:
+1. Create empty files on the filer
+2. Add multiple chunks to each file (with fake file IDs)
+3. Verify the file was created successfully
+
+This simulates the race condition scenario from [Issue #7062](https://github.com/seaweedfs/seaweedfs/issues/7062), where concurrent operations can lead to metadata inconsistencies; the sketch below shows the per-file call sequence.
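+
+Each file boils down to three calls against the filer's gRPC API. The sketch below is a condensed, illustrative version of `createFileWithChunks` in `main.go` (stats counters, the chunk loop, and fake-ID generation are omitted; the hard-coded `FileId` is a made-up example):
+
+```go
+// createAndVerify is a condensed version of createFileWithChunks in main.go.
+func createAndVerify(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, name string) error {
+	entry := &filer_pb.Entry{
+		Name:       name,
+		Attributes: &filer_pb.FuseAttributes{FileMode: 0644, Mtime: time.Now().Unix()},
+	}
+
+	// 1. Create the empty file.
+	if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{Directory: dir, Entry: entry}); err != nil {
+		return err
+	}
+
+	// 2. Attach a fake chunk (no data exists on any volume server) and update the entry.
+	entry.Chunks = []*filer_pb.FileChunk{{FileId: "1,0000000000ab1234", Offset: 0, Size: 1 << 20}}
+	if _, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{Directory: dir, Entry: entry}); err != nil {
+		return err
+	}
+
+	// 3. Read the entry back immediately; a failure here is a likely race condition.
+	_, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{Directory: dir, Name: name})
+	return err
+}
+```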
+
+## Usage
+
+### Build and Run Directly
+```bash
+# Build the tool (from the repository root)
+go build -o bin/filer_benchmark ./docker/admin_integration/filer_benchmark/
+
+# Basic usage (single filer)
+./bin/filer_benchmark -filers=localhost:8888
+
+# Test with multiple filers
+./bin/filer_benchmark -filers=localhost:8888,localhost:8889,localhost:8890
+
+# High concurrency race condition test
+./bin/filer_benchmark -workers=500 -filesPerWorker=200 -verbose
+```
+
+### Using Helper Scripts
+```bash
+# Use the wrapper script with predefined configurations
+./scripts/run_filer_benchmark.sh
+
+# Run example test suite
+./examples/run_filer_race_test.sh
+```
+
+## Configuration Options
+
+| Flag | Default | Description |
+|------|---------|-------------|
+| `-filers` | `localhost:8888` | Comma-separated list of filer addresses |
+| `-workers` | `300` | Number of concurrent workers |
+| `-threadsPerWorker` | `4` | Number of threads per worker |
+| `-concurrentFiles` | `16` | Number of files open concurrently per worker |
+| `-filesPerWorker` | `4096` | Total number of files each worker creates |
+| `-chunkSize` | `1048576` | Chunk size in bytes (1MB) |
+| `-chunksPerFile` | `5` | Number of chunks per file |
+| `-testDir` | `/benchmark` | Test directory on filer |
+| `-verbose` | `false` | Enable verbose error logging |
+
+## Race Condition Detection
+
+The tool detects race conditions by monitoring for these error patterns:
+- `leveldb: closed` - Metadata cache closed during operation
+- `transport is closing` - gRPC connection closed during operation
+- `connection refused` - Network connectivity issues
+- `not found after creation` - File disappeared after being created
+
+## Example Output
+
+```
+============================================================
+FILER BENCHMARK RESULTS
+============================================================
+Configuration:
+  Filers: localhost:8888,localhost:8889,localhost:8890
+  Workers: 300
+  Threads per worker: 4
+  Concurrent files per worker: 16
+  Files per worker: 100
+  Total threads: 1200
+  Chunks per file: 5
+  Chunk size: 1048576 bytes
+
+Results:
+  Total operations attempted: 30000
+  Files successfully created: 29850
+  Total chunks added: 149250
+  Errors: 150
+  Race condition errors: 23
+  Success rate: 99.50%
+
+Performance:
+  Total duration: 45.2s
+  Operations/second: 663.72
+  Files/second: 660.18
+  Chunks/second: 3300.88
+
+Race Condition Analysis:
+  Race condition rate: 0.0767%
+  Race conditions detected: 23
+  🟡 MODERATE race condition rate
+  Overall error rate: 0.50%
+============================================================
+```
+
+## Test Scenarios
+
+### 1. Basic Functionality Test
+```bash
+./bin/filer_benchmark -workers=20 -filesPerWorker=10
+```
+Low concurrency test to verify basic functionality.
+
+### 2. Race Condition Reproduction
+```bash
+./bin/filer_benchmark -workers=500 -filesPerWorker=100 -verbose
+```
+High concurrency test designed to trigger race conditions.
+
+### 3. Multi-Filer Load Test
+```bash
+./bin/filer_benchmark -filers=filer1:8888,filer2:8888,filer3:8888 -workers=300
+```
+Distribute load across multiple filers.
+
+### 4. Small Files Benchmark
+```bash
+./bin/filer_benchmark -chunkSize=4096 -chunksPerFile=1 -workers=1000
+```
+Test with many small files to stress metadata operations.
+
+## How It Simulates Race Conditions
+
+1. **Concurrent Operations**: Multiple goroutines perform file operations simultaneously
+2. **Random Timing**: Small random delays create timing variations
+3. **Fake Chunks**: Uses file IDs without actual volume server data to focus on metadata operations
+4. **Verification Step**: Attempts to read files immediately after creation to catch race conditions
+5. **Multiple Filers**: Distributes load randomly across multiple filer instances
+
+## Prerequisites
+
+- SeaweedFS master server running
+- SeaweedFS filer server(s) running
+- Go 1.19+ for building
+- Network connectivity to filer endpoints
+
+## Integration with Issue #7062
+
+This tool reproduces the core problem from the original issue:
+- **Concurrent file operations** (simulated by goroutines)
+- **Metadata race conditions** (detected through error patterns)
+- **Transport disconnections** (monitored in error analysis)
+- **File inconsistencies** (caught by verification steps)
+
+The key difference is that this tool focuses on the filer metadata layer rather than the full CSI driver + mount stack, making it easier to isolate and debug the race condition.
diff --git a/docker/admin_integration/filer_benchmark/main.go b/docker/admin_integration/filer_benchmark/main.go
new file mode 100644
index 000000000..646b69ab6
--- /dev/null
+++ b/docker/admin_integration/filer_benchmark/main.go
@@ -0,0 +1,363 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"math/rand"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"google.golang.org/grpc"
+)
+
+var (
+	filers           = flag.String("filers", "localhost:8888", "comma-separated list of filer addresses")
+	workers          = flag.Int("workers", 300, "number of concurrent workers")
+	threadsPerWorker = flag.Int("threadsPerWorker", 4, "number of threads per worker")
+	concurrentFiles  = flag.Int("concurrentFiles", 16, "number of files open concurrently per worker")
+	filesPerWorker   = flag.Int("filesPerWorker", 4096, "total number of files each worker creates")
+	chunkSize        = flag.Int64("chunkSize", 1024*1024, "chunk size in bytes")
+	chunksPerFile    = flag.Int("chunksPerFile", 5, "number of chunks per file")
+	testDir          = flag.String("testDir", "/benchmark", "test directory on filer")
+	verbose          = flag.Bool("verbose", false, "verbose logging")
+)
+
+type BenchmarkStats struct {
+	filesCreated   int64
+	chunksAdded    int64
+	errors         int64
+	raceConditions int64
+	totalDuration  time.Duration
+}
+
+type FilerClient struct {
+	address string
+	conn    *grpc.ClientConn
+	client  filer_pb.SeaweedFilerClient
+}
+
+func main() {
+	flag.Parse()
+
+	// Configure logging based on verbose flag
+	if !*verbose {
+		log.SetFlags(log.LstdFlags) // Minimal logging
+	}
+
+	filerAddresses := util.StringSplit(*filers, ",")
+	if len(filerAddresses) == 0 {
+		log.Fatal("No filer addresses provided")
+	}
+
+	log.Printf("Starting filer benchmark: %d workers, %d threads each, %d concurrent files, %d files per worker, %d filers",
+		*workers, *threadsPerWorker, *concurrentFiles, *filesPerWorker, len(filerAddresses))
+
+	// Create filer clients
+	clients, err := createFilerClients(filerAddresses)
+	if err != nil {
+		log.Fatalf("Failed to create filer clients: %v", err)
+	}
+	defer closeFilerClients(clients)
+
+	// Ensure test directory exists
+	if err := ensureDirectory(clients[0], *testDir); err != nil {
+		log.Fatalf("Failed to create test directory: %v", err)
+	}
+
+	// Run benchmark
+	stats := runBenchmark(clients)
+
+	// Print results
+	printResults(stats)
+}
+
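+// createFilerClients dials each filer address once over gRPC, using the TLS
+// options from the local security configuration, and returns one long-lived
+// client per filer for the worker threads to share.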
+func createFilerClients(addresses []string) ([]*FilerClient, error) {
+	var clients []*FilerClient
+
+	util.LoadSecurityConfiguration()
+	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+	for _, addr := range addresses {
+		conn, err := pb.GrpcDial(context.Background(), addr, true, grpcDialOption)
+		if err != nil {
+			return nil, fmt.Errorf("failed to connect to %s: %v", addr, err)
+		}
+
+		client := &FilerClient{
+			address: addr,
+			conn:    conn,
+			client:  filer_pb.NewSeaweedFilerClient(conn),
+		}
+		clients = append(clients, client)
+	}
+
+	return clients, nil
+}
+
+func closeFilerClients(clients []*FilerClient) {
+	for _, client := range clients {
+		client.conn.Close()
+	}
+}
+
+func ensureDirectory(client *FilerClient, dir string) error {
+	_, err := client.client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
+		Directory: "/",
+		Entry: &filer_pb.Entry{
+			Name:        dir[1:], // Remove leading slash
+			IsDirectory: true,
+			Attributes: &filer_pb.FuseAttributes{
+				Mtime:    time.Now().Unix(),
+				Crtime:   time.Now().Unix(),
+				FileMode: 0755,
+			},
+		},
+		OExcl: false,
+	})
+	return err
+}
+
+func runBenchmark(clients []*FilerClient) *BenchmarkStats {
+	stats := &BenchmarkStats{}
+	var wg sync.WaitGroup
+	startTime := time.Now()
+
+	// Start workers
+	for i := 0; i < *workers; i++ {
+		wg.Add(1)
+		go func(workerID int) {
+			defer wg.Done()
+			runWorker(workerID, clients, stats)
+		}(i)
+	}
+
+	// Wait for completion
+	wg.Wait()
+	stats.totalDuration = time.Since(startTime)
+
+	return stats
+}
+
+func runWorker(workerID int, clients []*FilerClient, stats *BenchmarkStats) {
+	// Create work queue with concurrency limit
+	workQueue := make(chan int, *concurrentFiles)
+	var workerWg sync.WaitGroup
+
+	// Start threads for this worker
+	for threadID := 0; threadID < *threadsPerWorker; threadID++ {
+		workerWg.Add(1)
+		go func(tID int) {
+			defer workerWg.Done()
+			runWorkerThread(workerID, tID, clients, stats, workQueue)
+		}(threadID)
+	}
+
+	// Queue up all the file creation tasks
+	go func() {
+		defer close(workQueue)
+		for fileID := 0; fileID < *filesPerWorker; fileID++ {
+			workQueue <- fileID
+		}
+	}()
+
+	// Wait for all threads in this worker to complete
+	workerWg.Wait()
+}
+
+func runWorkerThread(workerID, threadID int, clients []*FilerClient, stats *BenchmarkStats, workQueue <-chan int) {
+	for fileID := range workQueue {
+		// Select random filer client
+		client := clients[rand.Intn(len(clients))]
+
+		// Create unique filename
+		filename := fmt.Sprintf("file_%d_%d_%d_%d", workerID, threadID, fileID, time.Now().UnixNano())
+
+		if err := createFileWithChunks(client, filename, stats); err != nil {
+			atomic.AddInt64(&stats.errors, 1)
+			if isRaceConditionError(err) {
+				atomic.AddInt64(&stats.raceConditions, 1)
+			}
+			if *verbose {
+				log.Printf("Worker %d Thread %d error: %v", workerID, threadID, err)
+			}
+		} else {
+			atomic.AddInt64(&stats.filesCreated, 1)
+		}
+
+		// Small random delay to create timing variations
+		if rand.Intn(10) == 0 {
+			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
+		}
+	}
+}
+
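+// createFileWithChunks performs the three-step sequence described in the
+// README: create an empty entry, attach fake chunks via UpdateEntry, then
+// look the entry up again to catch files that vanish right after creation.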
+func createFileWithChunks(client *FilerClient, filename string, stats *BenchmarkStats) error {
+	ctx := context.Background()
+
+	// Step 1: Create empty file
+	entry := &filer_pb.Entry{
+		Name: filename,
+		Attributes: &filer_pb.FuseAttributes{
+			Mtime:    time.Now().Unix(),
+			Crtime:   time.Now().Unix(),
+			FileMode: 0644,
+			FileSize: 0,
+		},
+		Chunks: []*filer_pb.FileChunk{},
+	}
+
+	_, err := client.client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+		Directory: *testDir,
+		Entry:     entry,
+	})
+	if err != nil {
+		return fmt.Errorf("create entry failed: %v", err)
+	}
+
+	// Step 2: Add chunks to the file
+	var chunks []*filer_pb.FileChunk
+	var offset int64 = 0
+
+	for i := 0; i < *chunksPerFile; i++ {
+		chunk := &filer_pb.FileChunk{
+			FileId:       generateFakeFileId(),
+			Offset:       offset,
+			Size:         uint64(*chunkSize),
+			ModifiedTsNs: time.Now().UnixNano(),
+			ETag:         generateETag(),
+		}
+		chunks = append(chunks, chunk)
+		offset += *chunkSize
+		atomic.AddInt64(&stats.chunksAdded, 1)
+	}
+
+	// Update file with chunks
+	entry.Chunks = chunks
+	entry.Attributes.FileSize = uint64(offset)
+
+	_, err = client.client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
+		Directory: *testDir,
+		Entry:     entry,
+	})
+	if err != nil {
+		return fmt.Errorf("update entry with chunks failed: %v", err)
+	}
+
+	// Step 3: Verify file was created properly (this may catch race conditions)
+	_, err = client.client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+		Directory: *testDir,
+		Name:      filename,
+	})
+	if err != nil {
+		return fmt.Errorf("lookup after creation failed (race condition?): %v", err)
+	}
+
+	return nil
+}
+
+func generateFakeFileId() string {
+	// Generate fake file ID that looks real but doesn't exist on volume servers
+	volumeId := rand.Intn(100) + 1
+	fileKey := rand.Int63()
+	cookie := rand.Uint32()
+	return fmt.Sprintf("%d,%x%08x", volumeId, fileKey, cookie)
+}
+
+func generateETag() string {
+	// Generate fake ETag
+	return fmt.Sprintf("%x", rand.Int63())
+}
+
+func isRaceConditionError(err error) bool {
+	errStr := err.Error()
+	return strings.Contains(errStr, "leveldb: closed") ||
+		strings.Contains(errStr, "transport is closing") ||
+		strings.Contains(errStr, "connection refused") ||
+		(strings.Contains(errStr, "not found") && strings.Contains(errStr, "after creation"))
+}
+
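+// printResults summarizes configuration, throughput, and error counts, and
+// reports how many errors matched known race-condition signatures.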
+func printResults(stats *BenchmarkStats) {
+	fmt.Println("\n" + strings.Repeat("=", 60))
+	fmt.Println("FILER BENCHMARK RESULTS")
+	fmt.Println(strings.Repeat("=", 60))
+
+	totalOps := int64(*workers) * int64(*filesPerWorker)
+	successRate := float64(stats.filesCreated) / float64(totalOps) * 100
+
+	fmt.Printf("Configuration:\n")
+	fmt.Printf("  Filers: %s\n", *filers)
+	fmt.Printf("  Workers: %d\n", *workers)
+	fmt.Printf("  Threads per worker: %d\n", *threadsPerWorker)
+	fmt.Printf("  Concurrent files per worker: %d\n", *concurrentFiles)
+	fmt.Printf("  Files per worker: %d\n", *filesPerWorker)
+	fmt.Printf("  Total threads: %d\n", (*workers)*(*threadsPerWorker))
+	fmt.Printf("  Chunks per file: %d\n", *chunksPerFile)
+	fmt.Printf("  Chunk size: %d bytes\n", *chunkSize)
+	fmt.Printf("\n")
+
+	fmt.Printf("Results:\n")
+	fmt.Printf("  Total operations attempted: %d\n", totalOps)
+	fmt.Printf("  Files successfully created: %d\n", stats.filesCreated)
+	fmt.Printf("  Total chunks added: %d\n", stats.chunksAdded)
+	fmt.Printf("  Errors: %d\n", stats.errors)
+	fmt.Printf("  Race condition errors: %d\n", stats.raceConditions)
+	fmt.Printf("  Success rate: %.2f%%\n", successRate)
+	fmt.Printf("\n")
+
+	fmt.Printf("Performance:\n")
+	fmt.Printf("  Total duration: %v\n", stats.totalDuration)
+	fmt.Printf("  Operations/second: %.2f\n", float64(totalOps)/stats.totalDuration.Seconds())
+	fmt.Printf("  Files/second: %.2f\n", float64(stats.filesCreated)/stats.totalDuration.Seconds())
+	fmt.Printf("  Chunks/second: %.2f\n", float64(stats.chunksAdded)/stats.totalDuration.Seconds())
+	fmt.Printf("\n")
+
+	// Race condition analysis
+	fmt.Printf("Race Condition Analysis:\n")
+	if stats.raceConditions > 0 {
+		raceRate := float64(stats.raceConditions) / float64(totalOps) * 100
+		fmt.Printf("  Race condition rate: %.4f%%\n", raceRate)
+		fmt.Printf("  Race conditions detected: %d\n", stats.raceConditions)
+
+		if raceRate > 1.0 {
+			fmt.Printf("  🔴 HIGH race condition rate detected!\n")
+		} else if raceRate > 0.1 {
+			fmt.Printf("  🟡 MODERATE race condition rate\n")
+		} else {
+			fmt.Printf("  🟢 LOW race condition rate\n")
+		}
+	} else {
+		fmt.Printf("  No race conditions detected\n")
+		if stats.errors == 0 {
+			fmt.Printf("  🟢 All operations completed successfully\n")
+		}
+	}
+
+	if stats.errors > 0 {
+		errorRate := float64(stats.errors) / float64(totalOps) * 100
+		fmt.Printf("  Overall error rate: %.2f%%\n", errorRate)
+	}
+
+	fmt.Println(strings.Repeat("=", 60))
+
+	// Recommendations
+	if stats.raceConditions > 0 || stats.errors > totalOps/10 {
+		fmt.Println("\nRecommendations:")
+		if stats.raceConditions > 0 {
+			fmt.Println("  • Race conditions detected - investigate filer concurrent access handling")
+			fmt.Println("  • Check filer logs for 'leveldb: closed' or 'transport is closing' errors")
+		}
+		if stats.errors > totalOps/20 {
+			fmt.Println("  • High error rate - check filer stability and resource limits")
+		}
+		fmt.Println("  • Consider running with -verbose flag for detailed error analysis")
+	}
+}