// Package main provides a demonstration server showing SeaweedFS RDMA integration
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"seaweedfs-rdma-sidecar/pkg/seaweedfs"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
// Command-line flag values. They are bound to cobra flags in main and read
// when the server is configured in runServer (port is also read by
// homeHandler when rendering example URLs).
var (
	// HTTP listener and upstream endpoints.
	port            int    // --port: demo server HTTP port
	rdmaSocket      string // --rdma-socket: path to the RDMA engine Unix socket
	volumeServerURL string // --volume-server: volume server URL used for HTTP fallback

	// RDMA feature toggles.
	enableRDMA     bool   // --enable-rdma
	enableZeroCopy bool   // --enable-zerocopy
	tempDir        string // --temp-dir: directory for zero-copy temp files

	// RDMA connection pooling.
	enablePooling  bool          // --enable-pooling
	maxConnections int           // --max-connections
	maxIdleTime    time.Duration // --max-idle-time

	// Diagnostics.
	debug bool // --debug: switch the logger to debug level
)
|
|
|
|
func main() {
|
|
var rootCmd = &cobra.Command{
|
|
Use: "demo-server",
|
|
Short: "SeaweedFS RDMA integration demonstration server",
|
|
Long: `Demonstration server that shows how SeaweedFS can integrate with the RDMA sidecar
|
|
for accelerated read operations. This server provides HTTP endpoints that demonstrate
|
|
the RDMA fast path with HTTP fallback capabilities.`,
|
|
RunE: runServer,
|
|
}
|
|
|
|
rootCmd.Flags().IntVarP(&port, "port", "p", 8080, "Demo server HTTP port")
|
|
rootCmd.Flags().StringVarP(&rdmaSocket, "rdma-socket", "r", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
|
|
rootCmd.Flags().StringVarP(&volumeServerURL, "volume-server", "v", "http://localhost:8080", "SeaweedFS volume server URL for HTTP fallback")
|
|
rootCmd.Flags().BoolVarP(&enableRDMA, "enable-rdma", "e", true, "Enable RDMA acceleration")
|
|
rootCmd.Flags().BoolVarP(&enableZeroCopy, "enable-zerocopy", "z", true, "Enable zero-copy optimization via temp files")
|
|
rootCmd.Flags().StringVarP(&tempDir, "temp-dir", "t", "/tmp/rdma-cache", "Temp directory for zero-copy files")
|
|
rootCmd.Flags().BoolVar(&enablePooling, "enable-pooling", true, "Enable RDMA connection pooling")
|
|
rootCmd.Flags().IntVar(&maxConnections, "max-connections", 10, "Maximum connections in RDMA pool")
|
|
rootCmd.Flags().DurationVar(&maxIdleTime, "max-idle-time", 5*time.Minute, "Maximum idle time for pooled connections")
|
|
rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
|
|
|
|
if err := rootCmd.Execute(); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func runServer(cmd *cobra.Command, args []string) error {
|
|
// Setup logging
|
|
logger := logrus.New()
|
|
if debug {
|
|
logger.SetLevel(logrus.DebugLevel)
|
|
logger.SetFormatter(&logrus.TextFormatter{
|
|
FullTimestamp: true,
|
|
ForceColors: true,
|
|
})
|
|
} else {
|
|
logger.SetLevel(logrus.InfoLevel)
|
|
}
|
|
|
|
logger.WithFields(logrus.Fields{
|
|
"port": port,
|
|
"rdma_socket": rdmaSocket,
|
|
"volume_server_url": volumeServerURL,
|
|
"enable_rdma": enableRDMA,
|
|
"enable_zerocopy": enableZeroCopy,
|
|
"temp_dir": tempDir,
|
|
"enable_pooling": enablePooling,
|
|
"max_connections": maxConnections,
|
|
"max_idle_time": maxIdleTime,
|
|
"debug": debug,
|
|
}).Info("🚀 Starting SeaweedFS RDMA Demo Server")
|
|
|
|
// Create SeaweedFS RDMA client
|
|
config := &seaweedfs.Config{
|
|
RDMASocketPath: rdmaSocket,
|
|
VolumeServerURL: volumeServerURL,
|
|
Enabled: enableRDMA,
|
|
DefaultTimeout: 30 * time.Second,
|
|
Logger: logger,
|
|
TempDir: tempDir,
|
|
UseZeroCopy: enableZeroCopy,
|
|
EnablePooling: enablePooling,
|
|
MaxConnections: maxConnections,
|
|
MaxIdleTime: maxIdleTime,
|
|
}
|
|
|
|
rdmaClient, err := seaweedfs.NewSeaweedFSRDMAClient(config)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create RDMA client: %w", err)
|
|
}
|
|
|
|
// Start RDMA client
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
if err := rdmaClient.Start(ctx); err != nil {
|
|
logger.WithError(err).Error("Failed to start RDMA client")
|
|
}
|
|
cancel()
|
|
|
|
// Create demo server
|
|
server := &DemoServer{
|
|
rdmaClient: rdmaClient,
|
|
logger: logger,
|
|
}
|
|
|
|
// Setup HTTP routes
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/", server.homeHandler)
|
|
mux.HandleFunc("/health", server.healthHandler)
|
|
mux.HandleFunc("/stats", server.statsHandler)
|
|
mux.HandleFunc("/read", server.readHandler)
|
|
mux.HandleFunc("/benchmark", server.benchmarkHandler)
|
|
mux.HandleFunc("/cleanup", server.cleanupHandler)
|
|
|
|
httpServer := &http.Server{
|
|
Addr: fmt.Sprintf(":%d", port),
|
|
Handler: mux,
|
|
}
|
|
|
|
// Handle graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
logger.WithField("port", port).Info("🌐 Demo server starting")
|
|
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.WithError(err).Fatal("HTTP server failed")
|
|
}
|
|
}()
|
|
|
|
// Wait for shutdown signal
|
|
<-sigChan
|
|
logger.Info("📡 Received shutdown signal, gracefully shutting down...")
|
|
|
|
// Shutdown HTTP server
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer shutdownCancel()
|
|
|
|
if err := httpServer.Shutdown(shutdownCtx); err != nil {
|
|
logger.WithError(err).Error("HTTP server shutdown failed")
|
|
} else {
|
|
logger.Info("🌐 HTTP server shutdown complete")
|
|
}
|
|
|
|
// Stop RDMA client
|
|
rdmaClient.Stop()
|
|
logger.Info("🛑 Demo server shutdown complete")
|
|
|
|
return nil
|
|
}
|
|
|
|
// DemoServer demonstrates SeaweedFS RDMA integration
|
|
type DemoServer struct {
|
|
rdmaClient *seaweedfs.SeaweedFSRDMAClient
|
|
logger *logrus.Logger
|
|
}
|
|
|
|
// homeHandler provides information about the demo server
|
|
func (s *DemoServer) homeHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/html")
|
|
fmt.Fprintf(w, `<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>SeaweedFS RDMA Demo Server</title>
|
|
<style>
|
|
body { font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }
|
|
.container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
|
|
h1 { color: #2c3e50; }
|
|
.endpoint { margin: 20px 0; padding: 15px; background: #ecf0f1; border-radius: 4px; }
|
|
.endpoint h3 { margin: 0 0 10px 0; color: #34495e; }
|
|
.endpoint a { color: #3498db; text-decoration: none; }
|
|
.endpoint a:hover { text-decoration: underline; }
|
|
.status { padding: 10px; border-radius: 4px; margin: 10px 0; }
|
|
.status.enabled { background: #d5f4e6; color: #27ae60; }
|
|
.status.disabled { background: #fadbd8; color: #e74c3c; }
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div class="container">
|
|
<h1>🚀 SeaweedFS RDMA Demo Server</h1>
|
|
<p>This server demonstrates SeaweedFS integration with RDMA acceleration for high-performance reads.</p>
|
|
|
|
<div class="status %s">
|
|
<strong>RDMA Status:</strong> %s
|
|
</div>
|
|
|
|
<h2>📋 Available Endpoints</h2>
|
|
|
|
<div class="endpoint">
|
|
<h3>🏥 Health Check</h3>
|
|
<p><a href="/health">/health</a> - Check server and RDMA engine health</p>
|
|
</div>
|
|
|
|
<div class="endpoint">
|
|
<h3>📊 Statistics</h3>
|
|
<p><a href="/stats">/stats</a> - Get RDMA client statistics and capabilities</p>
|
|
</div>
|
|
|
|
<div class="endpoint">
|
|
<h3>📖 Read Needle</h3>
|
|
<p><a href="/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080">/read</a> - Read a needle with RDMA fast path</p>
|
|
<p><strong>Parameters:</strong> file_id OR (volume, needle, cookie), volume_server, offset (optional), size (optional)</p>
|
|
</div>
|
|
|
|
<div class="endpoint">
|
|
<h3>🏁 Benchmark</h3>
|
|
<p><a href="/benchmark?iterations=10&size=4096">/benchmark</a> - Run performance benchmark</p>
|
|
<p><strong>Parameters:</strong> iterations (default: 10), size (default: 4096)</p>
|
|
</div>
|
|
|
|
<h2>📝 Example Usage</h2>
|
|
<pre>
|
|
# Read a needle using file ID (recommended)
|
|
curl "http://localhost:%d/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080"
|
|
|
|
# Read a needle using individual parameters (legacy)
|
|
curl "http://localhost:%d/read?volume=1&needle=12345&cookie=305419896&size=1024&volume_server=http://localhost:8080"
|
|
|
|
# Read a needle (hex cookie)
|
|
curl "http://localhost:%d/read?volume=1&needle=12345&cookie=0x12345678&size=1024&volume_server=http://localhost:8080"
|
|
|
|
# Run benchmark
|
|
curl "http://localhost:%d/benchmark?iterations=5&size=2048"
|
|
|
|
# Check health
|
|
curl "http://localhost:%d/health"
|
|
</pre>
|
|
</div>
|
|
</body>
|
|
</html>`,
|
|
map[bool]string{true: "enabled", false: "disabled"}[s.rdmaClient.IsEnabled()],
|
|
map[bool]string{true: "RDMA Enabled ✅", false: "RDMA Disabled (HTTP Fallback Only) ⚠️"}[s.rdmaClient.IsEnabled()],
|
|
port, port, port, port)
|
|
}
|
|
|
|
// healthHandler checks server and RDMA health
|
|
func (s *DemoServer) healthHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
health := map[string]interface{}{
|
|
"status": "healthy",
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
"rdma": map[string]interface{}{
|
|
"enabled": false,
|
|
"connected": false,
|
|
},
|
|
}
|
|
|
|
if s.rdmaClient != nil {
|
|
health["rdma"].(map[string]interface{})["enabled"] = s.rdmaClient.IsEnabled()
|
|
health["rdma"].(map[string]interface{})["type"] = "local"
|
|
|
|
if s.rdmaClient.IsEnabled() {
|
|
if err := s.rdmaClient.HealthCheck(ctx); err != nil {
|
|
s.logger.WithError(err).Warn("RDMA health check failed")
|
|
health["rdma"].(map[string]interface{})["error"] = err.Error()
|
|
} else {
|
|
health["rdma"].(map[string]interface{})["connected"] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(health)
|
|
}
|
|
|
|
// statsHandler returns RDMA statistics
|
|
func (s *DemoServer) statsHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var stats map[string]interface{}
|
|
|
|
if s.rdmaClient != nil {
|
|
stats = s.rdmaClient.GetStats()
|
|
stats["client_type"] = "local"
|
|
} else {
|
|
stats = map[string]interface{}{
|
|
"client_type": "none",
|
|
"error": "no RDMA client available",
|
|
}
|
|
}
|
|
|
|
stats["timestamp"] = time.Now().Format(time.RFC3339)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(stats)
|
|
}
|
|
|
|
// readHandler demonstrates needle reading with RDMA
|
|
func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// Parse parameters - support both file_id and individual parameters for backward compatibility
|
|
query := r.URL.Query()
|
|
volumeServer := query.Get("volume_server")
|
|
fileID := query.Get("file_id")
|
|
|
|
var volumeID, cookie uint64
|
|
var needleID uint64
|
|
var err error
|
|
|
|
if fileID != "" {
|
|
// Use file ID format (e.g., "3,01637037d6")
|
|
// Extract individual components using existing SeaweedFS parsing
|
|
fid, parseErr := needle.ParseFileIdFromString(fileID)
|
|
if parseErr != nil {
|
|
http.Error(w, fmt.Sprintf("invalid 'file_id' parameter: %v", parseErr), http.StatusBadRequest)
|
|
return
|
|
}
|
|
volumeID = uint64(fid.VolumeId)
|
|
needleID = uint64(fid.Key)
|
|
cookie = uint64(fid.Cookie)
|
|
} else {
|
|
// Use individual parameters (backward compatibility)
|
|
volumeID, err = strconv.ParseUint(query.Get("volume"), 10, 32)
|
|
if err != nil {
|
|
http.Error(w, "invalid 'volume' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
needleID, err = strconv.ParseUint(query.Get("needle"), 10, 64)
|
|
if err != nil {
|
|
http.Error(w, "invalid 'needle' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Parse cookie parameter - support both decimal and hexadecimal formats
|
|
cookieStr := query.Get("cookie")
|
|
if strings.HasPrefix(strings.ToLower(cookieStr), "0x") {
|
|
// Parse as hexadecimal (remove "0x" prefix)
|
|
cookie, err = strconv.ParseUint(cookieStr[2:], 16, 32)
|
|
} else {
|
|
// Parse as decimal (default)
|
|
cookie, err = strconv.ParseUint(cookieStr, 10, 32)
|
|
}
|
|
if err != nil {
|
|
http.Error(w, "invalid 'cookie' parameter (expected decimal or hex with 0x prefix)", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
var offset uint64
|
|
if offsetStr := query.Get("offset"); offsetStr != "" {
|
|
var parseErr error
|
|
offset, parseErr = strconv.ParseUint(offsetStr, 10, 64)
|
|
if parseErr != nil {
|
|
http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
var size uint64
|
|
if sizeStr := query.Get("size"); sizeStr != "" {
|
|
var parseErr error
|
|
size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
|
|
if parseErr != nil {
|
|
http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
if volumeServer == "" {
|
|
http.Error(w, "volume_server parameter is required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if volumeID == 0 || needleID == 0 {
|
|
http.Error(w, "volume and needle parameters are required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Note: cookie and size can have defaults for demo purposes when user provides empty values,
|
|
// but invalid parsing is caught above with proper error responses
|
|
if cookie == 0 {
|
|
cookie = 0x12345678 // Default cookie for demo
|
|
}
|
|
|
|
if size == 0 {
|
|
size = 4096 // Default size
|
|
}
|
|
|
|
logFields := logrus.Fields{
|
|
"volume_server": volumeServer,
|
|
"volume_id": volumeID,
|
|
"needle_id": needleID,
|
|
"cookie": fmt.Sprintf("0x%x", cookie),
|
|
"offset": offset,
|
|
"size": size,
|
|
}
|
|
if fileID != "" {
|
|
logFields["file_id"] = fileID
|
|
}
|
|
s.logger.WithFields(logFields).Info("📖 Processing needle read request")
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
start := time.Now()
|
|
req := &seaweedfs.NeedleReadRequest{
|
|
VolumeID: uint32(volumeID),
|
|
NeedleID: needleID,
|
|
Cookie: uint32(cookie),
|
|
Offset: offset,
|
|
Size: size,
|
|
VolumeServer: volumeServer,
|
|
}
|
|
|
|
resp, err := s.rdmaClient.ReadNeedle(ctx, req)
|
|
|
|
if err != nil {
|
|
s.logger.WithError(err).Error("❌ Needle read failed")
|
|
http.Error(w, fmt.Sprintf("Read failed: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
duration := time.Since(start)
|
|
|
|
s.logger.WithFields(logrus.Fields{
|
|
"volume_id": volumeID,
|
|
"needle_id": needleID,
|
|
"is_rdma": resp.IsRDMA,
|
|
"source": resp.Source,
|
|
"duration": duration,
|
|
"data_size": len(resp.Data),
|
|
}).Info("✅ Needle read completed")
|
|
|
|
// Return metadata and first few bytes
|
|
result := map[string]interface{}{
|
|
"success": true,
|
|
"volume_id": volumeID,
|
|
"needle_id": needleID,
|
|
"cookie": fmt.Sprintf("0x%x", cookie),
|
|
"is_rdma": resp.IsRDMA,
|
|
"source": resp.Source,
|
|
"session_id": resp.SessionID,
|
|
"duration": duration.String(),
|
|
"data_size": len(resp.Data),
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
"use_temp_file": resp.UseTempFile,
|
|
"temp_file": resp.TempFilePath,
|
|
}
|
|
|
|
// Set headers for zero-copy optimization
|
|
if resp.UseTempFile && resp.TempFilePath != "" {
|
|
w.Header().Set("X-Use-Temp-File", "true")
|
|
w.Header().Set("X-Temp-File", resp.TempFilePath)
|
|
w.Header().Set("X-Source", resp.Source)
|
|
w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
|
|
|
|
// For zero-copy, return minimal JSON response and let client read from temp file
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(result)
|
|
return
|
|
}
|
|
|
|
// Regular response with data
|
|
w.Header().Set("X-Source", resp.Source)
|
|
w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
|
|
|
|
// Include first 32 bytes as hex for verification
|
|
if len(resp.Data) > 0 {
|
|
displayLen := 32
|
|
if len(resp.Data) < displayLen {
|
|
displayLen = len(resp.Data)
|
|
}
|
|
result["data_preview"] = fmt.Sprintf("%x", resp.Data[:displayLen])
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(result)
|
|
}
|
|
|
|
// benchmarkHandler runs performance benchmarks
|
|
func (s *DemoServer) benchmarkHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// Parse parameters
|
|
query := r.URL.Query()
|
|
|
|
iterations := 10 // default value
|
|
if iterationsStr := query.Get("iterations"); iterationsStr != "" {
|
|
var parseErr error
|
|
iterations, parseErr = strconv.Atoi(iterationsStr)
|
|
if parseErr != nil {
|
|
http.Error(w, "invalid 'iterations' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
size := uint64(4096) // default value
|
|
if sizeStr := query.Get("size"); sizeStr != "" {
|
|
var parseErr error
|
|
size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
|
|
if parseErr != nil {
|
|
http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
if iterations <= 0 {
|
|
iterations = 10
|
|
}
|
|
if size == 0 {
|
|
size = 4096
|
|
}
|
|
|
|
s.logger.WithFields(logrus.Fields{
|
|
"iterations": iterations,
|
|
"size": size,
|
|
}).Info("🏁 Starting benchmark")
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
|
|
defer cancel()
|
|
|
|
var rdmaSuccessful, rdmaFailed, httpSuccessful, httpFailed int
|
|
var totalDuration time.Duration
|
|
var totalBytes uint64
|
|
|
|
startTime := time.Now()
|
|
|
|
for i := 0; i < iterations; i++ {
|
|
req := &seaweedfs.NeedleReadRequest{
|
|
VolumeID: 1,
|
|
NeedleID: uint64(i + 1),
|
|
Cookie: 0x12345678,
|
|
Offset: 0,
|
|
Size: size,
|
|
}
|
|
|
|
opStart := time.Now()
|
|
resp, err := s.rdmaClient.ReadNeedle(ctx, req)
|
|
opDuration := time.Since(opStart)
|
|
|
|
if err != nil {
|
|
httpFailed++
|
|
continue
|
|
}
|
|
|
|
totalDuration += opDuration
|
|
totalBytes += uint64(len(resp.Data))
|
|
|
|
if resp.IsRDMA {
|
|
rdmaSuccessful++
|
|
} else {
|
|
httpSuccessful++
|
|
}
|
|
}
|
|
|
|
benchDuration := time.Since(startTime)
|
|
|
|
// Calculate statistics
|
|
totalOperations := rdmaSuccessful + httpSuccessful
|
|
avgLatency := time.Duration(0)
|
|
if totalOperations > 0 {
|
|
avgLatency = totalDuration / time.Duration(totalOperations)
|
|
}
|
|
|
|
throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024)
|
|
opsPerSec := float64(totalOperations) / benchDuration.Seconds()
|
|
|
|
result := map[string]interface{}{
|
|
"benchmark_results": map[string]interface{}{
|
|
"iterations": iterations,
|
|
"size_per_op": size,
|
|
"total_duration": benchDuration.String(),
|
|
"successful_ops": totalOperations,
|
|
"failed_ops": rdmaFailed + httpFailed,
|
|
"rdma_ops": rdmaSuccessful,
|
|
"http_ops": httpSuccessful,
|
|
"avg_latency": avgLatency.String(),
|
|
"throughput_mbps": fmt.Sprintf("%.2f", throughputMBps),
|
|
"ops_per_sec": fmt.Sprintf("%.1f", opsPerSec),
|
|
"total_bytes": totalBytes,
|
|
},
|
|
"rdma_enabled": s.rdmaClient.IsEnabled(),
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
}
|
|
|
|
s.logger.WithFields(logrus.Fields{
|
|
"iterations": iterations,
|
|
"successful_ops": totalOperations,
|
|
"rdma_ops": rdmaSuccessful,
|
|
"http_ops": httpSuccessful,
|
|
"avg_latency": avgLatency,
|
|
"throughput_mbps": throughputMBps,
|
|
"ops_per_sec": opsPerSec,
|
|
}).Info("📊 Benchmark completed")
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(result)
|
|
}
|
|
|
|
// cleanupHandler handles temp file cleanup requests from mount clients
|
|
func (s *DemoServer) cleanupHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodDelete {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// Get temp file path from query parameters
|
|
tempFilePath := r.URL.Query().Get("temp_file")
|
|
if tempFilePath == "" {
|
|
http.Error(w, "missing 'temp_file' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
s.logger.WithField("temp_file", tempFilePath).Debug("🗑️ Processing cleanup request")
|
|
|
|
// Use the RDMA client's cleanup method (which delegates to seaweedfs client)
|
|
err := s.rdmaClient.CleanupTempFile(tempFilePath)
|
|
if err != nil {
|
|
s.logger.WithError(err).WithField("temp_file", tempFilePath).Warn("Failed to cleanup temp file")
|
|
http.Error(w, fmt.Sprintf("cleanup failed: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
s.logger.WithField("temp_file", tempFilePath).Debug("🧹 Temp file cleanup successful")
|
|
|
|
// Return success response
|
|
w.Header().Set("Content-Type", "application/json")
|
|
response := map[string]interface{}{
|
|
"success": true,
|
|
"message": "temp file cleaned up successfully",
|
|
"temp_file": tempFilePath,
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|