You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
630 lines
18 KiB
630 lines
18 KiB
// Package rdma provides high-level RDMA operations for SeaweedFS integration
|
|
package rdma
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"seaweedfs-rdma-sidecar/pkg/ipc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// PooledConnection represents a pooled RDMA connection
|
|
type PooledConnection struct {
|
|
ipcClient *ipc.Client
|
|
lastUsed time.Time
|
|
inUse bool
|
|
sessionID string
|
|
created time.Time
|
|
}
|
|
|
|
// ConnectionPool manages a pool of RDMA connections
|
|
type ConnectionPool struct {
|
|
connections []*PooledConnection
|
|
mutex sync.RWMutex
|
|
maxConnections int
|
|
maxIdleTime time.Duration
|
|
enginePath string
|
|
logger *logrus.Logger
|
|
}
|
|
|
|
// Client provides high-level RDMA operations with connection pooling
|
|
type Client struct {
|
|
pool *ConnectionPool
|
|
logger *logrus.Logger
|
|
enginePath string
|
|
capabilities *ipc.GetCapabilitiesResponse
|
|
connected bool
|
|
defaultTimeout time.Duration
|
|
|
|
// Legacy single connection (for backward compatibility)
|
|
ipcClient *ipc.Client
|
|
}
|
|
|
|
// Config holds configuration for the RDMA client
|
|
type Config struct {
|
|
EngineSocketPath string
|
|
DefaultTimeout time.Duration
|
|
Logger *logrus.Logger
|
|
|
|
// Connection pooling options
|
|
EnablePooling bool // Enable connection pooling (default: true)
|
|
MaxConnections int // Max connections in pool (default: 10)
|
|
MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
|
|
}
|
|
|
|
// ReadRequest represents a SeaweedFS needle read request
|
|
type ReadRequest struct {
|
|
VolumeID uint32
|
|
NeedleID uint64
|
|
Cookie uint32
|
|
Offset uint64
|
|
Size uint64
|
|
AuthToken *string
|
|
}
|
|
|
|
// ReadResponse represents the result of an RDMA read operation
|
|
type ReadResponse struct {
|
|
Data []byte
|
|
BytesRead uint64
|
|
Duration time.Duration
|
|
TransferRate float64
|
|
SessionID string
|
|
Success bool
|
|
Message string
|
|
}
|
|
|
|
// NewConnectionPool creates a new connection pool
|
|
func NewConnectionPool(enginePath string, maxConnections int, maxIdleTime time.Duration, logger *logrus.Logger) *ConnectionPool {
|
|
if maxConnections <= 0 {
|
|
maxConnections = 10 // Default
|
|
}
|
|
if maxIdleTime <= 0 {
|
|
maxIdleTime = 5 * time.Minute // Default
|
|
}
|
|
|
|
return &ConnectionPool{
|
|
connections: make([]*PooledConnection, 0, maxConnections),
|
|
maxConnections: maxConnections,
|
|
maxIdleTime: maxIdleTime,
|
|
enginePath: enginePath,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// getConnection gets an available connection from the pool or creates a new one
|
|
func (p *ConnectionPool) getConnection(ctx context.Context) (*PooledConnection, error) {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
// Look for an available connection
|
|
for _, conn := range p.connections {
|
|
if !conn.inUse && time.Since(conn.lastUsed) < p.maxIdleTime {
|
|
conn.inUse = true
|
|
conn.lastUsed = time.Now()
|
|
p.logger.WithField("session_id", conn.sessionID).Debug("🔌 Reusing pooled RDMA connection")
|
|
return conn, nil
|
|
}
|
|
}
|
|
|
|
// Create new connection if under limit
|
|
if len(p.connections) < p.maxConnections {
|
|
ipcClient := ipc.NewClient(p.enginePath, p.logger)
|
|
if err := ipcClient.Connect(ctx); err != nil {
|
|
return nil, fmt.Errorf("failed to create new pooled connection: %w", err)
|
|
}
|
|
|
|
conn := &PooledConnection{
|
|
ipcClient: ipcClient,
|
|
lastUsed: time.Now(),
|
|
inUse: true,
|
|
sessionID: fmt.Sprintf("pool-%d-%d", len(p.connections), time.Now().Unix()),
|
|
created: time.Now(),
|
|
}
|
|
|
|
p.connections = append(p.connections, conn)
|
|
p.logger.WithFields(logrus.Fields{
|
|
"session_id": conn.sessionID,
|
|
"pool_size": len(p.connections),
|
|
}).Info("🚀 Created new pooled RDMA connection")
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// Pool is full, wait for an available connection
|
|
return nil, fmt.Errorf("connection pool exhausted (max: %d)", p.maxConnections)
|
|
}
|
|
|
|
// releaseConnection returns a connection to the pool
|
|
func (p *ConnectionPool) releaseConnection(conn *PooledConnection) {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
conn.inUse = false
|
|
conn.lastUsed = time.Now()
|
|
|
|
p.logger.WithField("session_id", conn.sessionID).Debug("🔄 Released RDMA connection back to pool")
|
|
}
|
|
|
|
// cleanup removes idle connections from the pool
|
|
func (p *ConnectionPool) cleanup() {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
now := time.Now()
|
|
activeConnections := make([]*PooledConnection, 0, len(p.connections))
|
|
|
|
for _, conn := range p.connections {
|
|
if conn.inUse || now.Sub(conn.lastUsed) < p.maxIdleTime {
|
|
activeConnections = append(activeConnections, conn)
|
|
} else {
|
|
// Close idle connection
|
|
conn.ipcClient.Disconnect()
|
|
p.logger.WithFields(logrus.Fields{
|
|
"session_id": conn.sessionID,
|
|
"idle_time": now.Sub(conn.lastUsed),
|
|
}).Debug("🧹 Cleaned up idle RDMA connection")
|
|
}
|
|
}
|
|
|
|
p.connections = activeConnections
|
|
}
|
|
|
|
// Close closes all connections in the pool
|
|
func (p *ConnectionPool) Close() {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
for _, conn := range p.connections {
|
|
conn.ipcClient.Disconnect()
|
|
}
|
|
p.connections = nil
|
|
p.logger.Info("🔌 Connection pool closed")
|
|
}
|
|
|
|
// NewClient creates a new RDMA client
|
|
func NewClient(config *Config) *Client {
|
|
if config.Logger == nil {
|
|
config.Logger = logrus.New()
|
|
config.Logger.SetLevel(logrus.InfoLevel)
|
|
}
|
|
|
|
if config.DefaultTimeout == 0 {
|
|
config.DefaultTimeout = 30 * time.Second
|
|
}
|
|
|
|
client := &Client{
|
|
logger: config.Logger,
|
|
enginePath: config.EngineSocketPath,
|
|
defaultTimeout: config.DefaultTimeout,
|
|
}
|
|
|
|
// Initialize connection pooling if enabled (default: true)
|
|
enablePooling := config.EnablePooling
|
|
if config.MaxConnections == 0 && config.MaxIdleTime == 0 {
|
|
// Default to enabled if not explicitly configured
|
|
enablePooling = true
|
|
}
|
|
|
|
if enablePooling {
|
|
client.pool = NewConnectionPool(
|
|
config.EngineSocketPath,
|
|
config.MaxConnections,
|
|
config.MaxIdleTime,
|
|
config.Logger,
|
|
)
|
|
|
|
// Start cleanup goroutine
|
|
go client.startCleanupRoutine()
|
|
|
|
config.Logger.WithFields(logrus.Fields{
|
|
"max_connections": client.pool.maxConnections,
|
|
"max_idle_time": client.pool.maxIdleTime,
|
|
}).Info("🔌 RDMA connection pooling enabled")
|
|
} else {
|
|
// Legacy single connection mode
|
|
client.ipcClient = ipc.NewClient(config.EngineSocketPath, config.Logger)
|
|
config.Logger.Info("🔌 RDMA single connection mode (pooling disabled)")
|
|
}
|
|
|
|
return client
|
|
}
|
|
|
|
// startCleanupRoutine starts a background goroutine to clean up idle connections
|
|
func (c *Client) startCleanupRoutine() {
|
|
ticker := time.NewTicker(1 * time.Minute) // Cleanup every minute
|
|
go func() {
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
if c.pool != nil {
|
|
c.pool.cleanup()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Connect establishes connection to the Rust RDMA engine and queries capabilities
|
|
func (c *Client) Connect(ctx context.Context) error {
|
|
c.logger.Info("🚀 Connecting to RDMA engine")
|
|
|
|
if c.pool != nil {
|
|
// Connection pooling mode - connections are created on-demand
|
|
c.connected = true
|
|
c.logger.Info("✅ RDMA client ready (connection pooling enabled)")
|
|
return nil
|
|
}
|
|
|
|
// Single connection mode
|
|
if err := c.ipcClient.Connect(ctx); err != nil {
|
|
return fmt.Errorf("failed to connect to IPC: %w", err)
|
|
}
|
|
|
|
// Test connectivity with ping
|
|
clientID := "rdma-client"
|
|
pong, err := c.ipcClient.Ping(ctx, &clientID)
|
|
if err != nil {
|
|
c.ipcClient.Disconnect()
|
|
return fmt.Errorf("failed to ping RDMA engine: %w", err)
|
|
}
|
|
|
|
latency := time.Duration(pong.ServerRttNs)
|
|
c.logger.WithFields(logrus.Fields{
|
|
"latency": latency,
|
|
"server_rtt": time.Duration(pong.ServerRttNs),
|
|
}).Info("📡 RDMA engine ping successful")
|
|
|
|
// Get capabilities
|
|
caps, err := c.ipcClient.GetCapabilities(ctx, &clientID)
|
|
if err != nil {
|
|
c.ipcClient.Disconnect()
|
|
return fmt.Errorf("failed to get engine capabilities: %w", err)
|
|
}
|
|
|
|
c.capabilities = caps
|
|
c.connected = true
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"version": caps.Version,
|
|
"device_name": caps.DeviceName,
|
|
"vendor_id": caps.VendorId,
|
|
"max_sessions": caps.MaxSessions,
|
|
"max_transfer_size": caps.MaxTransferSize,
|
|
"active_sessions": caps.ActiveSessions,
|
|
"real_rdma": caps.RealRdma,
|
|
"port_gid": caps.PortGid,
|
|
"port_lid": caps.PortLid,
|
|
}).Info("✅ RDMA engine connected and ready")
|
|
|
|
return nil
|
|
}
|
|
|
|
// Disconnect closes the connection to the RDMA engine
|
|
func (c *Client) Disconnect() {
|
|
if c.connected {
|
|
if c.pool != nil {
|
|
// Connection pooling mode
|
|
c.pool.Close()
|
|
c.logger.Info("🔌 Disconnected from RDMA engine (pool closed)")
|
|
} else {
|
|
// Single connection mode
|
|
c.ipcClient.Disconnect()
|
|
c.logger.Info("🔌 Disconnected from RDMA engine")
|
|
}
|
|
c.connected = false
|
|
}
|
|
}
|
|
|
|
// IsConnected returns true if connected to the RDMA engine
|
|
func (c *Client) IsConnected() bool {
|
|
if c.pool != nil {
|
|
// Connection pooling mode - always connected if pool exists
|
|
return c.connected
|
|
} else {
|
|
// Single connection mode
|
|
return c.connected && c.ipcClient.IsConnected()
|
|
}
|
|
}
|
|
|
|
// GetCapabilities returns the RDMA engine capabilities
|
|
func (c *Client) GetCapabilities() *ipc.GetCapabilitiesResponse {
|
|
return c.capabilities
|
|
}
|
|
|
|
// Read performs an RDMA read operation for a SeaweedFS needle
|
|
func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
|
|
if !c.IsConnected() {
|
|
return nil, fmt.Errorf("not connected to RDMA engine")
|
|
}
|
|
|
|
startTime := time.Now()
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"volume_id": req.VolumeID,
|
|
"needle_id": req.NeedleID,
|
|
"offset": req.Offset,
|
|
"size": req.Size,
|
|
}).Debug("📖 Starting RDMA read operation")
|
|
|
|
if c.pool != nil {
|
|
// Connection pooling mode
|
|
return c.readWithPool(ctx, req, startTime)
|
|
}
|
|
|
|
// Single connection mode
|
|
// Create IPC request
|
|
ipcReq := &ipc.StartReadRequest{
|
|
VolumeID: req.VolumeID,
|
|
NeedleID: req.NeedleID,
|
|
Cookie: req.Cookie,
|
|
Offset: req.Offset,
|
|
Size: req.Size,
|
|
RemoteAddr: 0, // Will be set by engine (mock for now)
|
|
RemoteKey: 0, // Will be set by engine (mock for now)
|
|
TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
|
|
AuthToken: req.AuthToken,
|
|
}
|
|
|
|
// Start RDMA read
|
|
startResp, err := c.ipcClient.StartRead(ctx, ipcReq)
|
|
if err != nil {
|
|
c.logger.WithError(err).Error("❌ Failed to start RDMA read")
|
|
return nil, fmt.Errorf("failed to start RDMA read: %w", err)
|
|
}
|
|
|
|
// In the new protocol, if we got a StartReadResponse, the operation was successful
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"session_id": startResp.SessionID,
|
|
"local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr),
|
|
"local_key": startResp.LocalKey,
|
|
"transfer_size": startResp.TransferSize,
|
|
"expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc),
|
|
"expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
|
|
}).Debug("📖 RDMA read session started")
|
|
|
|
// Complete the RDMA read
|
|
completeResp, err := c.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
|
|
if err != nil {
|
|
c.logger.WithError(err).Error("❌ Failed to complete RDMA read")
|
|
return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
|
|
}
|
|
|
|
duration := time.Since(startTime)
|
|
|
|
if !completeResp.Success {
|
|
errorMsg := "unknown error"
|
|
if completeResp.Message != nil {
|
|
errorMsg = *completeResp.Message
|
|
}
|
|
c.logger.WithFields(logrus.Fields{
|
|
"session_id": startResp.SessionID,
|
|
"error_message": errorMsg,
|
|
}).Error("❌ RDMA read completion failed")
|
|
return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
|
|
}
|
|
|
|
// Calculate transfer rate (bytes/second)
|
|
transferRate := float64(startResp.TransferSize) / duration.Seconds()
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"session_id": startResp.SessionID,
|
|
"bytes_read": startResp.TransferSize,
|
|
"duration": duration,
|
|
"transfer_rate": transferRate,
|
|
"server_crc": completeResp.ServerCrc,
|
|
}).Info("✅ RDMA read completed successfully")
|
|
|
|
// MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
|
|
//
|
|
// This section generates placeholder data for the mock RDMA implementation.
|
|
// In a production RDMA implementation, this should be replaced with:
|
|
//
|
|
// 1. The actual data transferred via RDMA from the remote memory region
|
|
// 2. Data validation using checksums/CRC from the RDMA completion
|
|
// 3. Proper error handling for RDMA transfer failures
|
|
// 4. Memory region cleanup and deregistration
|
|
//
|
|
// TODO for real RDMA implementation:
|
|
// - Replace mockData with actual RDMA buffer contents
|
|
// - Validate data integrity using server CRC: completeResp.ServerCrc
|
|
// - Handle partial transfers and retry logic
|
|
// - Implement proper memory management for RDMA regions
|
|
//
|
|
// Current mock behavior: Generates a simple pattern (0,1,2...255,0,1,2...)
|
|
// This allows testing of the integration pipeline without real hardware
|
|
mockData := make([]byte, startResp.TransferSize)
|
|
for i := range mockData {
|
|
mockData[i] = byte(i % 256) // Simple repeating pattern for verification
|
|
}
|
|
// END MOCK DATA IMPLEMENTATION
|
|
|
|
return &ReadResponse{
|
|
Data: mockData,
|
|
BytesRead: startResp.TransferSize,
|
|
Duration: duration,
|
|
TransferRate: transferRate,
|
|
SessionID: startResp.SessionID,
|
|
Success: true,
|
|
Message: "RDMA read completed successfully",
|
|
}, nil
|
|
}
|
|
|
|
// ReadRange performs an RDMA read for a specific range within a needle
|
|
func (c *Client) ReadRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*ReadResponse, error) {
|
|
req := &ReadRequest{
|
|
VolumeID: volumeID,
|
|
NeedleID: needleID,
|
|
Cookie: cookie,
|
|
Offset: offset,
|
|
Size: size,
|
|
}
|
|
return c.Read(ctx, req)
|
|
}
|
|
|
|
// ReadFileRange performs an RDMA read using SeaweedFS file ID format
|
|
func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size uint64) (*ReadResponse, error) {
|
|
// Parse file ID (e.g., "3,01637037d6" -> volume=3, needle=0x01637037d6, cookie extracted)
|
|
volumeID, needleID, cookie, err := parseFileID(fileID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err)
|
|
}
|
|
|
|
req := &ReadRequest{
|
|
VolumeID: volumeID,
|
|
NeedleID: needleID,
|
|
Cookie: cookie,
|
|
Offset: offset,
|
|
Size: size,
|
|
}
|
|
return c.Read(ctx, req)
|
|
}
|
|
|
|
// parseFileID extracts volume ID, needle ID, and cookie from a SeaweedFS file ID
|
|
// Uses existing SeaweedFS parsing logic to ensure compatibility
|
|
func parseFileID(fileId string) (volumeID uint32, needleID uint64, cookie uint32, err error) {
|
|
// Use existing SeaweedFS file ID parsing
|
|
fid, err := needle.ParseFileIdFromString(fileId)
|
|
if err != nil {
|
|
return 0, 0, 0, fmt.Errorf("failed to parse file ID %s: %w", fileId, err)
|
|
}
|
|
|
|
volumeID = uint32(fid.VolumeId)
|
|
needleID = uint64(fid.Key)
|
|
cookie = uint32(fid.Cookie)
|
|
|
|
return volumeID, needleID, cookie, nil
|
|
}
|
|
|
|
// ReadFull performs an RDMA read for an entire needle
|
|
func (c *Client) ReadFull(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (*ReadResponse, error) {
|
|
req := &ReadRequest{
|
|
VolumeID: volumeID,
|
|
NeedleID: needleID,
|
|
Cookie: cookie,
|
|
Offset: 0,
|
|
Size: 0, // 0 means read entire needle
|
|
}
|
|
return c.Read(ctx, req)
|
|
}
|
|
|
|
// Ping tests connectivity to the RDMA engine
|
|
func (c *Client) Ping(ctx context.Context) (time.Duration, error) {
|
|
if !c.IsConnected() {
|
|
return 0, fmt.Errorf("not connected to RDMA engine")
|
|
}
|
|
|
|
clientID := "health-check"
|
|
start := time.Now()
|
|
pong, err := c.ipcClient.Ping(ctx, &clientID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
totalLatency := time.Since(start)
|
|
serverRtt := time.Duration(pong.ServerRttNs)
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"total_latency": totalLatency,
|
|
"server_rtt": serverRtt,
|
|
"client_id": clientID,
|
|
}).Debug("🏓 RDMA engine ping successful")
|
|
|
|
return totalLatency, nil
|
|
}
|
|
|
|
// readWithPool performs RDMA read using connection pooling
|
|
func (c *Client) readWithPool(ctx context.Context, req *ReadRequest, startTime time.Time) (*ReadResponse, error) {
|
|
// Get connection from pool
|
|
conn, err := c.pool.getConnection(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get pooled connection: %w", err)
|
|
}
|
|
defer c.pool.releaseConnection(conn)
|
|
|
|
c.logger.WithField("session_id", conn.sessionID).Debug("🔌 Using pooled RDMA connection")
|
|
|
|
// Create IPC request
|
|
ipcReq := &ipc.StartReadRequest{
|
|
VolumeID: req.VolumeID,
|
|
NeedleID: req.NeedleID,
|
|
Cookie: req.Cookie,
|
|
Offset: req.Offset,
|
|
Size: req.Size,
|
|
RemoteAddr: 0, // Will be set by engine (mock for now)
|
|
RemoteKey: 0, // Will be set by engine (mock for now)
|
|
TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
|
|
AuthToken: req.AuthToken,
|
|
}
|
|
|
|
// Start RDMA read
|
|
startResp, err := conn.ipcClient.StartRead(ctx, ipcReq)
|
|
if err != nil {
|
|
c.logger.WithError(err).Error("❌ Failed to start RDMA read (pooled)")
|
|
return nil, fmt.Errorf("failed to start RDMA read: %w", err)
|
|
}
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"session_id": startResp.SessionID,
|
|
"local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr),
|
|
"local_key": startResp.LocalKey,
|
|
"transfer_size": startResp.TransferSize,
|
|
"expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc),
|
|
"expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
|
|
"pooled": true,
|
|
}).Debug("📖 RDMA read session started (pooled)")
|
|
|
|
// Complete the RDMA read
|
|
completeResp, err := conn.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
|
|
if err != nil {
|
|
c.logger.WithError(err).Error("❌ Failed to complete RDMA read (pooled)")
|
|
return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
|
|
}
|
|
|
|
duration := time.Since(startTime)
|
|
|
|
if !completeResp.Success {
|
|
errorMsg := "unknown error"
|
|
if completeResp.Message != nil {
|
|
errorMsg = *completeResp.Message
|
|
}
|
|
c.logger.WithFields(logrus.Fields{
|
|
"session_id": conn.sessionID,
|
|
"error_message": errorMsg,
|
|
"pooled": true,
|
|
}).Error("❌ RDMA read completion failed (pooled)")
|
|
return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
|
|
}
|
|
|
|
// Calculate transfer rate (bytes/second)
|
|
transferRate := float64(startResp.TransferSize) / duration.Seconds()
|
|
|
|
c.logger.WithFields(logrus.Fields{
|
|
"session_id": conn.sessionID,
|
|
"bytes_read": startResp.TransferSize,
|
|
"duration": duration,
|
|
"transfer_rate": transferRate,
|
|
"server_crc": completeResp.ServerCrc,
|
|
"pooled": true,
|
|
}).Info("✅ RDMA read completed successfully (pooled)")
|
|
|
|
// For the mock implementation, we'll return placeholder data
|
|
// In the real implementation, this would be the actual RDMA transferred data
|
|
mockData := make([]byte, startResp.TransferSize)
|
|
for i := range mockData {
|
|
mockData[i] = byte(i % 256) // Simple pattern for testing
|
|
}
|
|
|
|
return &ReadResponse{
|
|
Data: mockData,
|
|
BytesRead: startResp.TransferSize,
|
|
Duration: duration,
|
|
TransferRate: transferRate,
|
|
SessionID: conn.sessionID,
|
|
Success: true,
|
|
Message: "RDMA read successful (pooled)",
|
|
}, nil
|
|
}
|