You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
345 lines
9.6 KiB
345 lines
9.6 KiB
// Package main provides the main RDMA sidecar service that integrates with SeaweedFS
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"syscall"
|
|
"time"
|
|
|
|
"seaweedfs-rdma-sidecar/pkg/rdma"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
var (
|
|
port int
|
|
engineSocket string
|
|
debug bool
|
|
timeout time.Duration
|
|
)
|
|
|
|
// Response structs for JSON encoding
|
|
type HealthResponse struct {
|
|
Status string `json:"status"`
|
|
RdmaEngineConnected bool `json:"rdma_engine_connected"`
|
|
RdmaEngineLatency string `json:"rdma_engine_latency"`
|
|
Timestamp string `json:"timestamp"`
|
|
}
|
|
|
|
type CapabilitiesResponse struct {
|
|
Version string `json:"version"`
|
|
DeviceName string `json:"device_name"`
|
|
VendorId uint32 `json:"vendor_id"`
|
|
MaxSessions uint32 `json:"max_sessions"`
|
|
MaxTransferSize uint64 `json:"max_transfer_size"`
|
|
ActiveSessions uint32 `json:"active_sessions"`
|
|
RealRdma bool `json:"real_rdma"`
|
|
PortGid string `json:"port_gid"`
|
|
PortLid uint16 `json:"port_lid"`
|
|
SupportedAuth []string `json:"supported_auth"`
|
|
}
|
|
|
|
type PingResponse struct {
|
|
Success bool `json:"success"`
|
|
EngineLatency string `json:"engine_latency"`
|
|
TotalLatency string `json:"total_latency"`
|
|
Timestamp string `json:"timestamp"`
|
|
}
|
|
|
|
func main() {
|
|
var rootCmd = &cobra.Command{
|
|
Use: "rdma-sidecar",
|
|
Short: "SeaweedFS RDMA acceleration sidecar",
|
|
Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine.
|
|
|
|
This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance
|
|
Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`,
|
|
RunE: runSidecar,
|
|
}
|
|
|
|
// Flags
|
|
rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port")
|
|
rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
|
|
rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
|
|
rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout")
|
|
|
|
if err := rootCmd.Execute(); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func runSidecar(cmd *cobra.Command, args []string) error {
|
|
// Setup logging
|
|
logger := logrus.New()
|
|
if debug {
|
|
logger.SetLevel(logrus.DebugLevel)
|
|
logger.SetFormatter(&logrus.TextFormatter{
|
|
FullTimestamp: true,
|
|
ForceColors: true,
|
|
})
|
|
} else {
|
|
logger.SetLevel(logrus.InfoLevel)
|
|
}
|
|
|
|
logger.WithFields(logrus.Fields{
|
|
"port": port,
|
|
"engine_socket": engineSocket,
|
|
"debug": debug,
|
|
"timeout": timeout,
|
|
}).Info("🚀 Starting SeaweedFS RDMA Sidecar")
|
|
|
|
// Create RDMA client
|
|
rdmaConfig := &rdma.Config{
|
|
EngineSocketPath: engineSocket,
|
|
DefaultTimeout: timeout,
|
|
Logger: logger,
|
|
}
|
|
|
|
rdmaClient := rdma.NewClient(rdmaConfig)
|
|
|
|
// Connect to RDMA engine
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
logger.Info("🔗 Connecting to RDMA engine...")
|
|
if err := rdmaClient.Connect(ctx); err != nil {
|
|
return fmt.Errorf("failed to connect to RDMA engine: %w", err)
|
|
}
|
|
logger.Info("✅ Connected to RDMA engine successfully")
|
|
|
|
// Create HTTP server
|
|
sidecar := &Sidecar{
|
|
rdmaClient: rdmaClient,
|
|
logger: logger,
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
|
|
// Health check endpoint
|
|
mux.HandleFunc("/health", sidecar.healthHandler)
|
|
|
|
// RDMA operations endpoints
|
|
mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler)
|
|
mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler)
|
|
mux.HandleFunc("/rdma/ping", sidecar.pingHandler)
|
|
|
|
server := &http.Server{
|
|
Addr: fmt.Sprintf(":%d", port),
|
|
Handler: mux,
|
|
}
|
|
|
|
// Handle graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
logger.WithField("port", port).Info("🌐 HTTP server starting")
|
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.WithError(err).Fatal("HTTP server failed")
|
|
}
|
|
}()
|
|
|
|
// Wait for shutdown signal
|
|
<-sigChan
|
|
logger.Info("📡 Received shutdown signal, gracefully shutting down...")
|
|
|
|
// Shutdown HTTP server
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer shutdownCancel()
|
|
|
|
if err := server.Shutdown(shutdownCtx); err != nil {
|
|
logger.WithError(err).Error("HTTP server shutdown failed")
|
|
} else {
|
|
logger.Info("🌐 HTTP server shutdown complete")
|
|
}
|
|
|
|
// Disconnect from RDMA engine
|
|
rdmaClient.Disconnect()
|
|
logger.Info("🛑 RDMA sidecar shutdown complete")
|
|
|
|
return nil
|
|
}
|
|
|
|
// Sidecar represents the main sidecar service
|
|
type Sidecar struct {
|
|
rdmaClient *rdma.Client
|
|
logger *logrus.Logger
|
|
}
|
|
|
|
// Health check handler
|
|
func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
// Test RDMA engine connectivity
|
|
if !s.rdmaClient.IsConnected() {
|
|
s.logger.Warn("⚠️ RDMA engine not connected")
|
|
http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
// Ping RDMA engine
|
|
latency, err := s.rdmaClient.Ping(ctx)
|
|
if err != nil {
|
|
s.logger.WithError(err).Error("❌ RDMA engine ping failed")
|
|
http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
response := HealthResponse{
|
|
Status: "healthy",
|
|
RdmaEngineConnected: true,
|
|
RdmaEngineLatency: latency.String(),
|
|
Timestamp: time.Now().Format(time.RFC3339),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// RDMA capabilities handler
|
|
func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
caps := s.rdmaClient.GetCapabilities()
|
|
if caps == nil {
|
|
http.Error(w, "No capabilities available", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
response := CapabilitiesResponse{
|
|
Version: caps.Version,
|
|
DeviceName: caps.DeviceName,
|
|
VendorId: caps.VendorId,
|
|
MaxSessions: uint32(caps.MaxSessions),
|
|
MaxTransferSize: caps.MaxTransferSize,
|
|
ActiveSessions: uint32(caps.ActiveSessions),
|
|
RealRdma: caps.RealRdma,
|
|
PortGid: caps.PortGid,
|
|
PortLid: caps.PortLid,
|
|
SupportedAuth: caps.SupportedAuth,
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// RDMA ping handler
|
|
func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
start := time.Now()
|
|
latency, err := s.rdmaClient.Ping(ctx)
|
|
totalLatency := time.Since(start)
|
|
|
|
if err != nil {
|
|
s.logger.WithError(err).Error("❌ RDMA ping failed")
|
|
http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
response := PingResponse{
|
|
Success: true,
|
|
EngineLatency: latency.String(),
|
|
TotalLatency: totalLatency.String(),
|
|
Timestamp: time.Now().Format(time.RFC3339),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
|
|
// RDMA read handler - uses GET method with query parameters for RESTful read operations
|
|
func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// Parse query parameters
|
|
query := r.URL.Query()
|
|
|
|
// Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier
|
|
fileID := query.Get("file_id")
|
|
if fileID == "" {
|
|
http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Parse optional offset and size parameters
|
|
offset := uint64(0) // default value
|
|
if offsetStr := query.Get("offset"); offsetStr != "" {
|
|
val, err := strconv.ParseUint(offsetStr, 10, 64)
|
|
if err != nil {
|
|
http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
offset = val
|
|
}
|
|
|
|
size := uint64(4096) // default value
|
|
if sizeStr := query.Get("size"); sizeStr != "" {
|
|
val, err := strconv.ParseUint(sizeStr, 10, 64)
|
|
if err != nil {
|
|
http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
|
|
return
|
|
}
|
|
size = val
|
|
}
|
|
|
|
s.logger.WithFields(logrus.Fields{
|
|
"file_id": fileID,
|
|
"offset": offset,
|
|
"size": size,
|
|
}).Info("📖 Processing RDMA read request")
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
|
defer cancel()
|
|
|
|
start := time.Now()
|
|
resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size)
|
|
duration := time.Since(start)
|
|
|
|
if err != nil {
|
|
s.logger.WithError(err).Error("❌ RDMA read failed")
|
|
http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
s.logger.WithFields(logrus.Fields{
|
|
"file_id": fileID,
|
|
"bytes_read": resp.BytesRead,
|
|
"duration": duration,
|
|
"transfer_rate": resp.TransferRate,
|
|
"session_id": resp.SessionID,
|
|
}).Info("✅ RDMA read completed successfully")
|
|
|
|
// Set response headers
|
|
w.Header().Set("Content-Type", "application/octet-stream")
|
|
w.Header().Set("X-RDMA-Session-ID", resp.SessionID)
|
|
w.Header().Set("X-RDMA-Duration", duration.String())
|
|
w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate))
|
|
w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead))
|
|
|
|
// Write the data
|
|
w.Write(resp.Data)
|
|
}
|