You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
401 lines
12 KiB
401 lines
12 KiB
// Package seaweedfs provides SeaweedFS-specific RDMA integration
|
|
package seaweedfs
|
|
|
|
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"seaweedfs-rdma-sidecar/pkg/rdma"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/sirupsen/logrus"
)
|
|
|
|
// SeaweedFSRDMAClient provides SeaweedFS-specific RDMA operations.
//
// Needle reads are attempted over the RDMA engine first (when enabled and
// connected) and fall back to an HTTP GET against a SeaweedFS volume server.
type SeaweedFSRDMAClient struct {
	// rdmaClient talks to the RDMA engine. It is nil when RDMA was disabled
	// or no engine socket path was configured at construction time.
	rdmaClient *rdma.Client

	// logger receives all diagnostic output from this client.
	logger *logrus.Logger

	// volumeServerURL is the default volume server base URL used by the HTTP
	// fallback when a request does not name its own server.
	volumeServerURL string

	// enabled mirrors Config.Enabled.
	enabled bool

	// Zero-copy optimization

	// tempDir is the directory where large RDMA payloads are written so they
	// can be handed off as files instead of in-memory byte slices.
	tempDir string

	// useZeroCopy enables the temp-file hand-off described above.
	useZeroCopy bool
}
|
|
|
|
// Config holds configuration for the SeaweedFS RDMA client.
//
// NewSeaweedFSRDMAClient only creates an RDMA engine client when Enabled is
// true and RDMASocketPath is non-empty; otherwise every read goes over HTTP
// to a volume server.
type Config struct {
	// RDMASocketPath is forwarded to the RDMA engine client as its
	// EngineSocketPath. Empty disables the RDMA fast path.
	RDMASocketPath string

	// VolumeServerURL is the default volume server base URL used by the HTTP
	// fallback when a request does not set NeedleReadRequest.VolumeServer.
	VolumeServerURL string

	// Enabled turns the RDMA fast path on; when false only HTTP is used.
	Enabled bool

	// DefaultTimeout is forwarded unchanged to the RDMA engine client.
	DefaultTimeout time.Duration

	// Logger receives diagnostic output. When nil, a new logrus logger at
	// Info level is used.
	Logger *logrus.Logger

	// Zero-copy optimization
	// When UseZeroCopy is set, RDMA payloads larger than 64 KiB are written
	// to a file in TempDir instead of being returned in memory. If TempDir
	// cannot be created, zero-copy is disabled with a warning.

	TempDir string // Directory for temp files (default: /tmp/rdma-cache)

	UseZeroCopy bool // Enable zero-copy via temp files

	// Connection pooling options, forwarded unchanged to the RDMA engine client.
	// NOTE(review): none of the defaults below are applied in this package.
	// A bool's zero value is false, so "default: true" for EnablePooling has
	// to come from the caller. The numeric defaults are presumably applied by
	// the rdma package when left at zero. Confirm both.

	EnablePooling bool // Enable RDMA connection pooling (default: true)

	MaxConnections int // Max connections in pool (default: 10)

	MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
}
|
|
|
|
// NeedleReadRequest represents a SeaweedFS needle read request.
//
// VolumeID, NeedleID and Cookie identify the needle; the HTTP fallback
// combines them into a SeaweedFS file ID. Offset and Size select a byte
// range, and the HTTP fallback only sends them as query parameters when at
// least one of them is non-zero.
// NOTE(review): Size == 0 presumably means "whole needle"; confirm against
// the RDMA engine and the volume server.
type NeedleReadRequest struct {
	VolumeID uint32

	NeedleID uint64

	Cookie uint32

	Offset uint64

	Size uint64

	VolumeServer string // Override volume server URL for this request (used by the HTTP fallback only)
}
|
|
|
|
// NeedleReadResponse represents the result of a needle read.
type NeedleReadResponse struct {
	// Data holds the needle bytes. It is nil when UseTempFile is true.
	Data []byte

	// IsRDMA is true when the data came over RDMA, including zero-copy.
	IsRDMA bool

	// Latency is the wall time from the start of ReadNeedle until the
	// response was built.
	Latency time.Duration

	Source string // "rdma", "rdma-zerocopy" or "http"

	// SessionID is the RDMA session that served the read; empty for HTTP reads.
	SessionID string

	// Zero-copy optimization fields

	TempFilePath string // Path to temp file with data (for zero-copy); release it with CleanupTempFile

	UseTempFile bool // Whether to use temp file instead of Data
}
|
|
|
|
// NewSeaweedFSRDMAClient creates a new SeaweedFS RDMA client
|
|
func NewSeaweedFSRDMAClient(config *Config) (*SeaweedFSRDMAClient, error) {
|
|
if config.Logger == nil {
|
|
config.Logger = logrus.New()
|
|
config.Logger.SetLevel(logrus.InfoLevel)
|
|
}
|
|
|
|
var rdmaClient *rdma.Client
|
|
if config.Enabled && config.RDMASocketPath != "" {
|
|
rdmaConfig := &rdma.Config{
|
|
EngineSocketPath: config.RDMASocketPath,
|
|
DefaultTimeout: config.DefaultTimeout,
|
|
Logger: config.Logger,
|
|
EnablePooling: config.EnablePooling,
|
|
MaxConnections: config.MaxConnections,
|
|
MaxIdleTime: config.MaxIdleTime,
|
|
}
|
|
rdmaClient = rdma.NewClient(rdmaConfig)
|
|
}
|
|
|
|
// Setup temp directory for zero-copy optimization
|
|
tempDir := config.TempDir
|
|
if tempDir == "" {
|
|
tempDir = "/tmp/rdma-cache"
|
|
}
|
|
|
|
if config.UseZeroCopy {
|
|
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
|
config.Logger.WithError(err).Warn("Failed to create temp directory, disabling zero-copy")
|
|
config.UseZeroCopy = false
|
|
}
|
|
}
|
|
|
|
return &SeaweedFSRDMAClient{
|
|
rdmaClient: rdmaClient,
|
|
logger: config.Logger,
|
|
volumeServerURL: config.VolumeServerURL,
|
|
enabled: config.Enabled,
|
|
tempDir: tempDir,
|
|
useZeroCopy: config.UseZeroCopy,
|
|
}, nil
|
|
}
|
|
|
|
// Start initializes the RDMA client connection
|
|
func (c *SeaweedFSRDMAClient) Start(ctx context.Context) error {
|
|
if !c.enabled || c.rdmaClient == nil {
|
|
c.logger.Info("🔄 RDMA disabled, using HTTP fallback only")
|
|
return nil
|
|
}
|
|
|
|
c.logger.Info("🚀 Starting SeaweedFS RDMA client...")
|
|
|
|
if err := c.rdmaClient.Connect(ctx); err != nil {
|
|
c.logger.WithError(err).Error("❌ Failed to connect to RDMA engine")
|
|
return fmt.Errorf("failed to connect to RDMA engine: %w", err)
|
|
}
|
|
|
|
c.logger.Info("✅ SeaweedFS RDMA client started successfully")
|
|
return nil
|
|
}
|
|
|
|
// Stop shuts down the RDMA client
|
|
func (c *SeaweedFSRDMAClient) Stop() {
|
|
if c.rdmaClient != nil {
|
|
c.rdmaClient.Disconnect()
|
|
c.logger.Info("🔌 SeaweedFS RDMA client stopped")
|
|
}
|
|
}
|
|
|
|
// IsEnabled returns true if RDMA is enabled and available
|
|
func (c *SeaweedFSRDMAClient) IsEnabled() bool {
|
|
return c.enabled && c.rdmaClient != nil && c.rdmaClient.IsConnected()
|
|
}
|
|
|
|
// ReadNeedle reads a needle using RDMA fast path or HTTP fallback
|
|
func (c *SeaweedFSRDMAClient) ReadNeedle(ctx context.Context, req *NeedleReadRequest) (*NeedleReadResponse, error) {
|
|
start := time.Now()
|
|
var rdmaErr error
|
|
|
|
// Try RDMA fast path first
|
|
if c.IsEnabled() {
|
|
c.logger.WithFields(logrus.Fields{
|
|
"volume_id": req.VolumeID,
|
|
"needle_id": req.NeedleID,
|
|
"offset": req.Offset,
|
|
"size": req.Size,
|
|
}).Debug("🚀 Attempting RDMA fast path")
|
|
|
|
rdmaReq := &rdma.ReadRequest{
|
|
VolumeID: req.VolumeID,
|
|
NeedleID: req.NeedleID,
|
|
Cookie: req.Cookie,
|
|
Offset: req.Offset,
|
|
Size: req.Size,
|
|
}
|
|
|
|
resp, err := c.rdmaClient.Read(ctx, rdmaReq)
|
|
if err != nil {
|
|
c.logger.WithError(err).Warn("⚠️ RDMA read failed, falling back to HTTP")
|
|
rdmaErr = err
|
|
} else {
|
|
c.logger.WithFields(logrus.Fields{
|
|
"volume_id": req.VolumeID,
|
|
"needle_id": req.NeedleID,
|
|
"bytes_read": resp.BytesRead,
|
|
"transfer_rate": resp.TransferRate,
|
|
"latency": time.Since(start),
|
|
}).Info("🚀 RDMA fast path successful")
|
|
|
|
// Try zero-copy optimization if enabled and data is large enough
|
|
if c.useZeroCopy && len(resp.Data) > 64*1024 { // 64KB threshold
|
|
tempFilePath, err := c.writeToTempFile(req, resp.Data)
|
|
if err != nil {
|
|
c.logger.WithError(err).Warn("Failed to write temp file, using regular response")
|
|
// Fall back to regular response
|
|
} else {
|
|
c.logger.WithFields(logrus.Fields{
|
|
"temp_file": tempFilePath,
|
|
"size": len(resp.Data),
|
|
}).Info("🔥 Zero-copy temp file created")
|
|
|
|
return &NeedleReadResponse{
|
|
Data: nil, // Don't duplicate data in memory
|
|
IsRDMA: true,
|
|
Latency: time.Since(start),
|
|
Source: "rdma-zerocopy",
|
|
SessionID: resp.SessionID,
|
|
TempFilePath: tempFilePath,
|
|
UseTempFile: true,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
return &NeedleReadResponse{
|
|
Data: resp.Data,
|
|
IsRDMA: true,
|
|
Latency: time.Since(start),
|
|
Source: "rdma",
|
|
SessionID: resp.SessionID,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// Fallback to HTTP
|
|
c.logger.WithFields(logrus.Fields{
|
|
"volume_id": req.VolumeID,
|
|
"needle_id": req.NeedleID,
|
|
"reason": "rdma_unavailable",
|
|
}).Debug("🌐 Using HTTP fallback")
|
|
|
|
data, err := c.httpFallback(ctx, req)
|
|
if err != nil {
|
|
if rdmaErr != nil {
|
|
return nil, fmt.Errorf("both RDMA and HTTP fallback failed: RDMA=%v, HTTP=%v", rdmaErr, err)
|
|
}
|
|
return nil, fmt.Errorf("HTTP fallback failed: %w", err)
|
|
}
|
|
|
|
return &NeedleReadResponse{
|
|
Data: data,
|
|
IsRDMA: false,
|
|
Latency: time.Since(start),
|
|
Source: "http",
|
|
}, nil
|
|
}
|
|
|
|
// ReadNeedleRange reads a specific range from a needle
|
|
func (c *SeaweedFSRDMAClient) ReadNeedleRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*NeedleReadResponse, error) {
|
|
req := &NeedleReadRequest{
|
|
VolumeID: volumeID,
|
|
NeedleID: needleID,
|
|
Cookie: cookie,
|
|
Offset: offset,
|
|
Size: size,
|
|
}
|
|
return c.ReadNeedle(ctx, req)
|
|
}
|
|
|
|
// httpFallback performs HTTP fallback read from SeaweedFS volume server
|
|
func (c *SeaweedFSRDMAClient) httpFallback(ctx context.Context, req *NeedleReadRequest) ([]byte, error) {
|
|
// Use volume server from request, fallback to configured URL
|
|
volumeServerURL := req.VolumeServer
|
|
if volumeServerURL == "" {
|
|
volumeServerURL = c.volumeServerURL
|
|
}
|
|
|
|
if volumeServerURL == "" {
|
|
return nil, fmt.Errorf("no volume server URL provided in request or configured")
|
|
}
|
|
|
|
// Build URL using existing SeaweedFS file ID construction
|
|
volumeId := needle.VolumeId(req.VolumeID)
|
|
needleId := types.NeedleId(req.NeedleID)
|
|
cookie := types.Cookie(req.Cookie)
|
|
|
|
fileId := &needle.FileId{
|
|
VolumeId: volumeId,
|
|
Key: needleId,
|
|
Cookie: cookie,
|
|
}
|
|
|
|
url := fmt.Sprintf("%s/%s", volumeServerURL, fileId.String())
|
|
|
|
if req.Offset > 0 || req.Size > 0 {
|
|
url += fmt.Sprintf("?offset=%d&size=%d", req.Offset, req.Size)
|
|
}
|
|
|
|
c.logger.WithField("url", url).Debug("📥 HTTP fallback request")
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
|
|
}
|
|
|
|
client := &http.Client{Timeout: 30 * time.Second}
|
|
resp, err := client.Do(httpReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("HTTP request failed: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, fmt.Errorf("HTTP request failed with status: %d", resp.StatusCode)
|
|
}
|
|
|
|
// Read response data - io.ReadAll handles context cancellation and timeouts correctly
|
|
data, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read HTTP response body: %w", err)
|
|
}
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"volume_id": req.VolumeID,
|
|
"needle_id": req.NeedleID,
|
|
"data_size": len(data),
|
|
}).Debug("📥 HTTP fallback successful")
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// HealthCheck verifies that the RDMA client is healthy
|
|
func (c *SeaweedFSRDMAClient) HealthCheck(ctx context.Context) error {
|
|
if !c.enabled {
|
|
return fmt.Errorf("RDMA is disabled")
|
|
}
|
|
|
|
if c.rdmaClient == nil {
|
|
return fmt.Errorf("RDMA client not initialized")
|
|
}
|
|
|
|
if !c.rdmaClient.IsConnected() {
|
|
return fmt.Errorf("RDMA client not connected")
|
|
}
|
|
|
|
// Try a ping to the RDMA engine
|
|
_, err := c.rdmaClient.Ping(ctx)
|
|
return err
|
|
}
|
|
|
|
// GetStats returns statistics about the RDMA client
|
|
func (c *SeaweedFSRDMAClient) GetStats() map[string]interface{} {
|
|
stats := map[string]interface{}{
|
|
"enabled": c.enabled,
|
|
"volume_server_url": c.volumeServerURL,
|
|
"rdma_socket_path": "",
|
|
}
|
|
|
|
if c.rdmaClient != nil {
|
|
stats["connected"] = c.rdmaClient.IsConnected()
|
|
// Note: Capabilities method may not be available, skip for now
|
|
} else {
|
|
stats["connected"] = false
|
|
stats["error"] = "RDMA client not initialized"
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// writeToTempFile writes RDMA data to a temp file for zero-copy optimization
|
|
func (c *SeaweedFSRDMAClient) writeToTempFile(req *NeedleReadRequest, data []byte) (string, error) {
|
|
// Create temp file with unique name based on needle info
|
|
fileName := fmt.Sprintf("vol%d_needle%x_cookie%d_offset%d_size%d.tmp",
|
|
req.VolumeID, req.NeedleID, req.Cookie, req.Offset, req.Size)
|
|
tempFilePath := filepath.Join(c.tempDir, fileName)
|
|
|
|
// Write data to temp file (this populates the page cache)
|
|
err := os.WriteFile(tempFilePath, data, 0644)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to write temp file: %w", err)
|
|
}
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"temp_file": tempFilePath,
|
|
"size": len(data),
|
|
}).Debug("📁 Temp file written to page cache")
|
|
|
|
return tempFilePath, nil
|
|
}
|
|
|
|
// CleanupTempFile removes a temp file (called by mount client after use)
|
|
func (c *SeaweedFSRDMAClient) CleanupTempFile(tempFilePath string) error {
|
|
if tempFilePath == "" {
|
|
return nil
|
|
}
|
|
|
|
// Validate that tempFilePath is within c.tempDir
|
|
absTempDir, err := filepath.Abs(c.tempDir)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve temp dir: %w", err)
|
|
}
|
|
absFilePath, err := filepath.Abs(tempFilePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to resolve temp file path: %w", err)
|
|
}
|
|
// Ensure absFilePath is within absTempDir
|
|
if !strings.HasPrefix(absFilePath, absTempDir+string(os.PathSeparator)) && absFilePath != absTempDir {
|
|
c.logger.WithField("temp_file", tempFilePath).Warn("Attempted cleanup of file outside temp dir")
|
|
return fmt.Errorf("invalid temp file path")
|
|
}
|
|
|
|
err = os.Remove(absFilePath)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
c.logger.WithError(err).WithField("temp_file", absFilePath).Warn("Failed to cleanup temp file")
|
|
return err
|
|
}
|
|
|
|
c.logger.WithField("temp_file", absFilePath).Debug("🧹 Temp file cleaned up")
|
|
return nil
|
|
}
|