# 📋 Weed Mount RDMA Integration - Code Path Analysis
## Current Status

The RDMA client (`RDMAMountClient`) exists in `weed/mount/rdma_client.go`, but it is not yet integrated into the actual file read path. The integration points have been identified but not implemented.
## 🔍 Complete Code Path

### 1. FUSE Read Request Entry Point
// File: weed/mount/weedfs_file_read.go:41
//
// Read is the FUSE read entry point: it looks up the file handle named by
// in.Fh, reads into buff starting at in.Offset, and hands the filled prefix
// of buff back to FUSE.
func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) {
	fh := wfs.GetHandle(FileHandleId(in.Fh))
	// ...
	// NOTE(review): ctx is not defined in this excerpt; presumably it is
	// created in the elided code (e.g. from cancel) — confirm.
	offset := int64(in.Offset)
	totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
	// ...
	// Only the first totalRead bytes of buff are returned to FUSE.
	return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
### 2. File Handle Read Coordination
// File: weed/mount/weedfs_file_read.go:103
//
// readDataByFileHandleWithContext reads len(buff) bytes at offset through the
// file handle while holding the handle's read lock for that range.
func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
	size := len(buff)
	// The lock takes the requested (offset, size) range; the deferred unlock
	// releases it on every return path.
	fhIn.lockForRead(offset, size)
	defer fhIn.unlockForRead(offset, size)
	// KEY INTEGRATION POINT: RDMA is attempted inside readFromChunksWithContext,
	// so an RDMA read runs under the same range lock as the HTTP read.
	n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset)
	// ...
	// NOTE(review): tsNs is consumed in the elided code above; this excerpt
	// does not show how.
	return n, err
}
### 3. Chunk Reading (Current Implementation)
// File: weed/mount/filehandle_read.go:29
//
// readFromChunksWithContext reads file data at offset into buff via the
// handle's chunk group and returns (bytes read, timestamp from ReadDataAt, error).
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
	// ...
	// NOTE(review): fileSize is computed in the elided code above.
	// CURRENT: Direct chunk reading without RDMA
	totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
	// MISSING: the RDMA attempt belongs *before* the ReadDataAt call above,
	// with ReadDataAt kept as the fallback (see section 4).
	return int64(totalRead), ts, err
}
### 4. RDMA Integration Point (What Needs to Be Added)

The integration should happen in `readFromChunksWithContext`, like this:
// readFromChunksWithContext reads file data at offset into buff.
//
// When an RDMA client is configured and healthy, it first tries tryRDMARead.
// The RDMA result is accepted only if it covers the whole readable range
// [offset, min(offset+len(buff), fileSize)). On any RDMA error or short
// result it falls back to the original HTTP-based chunk-group read.
//
// Returns (bytes read, timestamp, error), as the HTTP path does.
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
	// ... existing code (computes fileSize) ...

	// NEW: Try RDMA acceleration first.
	if fh.wfs.rdmaClient != nil && fh.wfs.rdmaClient.IsHealthy() && offset < fileSize {
		// WFS.Read hands only buff[:totalRead] back to FUSE, so a partial
		// RDMA result would silently truncate the read. Require the full
		// readable range, or fall back.
		want := min(int64(len(buff)), fileSize-offset)
		totalRead, ts, err := fh.tryRDMARead(ctx, buff, offset)
		switch {
		case err != nil:
			glog.V(2).Infof("RDMA read at offset %d failed, falling back to HTTP: %v", offset, err)
		case totalRead != want:
			glog.V(2).Infof("RDMA read at offset %d returned %d of %d bytes, falling back to HTTP", offset, totalRead, want)
		default:
			glog.V(4).Infof("RDMA read successful: %d bytes", totalRead)
			return totalRead, ts, nil
		}
	}

	// FALLBACK: Original HTTP-based chunk reading. It overwrites any bytes a
	// failed RDMA attempt may have left in buff.
	totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
	return int64(totalRead), ts, err
}
## 🚀 RDMA Client Integration

### 5. RDMA Read Implementation (Already Exists)
// File: weed/mount/rdma_client.go:129
//
// ReadNeedle asks the RDMA sidecar at c.sidecarAddr for size bytes starting
// at offset within the needle identified by (volumeID, needleID, cookie).
// It returns the data and an isRDMA flag. The flag presumably reports whether
// the sidecar served the read over RDMA rather than a fallback — confirm.
func (c *RDMAMountClient) ReadNeedle(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) ([]byte, bool, error) {
	// Prepare request URL
	reqURL := fmt.Sprintf("http://%s/read?volume=%d&needle=%d&cookie=%d&offset=%d&size=%d",
		c.sidecarAddr, volumeID, needleID, cookie, offset, size)
	// NOTE(review): req is not built from reqURL anywhere in this excerpt;
	// presumably the real code uses http.NewRequestWithContext(ctx, ...) — confirm.
	// Execute HTTP request to RDMA sidecar
	resp, err := c.httpClient.Do(req)
	// ...
	// NOTE(review): the excerpt does not show resp.Body being closed or
	// resp.StatusCode being checked; confirm both in rdma_client.go.
	// Return data with RDMA metadata
	return data, isRDMA, nil
}
### 6. RDMA Sidecar Processing
// File: seaweedfs-rdma-sidecar/cmd/demo-server/main.go:375
//
// readHandler serves /read requests (the URL built by RDMAMountClient.ReadNeedle).
// It parses the needle coordinates from the query string and reads the needle.
// If distributed mode is enabled, it uses the distributed client (volume lookup
// + RDMA); otherwise it uses the local RDMA client. It then writes the returned
// bytes to the response.
//
// NOTE(review): query, ctx, req, resp and err are defined in code elided
// from this excerpt.
func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
	// Parse volume and needle IDs from URL parameters. Cookie, offset and
	// size parsing is not shown in this excerpt.
	// NOTE(review): parse errors are discarded with `_`, so a malformed value
	// becomes 0 instead of producing a 400 — confirm this is intended.
	volumeID, _ := strconv.ParseUint(query.Get("volume"), 10, 32)
	needleID, _ := strconv.ParseUint(query.Get("needle"), 10, 64)
	// Use distributed client for volume lookup + RDMA
	if s.useDistributed && s.distributedClient != nil {
		resp, err = s.distributedClient.ReadNeedle(ctx, req)
	} else {
		resp, err = s.rdmaClient.ReadNeedle(ctx, req) // Local RDMA
	}
	// Return binary data or JSON metadata. Only the binary branch is shown;
	// the excerpt also does not show err being checked before resp is used.
	w.Write(resp.Data)
}
### 7. Volume Lookup & RDMA Engine
// File: seaweedfs-rdma-sidecar/pkg/seaweedfs/distributed_client.go:45
//
// ReadNeedle resolves which servers hold req.VolumeID via the location
// service, picks one with FindBestLocation, and forwards the read to that
// server's RDMA sidecar via makeRDMARequest.
func (c *DistributedRDMAClient) ReadNeedle(ctx context.Context, req *NeedleReadRequest) (*NeedleReadResponse, error) {
	// Step 1: Lookup volume location from master
	locations, err := c.locationService.LookupVolume(ctx, req.VolumeID)
	// NOTE(review): err is not checked in this excerpt; confirm the real code
	// returns on lookup failure before using locations.
	// Step 2: Find best server (local preferred)
	bestLocation := c.locationService.FindBestLocation(locations)
	// Step 3: Make HTTP request to target server's RDMA sidecar
	// NOTE(review): start is defined in elided code (presumably a timing
	// start for latency reporting) — confirm.
	return c.makeRDMARequest(ctx, req, bestLocation, start)
}
### 8. Rust RDMA Engine (Final Data Access)
// File: rdma-engine/src/ipc.rs:403
//
// handle_start_read begins an RDMA read session. It creates a session ID,
// allocates a zeroed local buffer of transfer_size bytes, registers memory,
// posts the read, and calls poll_completion(1). It then returns the session
// info. Errors from registration, posting and polling propagate via `?`.
// Per the comment below, the RDMA operations are a mock implementation.
async fn handle_start_read(req: StartReadRequest, ...) -> RdmaResult<StartReadResponse> {
    // Create RDMA session
    let session_id = Uuid::new_v4().to_string();
    let buffer = vec![0u8; transfer_size as usize];
    // NOTE(review): local_addr, transfer_size, remote_addr, remote_key, size
    // and wr_id come from elided code. How local_addr relates to `buffer` is
    // not shown; confirm the buffer outlives the memory registration.
    // Register memory for RDMA
    let memory_region = rdma_context.register_memory(local_addr, transfer_size).await?;
    // Perform RDMA read (mock implementation)
    rdma_context.post_read(local_addr, remote_addr, remote_key, size, wr_id).await?;
    // NOTE(review): memory_region and completions are unused in this
    // excerpt; confirm the completion status is inspected before reporting success.
    let completions = rdma_context.poll_completion(1).await?;
    // Return session info
    Ok(StartReadResponse { session_id, local_addr, ... })
}
## 🔧 Missing Integration Components

### 1. WFS Struct Extension
// File: weed/mount/weedfs.go (needs modification)

// Option needs the new field too: the mount command passes the client in
// via mount.Option{RDMAClient: ...}.
type Option struct {
	// ... existing fields ...

	// RDMAClient is the optional RDMA sidecar client; nil disables RDMA reads.
	RDMAClient *RDMAMountClient // ADD THIS
}

// WFS keeps its own reference to the client for the read path.
type WFS struct {
	// ... existing fields ...

	// rdmaClient must be copied from the Option in NewSeaweedFileSystem
	// (e.g. wfs.rdmaClient = option.RDMAClient); otherwise it stays nil.
	// The read path treats nil as "RDMA disabled" and reads over HTTP.
	rdmaClient *RDMAMountClient // ADD THIS
}
### 2. RDMA Client Initialization
// File: weed/command/mount.go (needs modification)
func runMount(cmd *cobra.Command, args []string) bool {
// ... existing code ...
// NEW: Initialize RDMA client if enabled
var rdmaClient *mount.RDMAMountClient
if *mountOptions.rdmaEnabled && *mountOptions.rdmaSidecarAddr != "" {
rdmaClient, err = mount.NewRDMAMountClient(
*mountOptions.rdmaSidecarAddr,
*mountOptions.rdmaMaxConcurrent,
*mountOptions.rdmaTimeoutMs,
)
if err != nil {
glog.Warningf("Failed to initialize RDMA client: %v", err)
}
}
// Pass RDMA client to WFS
wfs := mount.NewSeaweedFileSystem(&mount.Option{
// ... existing options ...
RDMAClient: rdmaClient, // ADD THIS
})
}
### 3. Chunk-to-Needle Mapping
// File: weed/mount/filehandle_read.go (needs new methods)

// tryRDMARead fills buff with file data starting at offset. It reads, through
// the RDMA sidecar, every visible chunk span covering [offset, offset+len(buff)),
// clamped to the file size.
//
// It returns the number of bytes read, the newest ModifiedTsNs among the
// chunks that supplied data, and an error. A non-nil error means "use the
// HTTP path instead". Holes, chunk manifests, ambiguous overlaps and sidecar
// failures are deliberately left to entryChunkGroup.ReadDataAt.
//
// NOTE(review): the timestamp is meant to match what ReadDataAt reports
// (the newest chunk timestamp). Returning time.Now() instead would make chunk
// data look newer than any unflushed write if the caller compares tsNs
// against dirty pages — confirm.
func (fh *FileHandle) tryRDMARead(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
	lockedEntry := fh.GetEntry()
	if lockedEntry == nil {
		return 0, 0, fmt.Errorf("rdma read: file handle has no entry")
	}
	entry := lockedEntry.GetEntry()
	if entry == nil {
		return 0, 0, fmt.Errorf("rdma read: file handle has no entry")
	}
	chunks := entry.GetChunks()

	// Never read past the logical end of the file.
	fileSize := int64(filer.FileSize(entry))
	if offset >= fileSize {
		return 0, 0, io.EOF
	}
	end := min(offset+int64(len(buff)), fileSize)

	var newestTsNs int64
	pos := offset
	for pos < end {
		chunk, spanEnd, err := visibleChunkAt(chunks, pos)
		if err != nil {
			return 0, 0, err
		}
		spanEnd = min(spanEnd, end)

		fileID := chunk.GetFileIdString()
		volumeID, needleID, cookie, err := ParseFileId(fileID)
		if err != nil {
			return 0, 0, fmt.Errorf("rdma read: parse file id %q: %w", fileID, err)
		}

		// Read only the part of this chunk that is both visible and requested.
		chunkOffset := uint64(pos - chunk.Offset)
		readSize := uint64(spanEnd - pos)
		data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(ctx, volumeID, needleID, cookie, chunkOffset, readSize)
		if err != nil {
			return 0, 0, fmt.Errorf("rdma read of %s: %w", fileID, err)
		}
		if uint64(len(data)) < readSize {
			return 0, 0, fmt.Errorf("rdma read of %s: got %d of %d bytes", fileID, len(data), readSize)
		}
		// isRDMA presumably reports whether the sidecar actually used RDMA.
		if !isRDMA {
			glog.V(4).Infof("sidecar served %s without RDMA", fileID)
		}

		copy(buff[pos-offset:], data[:readSize])
		newestTsNs = max(newestTsNs, chunk.ModifiedTsNs)
		pos = spanEnd
	}
	return pos - offset, newestTsNs, nil
}

// visibleChunkAt returns the chunk whose data is visible at file offset pos,
// plus the offset where that visibility ends. The visible chunk is the one
// covering pos with the largest ModifiedTsNs. Visibility ends at that chunk's
// end, or at the start of a newer-or-equal chunk beginning inside it,
// whichever comes first.
//
// Manifest chunks, holes and equal-timestamp overlaps at pos are reported as
// errors so the caller falls back to the HTTP path, which resolves them.
//
// NOTE(review): assumes SeaweedFS's rule that the newer chunk wins where
// chunks overlap — confirm against the filer's visible-interval resolution.
func visibleChunkAt(chunks []*filer_pb.FileChunk, pos int64) (*filer_pb.FileChunk, int64, error) {
	var best *filer_pb.FileChunk
	for _, c := range chunks {
		if c.IsChunkManifest {
			// Manifest chunks must be resolved into data chunks first;
			// leave that to the HTTP path.
			return nil, 0, fmt.Errorf("rdma read: chunk manifests are not supported")
		}
		if pos < c.Offset || pos >= c.Offset+int64(c.Size) {
			continue
		}
		switch {
		case best == nil || c.ModifiedTsNs > best.ModifiedTsNs:
			best = c
		case c.ModifiedTsNs == best.ModifiedTsNs:
			return nil, 0, fmt.Errorf("rdma read: overlapping chunks with equal timestamps at offset %d", pos)
		}
	}
	if best == nil {
		return nil, 0, fmt.Errorf("rdma read: no chunk covers offset %d", pos)
	}

	// Stop where a newer (or equally new) chunk takes over inside best.
	spanEnd := best.Offset + int64(best.Size)
	for _, c := range chunks {
		if c != best && c.ModifiedTsNs >= best.ModifiedTsNs && c.Offset > pos && c.Offset < spanEnd {
			spanEnd = c.Offset
		}
	}
	return best, spanEnd, nil
}
## 📊 Request Flow Summary

1. **User application** → `read()` system call
2. **FUSE kernel module** → routes the request to `WFS.Read()`
3. **`WFS.Read()`** → calls `readDataByFileHandleWithContext()`
4. **`readDataByFileHandleWithContext()`** → calls `fh.readFromChunksWithContext()`
5. **`readFromChunksWithContext()`** → **[INTEGRATION POINT]** tries RDMA first
6. **`tryRDMARead()`** → parses chunk info and calls `RDMAMountClient.ReadNeedle()`
7. **`RDMAMountClient`** → sends an HTTP request to the RDMA sidecar
8. **RDMA sidecar** → performs the volume lookup and calls the RDMA engine
9. **RDMA engine** → accesses remote memory directly via RDMA hardware (currently a mock implementation)
10. **Response path** → data flows back through all layers to the user
## ✅ What's Working vs. Missing

### ✅ Already Implemented

- ✅ `RDMAMountClient` with HTTP communication
- ✅ RDMA sidecar with volume lookup
- ✅ Rust RDMA engine with mock hardware
- ✅ File ID parsing utilities
- ✅ Health checks and statistics
- ✅ Command-line flags for RDMA options

### ❌ Missing Integration

- ❌ RDMA client not added to the `WFS` struct
- ❌ RDMA client not initialized in the mount command
- ❌ `tryRDMARead()` method not implemented
- ❌ Chunk-to-needle mapping logic missing
- ❌ RDMA integration not wired into the read path
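## 🎯 Next Steps

1. Add the RDMA client to the `WFS` struct and `Option`.
2. Initialize the RDMA client in the mount command.
3. Implement the `tryRDMARead()` method.
4. Wire the RDMA integration into `readFromChunksWithContext()`.
5. Test end-to-end RDMA acceleration.

The architecture is sound and most components exist. The remaining work is the integration wiring above. Because the Rust engine still runs against mock hardware, real acceleration also requires replacing the mock RDMA operations.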