You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
295 lines
8.4 KiB
295 lines
8.4 KiB
// Package main provides a test client for the RDMA engine integration
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"seaweedfs-rdma-sidecar/pkg/rdma"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
var (
|
|
socketPath string
|
|
debug bool
|
|
timeout time.Duration
|
|
volumeID uint32
|
|
needleID uint64
|
|
cookie uint32
|
|
offset uint64
|
|
size uint64
|
|
)
|
|
|
|
func main() {
|
|
var rootCmd = &cobra.Command{
|
|
Use: "test-rdma",
|
|
Short: "Test client for SeaweedFS RDMA engine integration",
|
|
Long: `Test client that demonstrates communication between Go sidecar and Rust RDMA engine.
|
|
|
|
This tool allows you to test various RDMA operations including:
|
|
- Engine connectivity and capabilities
|
|
- RDMA read operations with mock data
|
|
- Performance measurements
|
|
- IPC protocol validation`,
|
|
}
|
|
|
|
// Global flags
|
|
defaultSocketPath := os.Getenv("RDMA_SOCKET_PATH")
|
|
if defaultSocketPath == "" {
|
|
defaultSocketPath = "/tmp/rdma-engine.sock"
|
|
}
|
|
rootCmd.PersistentFlags().StringVarP(&socketPath, "socket", "s", defaultSocketPath, "Path to RDMA engine Unix socket (env: RDMA_SOCKET_PATH)")
|
|
rootCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
|
|
rootCmd.PersistentFlags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "Operation timeout")
|
|
|
|
// Subcommands
|
|
rootCmd.AddCommand(pingCmd())
|
|
rootCmd.AddCommand(capsCmd())
|
|
rootCmd.AddCommand(readCmd())
|
|
rootCmd.AddCommand(benchCmd())
|
|
|
|
if err := rootCmd.Execute(); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func pingCmd() *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "ping",
|
|
Short: "Test connectivity to RDMA engine",
|
|
Long: "Send a ping message to the RDMA engine and measure latency",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
client := createClient()
|
|
defer client.Disconnect()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
fmt.Printf("🏓 Pinging RDMA engine at %s...\n", socketPath)
|
|
|
|
if err := client.Connect(ctx); err != nil {
|
|
return fmt.Errorf("failed to connect: %w", err)
|
|
}
|
|
|
|
latency, err := client.Ping(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("ping failed: %w", err)
|
|
}
|
|
|
|
fmt.Printf("✅ Ping successful! Latency: %v\n", latency)
|
|
return nil
|
|
},
|
|
}
|
|
}
|
|
|
|
func capsCmd() *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "capabilities",
|
|
Short: "Get RDMA engine capabilities",
|
|
Long: "Query the RDMA engine for its current capabilities and status",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
client := createClient()
|
|
defer client.Disconnect()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
fmt.Printf("🔍 Querying RDMA engine capabilities...\n")
|
|
|
|
if err := client.Connect(ctx); err != nil {
|
|
return fmt.Errorf("failed to connect: %w", err)
|
|
}
|
|
|
|
caps := client.GetCapabilities()
|
|
if caps == nil {
|
|
return fmt.Errorf("no capabilities received")
|
|
}
|
|
|
|
fmt.Printf("\n📊 RDMA Engine Capabilities:\n")
|
|
fmt.Printf(" Version: %s\n", caps.Version)
|
|
fmt.Printf(" Max Sessions: %d\n", caps.MaxSessions)
|
|
fmt.Printf(" Max Transfer Size: %d bytes (%.1f MB)\n", caps.MaxTransferSize, float64(caps.MaxTransferSize)/(1024*1024))
|
|
fmt.Printf(" Active Sessions: %d\n", caps.ActiveSessions)
|
|
fmt.Printf(" Real RDMA: %t\n", caps.RealRdma)
|
|
fmt.Printf(" Port GID: %s\n", caps.PortGid)
|
|
fmt.Printf(" Port LID: %d\n", caps.PortLid)
|
|
fmt.Printf(" Supported Auth: %v\n", caps.SupportedAuth)
|
|
|
|
if caps.RealRdma {
|
|
fmt.Printf("🚀 Hardware RDMA enabled!\n")
|
|
} else {
|
|
fmt.Printf("🟡 Using mock RDMA (development mode)\n")
|
|
}
|
|
|
|
return nil
|
|
},
|
|
}
|
|
}
|
|
|
|
func readCmd() *cobra.Command {
|
|
cmd := &cobra.Command{
|
|
Use: "read",
|
|
Short: "Test RDMA read operation",
|
|
Long: "Perform a test RDMA read operation with specified parameters",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
client := createClient()
|
|
defer client.Disconnect()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
fmt.Printf("📖 Testing RDMA read operation...\n")
|
|
fmt.Printf(" Volume ID: %d\n", volumeID)
|
|
fmt.Printf(" Needle ID: %d\n", needleID)
|
|
fmt.Printf(" Cookie: 0x%x\n", cookie)
|
|
fmt.Printf(" Offset: %d\n", offset)
|
|
fmt.Printf(" Size: %d bytes\n", size)
|
|
|
|
if err := client.Connect(ctx); err != nil {
|
|
return fmt.Errorf("failed to connect: %w", err)
|
|
}
|
|
|
|
start := time.Now()
|
|
resp, err := client.ReadRange(ctx, volumeID, needleID, cookie, offset, size)
|
|
if err != nil {
|
|
return fmt.Errorf("read failed: %w", err)
|
|
}
|
|
|
|
duration := time.Since(start)
|
|
|
|
fmt.Printf("\n✅ RDMA read completed successfully!\n")
|
|
fmt.Printf(" Session ID: %s\n", resp.SessionID)
|
|
fmt.Printf(" Bytes Read: %d\n", resp.BytesRead)
|
|
fmt.Printf(" Duration: %v\n", duration)
|
|
fmt.Printf(" Transfer Rate: %.2f MB/s\n", resp.TransferRate)
|
|
fmt.Printf(" Success: %t\n", resp.Success)
|
|
fmt.Printf(" Message: %s\n", resp.Message)
|
|
|
|
// Show first few bytes of data for verification
|
|
if len(resp.Data) > 0 {
|
|
displayLen := 32
|
|
if len(resp.Data) < displayLen {
|
|
displayLen = len(resp.Data)
|
|
}
|
|
fmt.Printf(" Data (first %d bytes): %x\n", displayLen, resp.Data[:displayLen])
|
|
}
|
|
|
|
return nil
|
|
},
|
|
}
|
|
|
|
cmd.Flags().Uint32VarP(&volumeID, "volume", "v", 1, "Volume ID")
|
|
cmd.Flags().Uint64VarP(&needleID, "needle", "n", 100, "Needle ID")
|
|
cmd.Flags().Uint32VarP(&cookie, "cookie", "c", 0x12345678, "Needle cookie")
|
|
cmd.Flags().Uint64VarP(&offset, "offset", "o", 0, "Read offset")
|
|
cmd.Flags().Uint64VarP(&size, "size", "z", 4096, "Read size in bytes")
|
|
|
|
return cmd
|
|
}
|
|
|
|
func benchCmd() *cobra.Command {
|
|
var (
|
|
iterations int
|
|
readSize uint64
|
|
)
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "bench",
|
|
Short: "Benchmark RDMA read performance",
|
|
Long: "Run multiple RDMA read operations and measure performance statistics",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
client := createClient()
|
|
defer client.Disconnect()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
fmt.Printf("🏁 Starting RDMA read benchmark...\n")
|
|
fmt.Printf(" Iterations: %d\n", iterations)
|
|
fmt.Printf(" Read Size: %d bytes\n", readSize)
|
|
fmt.Printf(" Socket: %s\n", socketPath)
|
|
|
|
if err := client.Connect(ctx); err != nil {
|
|
return fmt.Errorf("failed to connect: %w", err)
|
|
}
|
|
|
|
// Warmup
|
|
fmt.Printf("🔥 Warming up...\n")
|
|
for i := 0; i < 5; i++ {
|
|
_, err := client.ReadRange(ctx, 1, uint64(i+1), 0x12345678, 0, readSize)
|
|
if err != nil {
|
|
return fmt.Errorf("warmup read %d failed: %w", i+1, err)
|
|
}
|
|
}
|
|
|
|
// Benchmark
|
|
fmt.Printf("📊 Running benchmark...\n")
|
|
var totalDuration time.Duration
|
|
var totalBytes uint64
|
|
successful := 0
|
|
|
|
startTime := time.Now()
|
|
for i := 0; i < iterations; i++ {
|
|
opStart := time.Now()
|
|
resp, err := client.ReadRange(ctx, 1, uint64(i+1), 0x12345678, 0, readSize)
|
|
opDuration := time.Since(opStart)
|
|
|
|
if err != nil {
|
|
fmt.Printf("❌ Read %d failed: %v\n", i+1, err)
|
|
continue
|
|
}
|
|
|
|
totalDuration += opDuration
|
|
totalBytes += resp.BytesRead
|
|
successful++
|
|
|
|
if (i+1)%10 == 0 || i == iterations-1 {
|
|
fmt.Printf(" Completed %d/%d reads\n", i+1, iterations)
|
|
}
|
|
}
|
|
benchDuration := time.Since(startTime)
|
|
|
|
// Calculate statistics
|
|
avgLatency := totalDuration / time.Duration(successful)
|
|
throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024)
|
|
opsPerSec := float64(successful) / benchDuration.Seconds()
|
|
|
|
fmt.Printf("\n📈 Benchmark Results:\n")
|
|
fmt.Printf(" Total Duration: %v\n", benchDuration)
|
|
fmt.Printf(" Successful Operations: %d/%d (%.1f%%)\n", successful, iterations, float64(successful)/float64(iterations)*100)
|
|
fmt.Printf(" Total Bytes Transferred: %d (%.1f MB)\n", totalBytes, float64(totalBytes)/(1024*1024))
|
|
fmt.Printf(" Average Latency: %v\n", avgLatency)
|
|
fmt.Printf(" Throughput: %.2f MB/s\n", throughputMBps)
|
|
fmt.Printf(" Operations/sec: %.1f\n", opsPerSec)
|
|
|
|
return nil
|
|
},
|
|
}
|
|
|
|
cmd.Flags().IntVarP(&iterations, "iterations", "i", 100, "Number of read operations")
|
|
cmd.Flags().Uint64VarP(&readSize, "read-size", "r", 4096, "Size of each read in bytes")
|
|
|
|
return cmd
|
|
}
|
|
|
|
func createClient() *rdma.Client {
|
|
logger := logrus.New()
|
|
if debug {
|
|
logger.SetLevel(logrus.DebugLevel)
|
|
} else {
|
|
logger.SetLevel(logrus.InfoLevel)
|
|
}
|
|
|
|
config := &rdma.Config{
|
|
EngineSocketPath: socketPath,
|
|
DefaultTimeout: timeout,
|
|
Logger: logger,
|
|
}
|
|
|
|
return rdma.NewClient(config)
|
|
}
|