You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							345 lines
						
					
					
						
							9.6 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							345 lines
						
					
					
						
							9.6 KiB
						
					
					
				| // Package main provides the main RDMA sidecar service that integrates with SeaweedFS | |
| package main | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"encoding/json" | |
| 	"fmt" | |
| 	"net/http" | |
| 	"os" | |
| 	"os/signal" | |
| 	"strconv" | |
| 	"syscall" | |
| 	"time" | |
| 
 | |
| 	"seaweedfs-rdma-sidecar/pkg/rdma" | |
| 
 | |
| 	"github.com/sirupsen/logrus" | |
| 	"github.com/spf13/cobra" | |
| ) | |
| 
 | |
| var ( | |
| 	port         int | |
| 	engineSocket string | |
| 	debug        bool | |
| 	timeout      time.Duration | |
| ) | |
| 
 | |
| // Response structs for JSON encoding | |
| type HealthResponse struct { | |
| 	Status              string `json:"status"` | |
| 	RdmaEngineConnected bool   `json:"rdma_engine_connected"` | |
| 	RdmaEngineLatency   string `json:"rdma_engine_latency"` | |
| 	Timestamp           string `json:"timestamp"` | |
| } | |
| 
 | |
| type CapabilitiesResponse struct { | |
| 	Version         string   `json:"version"` | |
| 	DeviceName      string   `json:"device_name"` | |
| 	VendorId        uint32   `json:"vendor_id"` | |
| 	MaxSessions     uint32   `json:"max_sessions"` | |
| 	MaxTransferSize uint64   `json:"max_transfer_size"` | |
| 	ActiveSessions  uint32   `json:"active_sessions"` | |
| 	RealRdma        bool     `json:"real_rdma"` | |
| 	PortGid         string   `json:"port_gid"` | |
| 	PortLid         uint16   `json:"port_lid"` | |
| 	SupportedAuth   []string `json:"supported_auth"` | |
| } | |
| 
 | |
| type PingResponse struct { | |
| 	Success       bool   `json:"success"` | |
| 	EngineLatency string `json:"engine_latency"` | |
| 	TotalLatency  string `json:"total_latency"` | |
| 	Timestamp     string `json:"timestamp"` | |
| } | |
| 
 | |
| func main() { | |
| 	var rootCmd = &cobra.Command{ | |
| 		Use:   "rdma-sidecar", | |
| 		Short: "SeaweedFS RDMA acceleration sidecar", | |
| 		Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine. | |
|  | |
| This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance | |
| Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`, | |
| 		RunE: runSidecar, | |
| 	} | |
| 
 | |
| 	// Flags | |
| 	rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port") | |
| 	rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket") | |
| 	rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging") | |
| 	rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout") | |
| 
 | |
| 	if err := rootCmd.Execute(); err != nil { | |
| 		fmt.Fprintf(os.Stderr, "Error: %v\n", err) | |
| 		os.Exit(1) | |
| 	} | |
| } | |
| 
 | |
| func runSidecar(cmd *cobra.Command, args []string) error { | |
| 	// Setup logging | |
| 	logger := logrus.New() | |
| 	if debug { | |
| 		logger.SetLevel(logrus.DebugLevel) | |
| 		logger.SetFormatter(&logrus.TextFormatter{ | |
| 			FullTimestamp: true, | |
| 			ForceColors:   true, | |
| 		}) | |
| 	} else { | |
| 		logger.SetLevel(logrus.InfoLevel) | |
| 	} | |
| 
 | |
| 	logger.WithFields(logrus.Fields{ | |
| 		"port":          port, | |
| 		"engine_socket": engineSocket, | |
| 		"debug":         debug, | |
| 		"timeout":       timeout, | |
| 	}).Info("🚀 Starting SeaweedFS RDMA Sidecar") | |
| 
 | |
| 	// Create RDMA client | |
| 	rdmaConfig := &rdma.Config{ | |
| 		EngineSocketPath: engineSocket, | |
| 		DefaultTimeout:   timeout, | |
| 		Logger:           logger, | |
| 	} | |
| 
 | |
| 	rdmaClient := rdma.NewClient(rdmaConfig) | |
| 
 | |
| 	// Connect to RDMA engine | |
| 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
| 	defer cancel() | |
| 
 | |
| 	logger.Info("🔗 Connecting to RDMA engine...") | |
| 	if err := rdmaClient.Connect(ctx); err != nil { | |
| 		return fmt.Errorf("failed to connect to RDMA engine: %w", err) | |
| 	} | |
| 	logger.Info("✅ Connected to RDMA engine successfully") | |
| 
 | |
| 	// Create HTTP server | |
| 	sidecar := &Sidecar{ | |
| 		rdmaClient: rdmaClient, | |
| 		logger:     logger, | |
| 	} | |
| 
 | |
| 	mux := http.NewServeMux() | |
| 
 | |
| 	// Health check endpoint | |
| 	mux.HandleFunc("/health", sidecar.healthHandler) | |
| 
 | |
| 	// RDMA operations endpoints | |
| 	mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler) | |
| 	mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler) | |
| 	mux.HandleFunc("/rdma/ping", sidecar.pingHandler) | |
| 
 | |
| 	server := &http.Server{ | |
| 		Addr:    fmt.Sprintf(":%d", port), | |
| 		Handler: mux, | |
| 	} | |
| 
 | |
| 	// Handle graceful shutdown | |
| 	sigChan := make(chan os.Signal, 1) | |
| 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | |
| 
 | |
| 	go func() { | |
| 		logger.WithField("port", port).Info("🌐 HTTP server starting") | |
| 		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { | |
| 			logger.WithError(err).Fatal("HTTP server failed") | |
| 		} | |
| 	}() | |
| 
 | |
| 	// Wait for shutdown signal | |
| 	<-sigChan | |
| 	logger.Info("📡 Received shutdown signal, gracefully shutting down...") | |
| 
 | |
| 	// Shutdown HTTP server | |
| 	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) | |
| 	defer shutdownCancel() | |
| 
 | |
| 	if err := server.Shutdown(shutdownCtx); err != nil { | |
| 		logger.WithError(err).Error("HTTP server shutdown failed") | |
| 	} else { | |
| 		logger.Info("🌐 HTTP server shutdown complete") | |
| 	} | |
| 
 | |
| 	// Disconnect from RDMA engine | |
| 	rdmaClient.Disconnect() | |
| 	logger.Info("🛑 RDMA sidecar shutdown complete") | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| // Sidecar represents the main sidecar service | |
| type Sidecar struct { | |
| 	rdmaClient *rdma.Client | |
| 	logger     *logrus.Logger | |
| } | |
| 
 | |
| // Health check handler | |
| func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) { | |
| 	if r.Method != http.MethodGet { | |
| 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) | |
| 		return | |
| 	} | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) | |
| 	defer cancel() | |
| 
 | |
| 	// Test RDMA engine connectivity | |
| 	if !s.rdmaClient.IsConnected() { | |
| 		s.logger.Warn("⚠️  RDMA engine not connected") | |
| 		http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Ping RDMA engine | |
| 	latency, err := s.rdmaClient.Ping(ctx) | |
| 	if err != nil { | |
| 		s.logger.WithError(err).Error("❌ RDMA engine ping failed") | |
| 		http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable) | |
| 		return | |
| 	} | |
| 
 | |
| 	w.Header().Set("Content-Type", "application/json") | |
| 	response := HealthResponse{ | |
| 		Status:              "healthy", | |
| 		RdmaEngineConnected: true, | |
| 		RdmaEngineLatency:   latency.String(), | |
| 		Timestamp:           time.Now().Format(time.RFC3339), | |
| 	} | |
| 	json.NewEncoder(w).Encode(response) | |
| } | |
| 
 | |
| // RDMA capabilities handler | |
| func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) { | |
| 	if r.Method != http.MethodGet { | |
| 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) | |
| 		return | |
| 	} | |
| 
 | |
| 	caps := s.rdmaClient.GetCapabilities() | |
| 	if caps == nil { | |
| 		http.Error(w, "No capabilities available", http.StatusServiceUnavailable) | |
| 		return | |
| 	} | |
| 
 | |
| 	w.Header().Set("Content-Type", "application/json") | |
| 	response := CapabilitiesResponse{ | |
| 		Version:         caps.Version, | |
| 		DeviceName:      caps.DeviceName, | |
| 		VendorId:        caps.VendorId, | |
| 		MaxSessions:     uint32(caps.MaxSessions), | |
| 		MaxTransferSize: caps.MaxTransferSize, | |
| 		ActiveSessions:  uint32(caps.ActiveSessions), | |
| 		RealRdma:        caps.RealRdma, | |
| 		PortGid:         caps.PortGid, | |
| 		PortLid:         caps.PortLid, | |
| 		SupportedAuth:   caps.SupportedAuth, | |
| 	} | |
| 	json.NewEncoder(w).Encode(response) | |
| } | |
| 
 | |
| // RDMA ping handler | |
| func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) { | |
| 	if r.Method != http.MethodGet { | |
| 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) | |
| 		return | |
| 	} | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) | |
| 	defer cancel() | |
| 
 | |
| 	start := time.Now() | |
| 	latency, err := s.rdmaClient.Ping(ctx) | |
| 	totalLatency := time.Since(start) | |
| 
 | |
| 	if err != nil { | |
| 		s.logger.WithError(err).Error("❌ RDMA ping failed") | |
| 		http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError) | |
| 		return | |
| 	} | |
| 
 | |
| 	w.Header().Set("Content-Type", "application/json") | |
| 	response := PingResponse{ | |
| 		Success:       true, | |
| 		EngineLatency: latency.String(), | |
| 		TotalLatency:  totalLatency.String(), | |
| 		Timestamp:     time.Now().Format(time.RFC3339), | |
| 	} | |
| 	json.NewEncoder(w).Encode(response) | |
| } | |
| 
 | |
| // RDMA read handler - uses GET method with query parameters for RESTful read operations | |
| func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) { | |
| 	if r.Method != http.MethodGet { | |
| 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Parse query parameters | |
| 	query := r.URL.Query() | |
| 
 | |
| 	// Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier | |
| 	fileID := query.Get("file_id") | |
| 	if fileID == "" { | |
| 		http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Parse optional offset and size parameters | |
| 	offset := uint64(0) // default value | |
| 	if offsetStr := query.Get("offset"); offsetStr != "" { | |
| 		val, err := strconv.ParseUint(offsetStr, 10, 64) | |
| 		if err != nil { | |
| 			http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest) | |
| 			return | |
| 		} | |
| 		offset = val | |
| 	} | |
| 
 | |
| 	size := uint64(4096) // default value | |
| 	if sizeStr := query.Get("size"); sizeStr != "" { | |
| 		val, err := strconv.ParseUint(sizeStr, 10, 64) | |
| 		if err != nil { | |
| 			http.Error(w, "invalid 'size' parameter", http.StatusBadRequest) | |
| 			return | |
| 		} | |
| 		size = val | |
| 	} | |
| 
 | |
| 	s.logger.WithFields(logrus.Fields{ | |
| 		"file_id": fileID, | |
| 		"offset":  offset, | |
| 		"size":    size, | |
| 	}).Info("📖 Processing RDMA read request") | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(r.Context(), timeout) | |
| 	defer cancel() | |
| 
 | |
| 	start := time.Now() | |
| 	resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size) | |
| 	duration := time.Since(start) | |
| 
 | |
| 	if err != nil { | |
| 		s.logger.WithError(err).Error("❌ RDMA read failed") | |
| 		http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError) | |
| 		return | |
| 	} | |
| 
 | |
| 	s.logger.WithFields(logrus.Fields{ | |
| 		"file_id":       fileID, | |
| 		"bytes_read":    resp.BytesRead, | |
| 		"duration":      duration, | |
| 		"transfer_rate": resp.TransferRate, | |
| 		"session_id":    resp.SessionID, | |
| 	}).Info("✅ RDMA read completed successfully") | |
| 
 | |
| 	// Set response headers | |
| 	w.Header().Set("Content-Type", "application/octet-stream") | |
| 	w.Header().Set("X-RDMA-Session-ID", resp.SessionID) | |
| 	w.Header().Set("X-RDMA-Duration", duration.String()) | |
| 	w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate)) | |
| 	w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead)) | |
| 
 | |
| 	// Write the data | |
| 	w.Write(resp.Data) | |
| }
 |