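// This program is a load-testing tool that hammers one or more SeaweedFS
// filers with concurrent CreateEntry/UpdateEntry/LookupDirectoryEntry calls
// and reports throughput, error counts, and suspected race conditions.
//
// A typical invocation (flag values are illustrative; assumes this file sits
// in its own directory) might look like:
//
//	go run . -filers localhost:8888,localhost:8889 -workers 50 -filesPerWorker 200 -verbose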
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
)

var (
	filers           = flag.String("filers", "localhost:8888", "comma-separated list of filer addresses")
	workers          = flag.Int("workers", 300, "number of concurrent workers")
	threadsPerWorker = flag.Int("threadsPerWorker", 4, "number of threads per worker")
	concurrentFiles  = flag.Int("concurrentFiles", 16, "number of files open concurrently per worker")
	filesPerWorker   = flag.Int("filesPerWorker", 4096, "total number of files each worker creates")
	chunkSize        = flag.Int64("chunkSize", 1024*1024, "chunk size in bytes")
	chunksPerFile    = flag.Int("chunksPerFile", 5, "number of chunks per file")
	testDir          = flag.String("testDir", "/benchmark", "test directory on filer")
	verbose          = flag.Bool("verbose", false, "verbose logging")
)

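// BenchmarkStats aggregates results across all workers. The int64 counters
// are updated via sync/atomic from many goroutines; totalDuration is set
// once after all workers have finished.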
type BenchmarkStats struct {
	filesCreated   int64
	chunksAdded    int64
	errors         int64
	raceConditions int64
	totalDuration  time.Duration
}

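// FilerClient pairs a filer address with its gRPC connection and the
// generated SeaweedFiler client stub.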
type FilerClient struct {
	address string
	conn    *grpc.ClientConn
	client  filer_pb.SeaweedFilerClient
}

func main() {
	flag.Parse()

	// Keep the default timestamp-only log flags unless verbose output is requested
	if !*verbose {
		log.SetFlags(log.LstdFlags)
	}

	filerAddresses := util.StringSplit(*filers, ",")
	if len(filerAddresses) == 0 {
		log.Fatal("No filer addresses provided")
	}

	log.Printf("Starting filer benchmark: %d workers, %d threads each, %d concurrent files, %d files per worker, %d filers",
		*workers, *threadsPerWorker, *concurrentFiles, *filesPerWorker, len(filerAddresses))

	// Create filer clients
	clients, err := createFilerClients(filerAddresses)
	if err != nil {
		log.Fatalf("Failed to create filer clients: %v", err)
	}
	defer closeFilerClients(clients)

	// Ensure the test directory exists
	if err := ensureDirectory(clients[0], *testDir); err != nil {
		log.Fatalf("Failed to create test directory: %v", err)
	}

	// Run the benchmark and report results
	stats := runBenchmark(clients)
	printResults(stats)
}

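// createFilerClients dials each filer address over gRPC, reusing the TLS
// options from the standard SeaweedFS client security configuration.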
func createFilerClients(addresses []string) ([]*FilerClient, error) {
	var clients []*FilerClient

	util.LoadSecurityConfiguration()
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	for _, addr := range addresses {
		conn, err := pb.GrpcDial(context.Background(), addr, true, grpcDialOption)
		if err != nil {
			return nil, fmt.Errorf("failed to connect to %s: %v", addr, err)
		}

		client := &FilerClient{
			address: addr,
			conn:    conn,
			client:  filer_pb.NewSeaweedFilerClient(conn),
		}
		clients = append(clients, client)
	}

	return clients, nil
}

func closeFilerClients(clients []*FilerClient) {
	for _, client := range clients {
		client.conn.Close()
	}
}

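// ensureDirectory creates the test directory directly under "/" on the filer
// so that subsequent file entries have a parent directory to live in.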
func ensureDirectory(client *FilerClient, dir string) error {
	_, err := client.client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
		Directory: "/",
		Entry: &filer_pb.Entry{
			Name:        strings.TrimPrefix(dir, "/"), // entry name without the leading slash
			IsDirectory: true,
			Attributes: &filer_pb.FuseAttributes{
				Mtime:    time.Now().Unix(),
				Crtime:   time.Now().Unix(),
				FileMode: 0755,
			},
		},
		OExcl: false,
	})
	return err
}

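// runBenchmark launches the configured number of workers, waits for all of
// them to finish, and records the total wall-clock duration.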
func runBenchmark(clients []*FilerClient) *BenchmarkStats {
	stats := &BenchmarkStats{}
	var wg sync.WaitGroup
	startTime := time.Now()

	// Start workers
	for i := 0; i < *workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			runWorker(workerID, clients, stats)
		}(i)
	}

	// Wait for completion
	wg.Wait()
	stats.totalDuration = time.Since(startTime)

	return stats
}

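// runWorker fans one worker's file-creation tasks out to threadsPerWorker
// goroutines through a buffered channel and blocks until the queue drains.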
func runWorker(workerID int, clients []*FilerClient, stats *BenchmarkStats) {
	// Buffered work queue; concurrency within a worker is bounded by threadsPerWorker,
	// while the buffer size only limits how many file IDs can be queued ahead.
	workQueue := make(chan int, *concurrentFiles)
	var workerWg sync.WaitGroup

	// Start threads for this worker
	for threadID := 0; threadID < *threadsPerWorker; threadID++ {
		workerWg.Add(1)
		go func(tID int) {
			defer workerWg.Done()
			runWorkerThread(workerID, tID, clients, stats, workQueue)
		}(threadID)
	}

	// Queue up all the file creation tasks
	go func() {
		defer close(workQueue)
		for fileID := 0; fileID < *filesPerWorker; fileID++ {
			workQueue <- fileID
		}
	}()

	// Wait for all threads in this worker to complete
	workerWg.Wait()
}

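// runWorkerThread consumes file IDs from the work queue, creates each file
// against a randomly chosen filer, and updates the shared counters.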
func runWorkerThread(workerID, threadID int, clients []*FilerClient, stats *BenchmarkStats, workQueue <-chan int) {
	for fileID := range workQueue {
		// Select a random filer client
		client := clients[rand.Intn(len(clients))]

		// Create a unique filename
		filename := fmt.Sprintf("file_%d_%d_%d_%d", workerID, threadID, fileID, time.Now().UnixNano())

		if err := createFileWithChunks(client, filename, stats); err != nil {
			atomic.AddInt64(&stats.errors, 1)
			if isRaceConditionError(err) {
				atomic.AddInt64(&stats.raceConditions, 1)
			}
			if *verbose {
				log.Printf("Worker %d Thread %d error: %v", workerID, threadID, err)
			}
		} else {
			atomic.AddInt64(&stats.filesCreated, 1)
		}

		// Small random delay to create timing variations
		if rand.Intn(10) == 0 {
			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
		}
	}
}

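// createFileWithChunks exercises the filer metadata path in three steps:
// create an empty entry, update it with fake chunk metadata, then look it
// back up to detect entries that vanish under concurrent load. No chunk
// data is ever written to volume servers.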
func createFileWithChunks(client *FilerClient, filename string, stats *BenchmarkStats) error {
	ctx := context.Background()

	// Step 1: Create an empty file
	entry := &filer_pb.Entry{
		Name: filename,
		Attributes: &filer_pb.FuseAttributes{
			Mtime:    time.Now().Unix(),
			Crtime:   time.Now().Unix(),
			FileMode: 0644,
			FileSize: 0,
		},
		Chunks: []*filer_pb.FileChunk{},
	}

	_, err := client.client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
		Directory: *testDir,
		Entry:     entry,
	})
	if err != nil {
		return fmt.Errorf("create entry failed: %v", err)
	}

	// Step 2: Add chunks to the file
	var chunks []*filer_pb.FileChunk
	var offset int64 = 0

	for i := 0; i < *chunksPerFile; i++ {
		chunk := &filer_pb.FileChunk{
			FileId:       generateFakeFileId(),
			Offset:       offset,
			Size:         uint64(*chunkSize),
			ModifiedTsNs: time.Now().UnixNano(),
			ETag:         generateETag(),
		}
		chunks = append(chunks, chunk)
		offset += *chunkSize
		atomic.AddInt64(&stats.chunksAdded, 1)
	}

	// Update the file with its chunk list
	entry.Chunks = chunks
	entry.Attributes.FileSize = uint64(offset)

	_, err = client.client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
		Directory: *testDir,
		Entry:     entry,
	})
	if err != nil {
		return fmt.Errorf("update entry with chunks failed: %v", err)
	}

	// Step 3: Verify the file was created properly (this may catch race conditions)
	_, err = client.client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
		Directory: *testDir,
		Name:      filename,
	})
	if err != nil {
		return fmt.Errorf("lookup after creation failed (race condition?): %v", err)
	}

	return nil
}

func generateFakeFileId() string {
	// Generate a fake file ID that looks real but does not exist on any volume server
	volumeId := rand.Intn(100) + 1
	fileKey := rand.Int63()
	cookie := rand.Uint32()
	return fmt.Sprintf("%d,%x%08x", volumeId, fileKey, cookie)
}

func generateETag() string {
	// Generate a fake ETag
	return fmt.Sprintf("%x", rand.Int63())
}

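// isRaceConditionError reports whether an error message matches one of the
// patterns this benchmark attributes to concurrent-access problems on the
// filer side.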
func isRaceConditionError(err error) bool {
	errStr := err.Error()
	return strings.Contains(errStr, "leveldb: closed") ||
		strings.Contains(errStr, "transport is closing") ||
		strings.Contains(errStr, "connection refused") ||
		(strings.Contains(errStr, "not found") && strings.Contains(errStr, "after creation"))
}

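// printResults renders configuration, throughput, and error statistics,
// including a rough classification of the observed race-condition rate.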
func printResults(stats *BenchmarkStats) {
	fmt.Println("\n" + strings.Repeat("=", 60))
	fmt.Println("FILER BENCHMARK RESULTS")
	fmt.Println(strings.Repeat("=", 60))

	totalOps := int64(*workers) * int64(*filesPerWorker)
	successRate := float64(stats.filesCreated) / float64(totalOps) * 100

	fmt.Printf("Configuration:\n")
	fmt.Printf(" Filers: %s\n", *filers)
	fmt.Printf(" Workers: %d\n", *workers)
	fmt.Printf(" Threads per worker: %d\n", *threadsPerWorker)
	fmt.Printf(" Concurrent files per worker: %d\n", *concurrentFiles)
	fmt.Printf(" Files per worker: %d\n", *filesPerWorker)
	fmt.Printf(" Total threads: %d\n", *workers * *threadsPerWorker)
	fmt.Printf(" Chunks per file: %d\n", *chunksPerFile)
	fmt.Printf(" Chunk size: %d bytes\n", *chunkSize)
	fmt.Printf("\n")

	fmt.Printf("Results:\n")
	fmt.Printf(" Total operations attempted: %d\n", totalOps)
	fmt.Printf(" Files successfully created: %d\n", stats.filesCreated)
	fmt.Printf(" Total chunks added: %d\n", stats.chunksAdded)
	fmt.Printf(" Errors: %d\n", stats.errors)
	fmt.Printf(" Race condition errors: %d\n", stats.raceConditions)
	fmt.Printf(" Success rate: %.2f%%\n", successRate)
	fmt.Printf("\n")

	fmt.Printf("Performance:\n")
	fmt.Printf(" Total duration: %v\n", stats.totalDuration)
	fmt.Printf(" Operations/second: %.2f\n", float64(totalOps)/stats.totalDuration.Seconds())
	fmt.Printf(" Files/second: %.2f\n", float64(stats.filesCreated)/stats.totalDuration.Seconds())
	fmt.Printf(" Chunks/second: %.2f\n", float64(stats.chunksAdded)/stats.totalDuration.Seconds())
	fmt.Printf("\n")

	// Race condition analysis
	fmt.Printf("Race Condition Analysis:\n")
	if stats.raceConditions > 0 {
		raceRate := float64(stats.raceConditions) / float64(totalOps) * 100
		fmt.Printf(" Race condition rate: %.4f%%\n", raceRate)
		fmt.Printf(" Race conditions detected: %d\n", stats.raceConditions)

		if raceRate > 1.0 {
			fmt.Printf(" 🔴 HIGH race condition rate detected!\n")
		} else if raceRate > 0.1 {
			fmt.Printf(" 🟡 MODERATE race condition rate\n")
		} else {
			fmt.Printf(" 🟢 LOW race condition rate\n")
		}
	} else {
		fmt.Printf(" No race conditions detected\n")
		if stats.errors == 0 {
			fmt.Printf(" 🟢 All operations completed successfully\n")
		}
	}

	if stats.errors > 0 {
		errorRate := float64(stats.errors) / float64(totalOps) * 100
		fmt.Printf(" Overall error rate: %.2f%%\n", errorRate)
	}

	fmt.Println(strings.Repeat("=", 60))

	// Recommendations
	if stats.raceConditions > 0 || stats.errors > totalOps/10 {
		fmt.Println("\nRecommendations:")
		if stats.raceConditions > 0 {
			fmt.Println(" • Race conditions detected - investigate filer concurrent access handling")
			fmt.Println(" • Check filer logs for 'leveldb: closed' or 'transport is closing' errors")
		}
		if stats.errors > totalOps/20 {
			fmt.Println(" • High error rate - check filer stability and resource limits")
		}
		fmt.Println(" • Consider running with -verbose flag for detailed error analysis")
	}
}