You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

9.0 KiB

📋 Weed Mount RDMA Integration - Code Path Analysis

Current Status

The RDMA client (RDMAMountClient) exists in weed/mount/rdma_client.go but is not yet integrated into the actual file read path. The integration points are identified but not implemented.

🔍 Complete Code Path

1. FUSE Read Request Entry Point

// File: weed/mount/weedfs_file_read.go:41
// Read is the FUSE read entry point (abridged excerpt; "// ..." marks
// elided code). It resolves the file handle from in.Fh, reads into buff
// starting at in.Offset via readDataByFileHandleWithContext, and returns
// the filled prefix of buff.
func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) {
    fh := wfs.GetHandle(FileHandleId(in.Fh))
    // ...
    offset := int64(in.Offset)
    totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
    // ...
    // Only the first totalRead bytes of buff are handed back to FUSE.
    return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}

2. File Handle Read Coordination

// File: weed/mount/weedfs_file_read.go:103
// readDataByFileHandleWithContext reads up to len(buff) bytes at offset from
// the given file handle (abridged excerpt; "// ..." marks elided code). The
// read lock for the range [offset, offset+len(buff)) is held for the whole
// call and released on return.
func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
    size := len(buff)
    fhIn.lockForRead(offset, size)
    defer fhIn.unlockForRead(offset, size)

    // KEY INTEGRATION POINT: the call below is the path an RDMA read has to
    // be hooked into; this function itself only delegates to it.
    n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset)
    // ...
    return n, err
}

3. Chunk Reading (Current Implementation)

// File: weed/mount/filehandle_read.go:29
// readFromChunksWithContext is the current (pre-RDMA) chunk read path
// (abridged excerpt; "// ..." marks elided code, which is where fileSize
// comes from). It returns the number of bytes read, the timestamp reported
// by ReadDataAt, and any error.
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
    // ...
    
    // CURRENT: every read goes straight to the chunk group; no RDMA attempt.
    totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
    
    // MISSING: an RDMA attempt would have to run before the ReadDataAt call
    // above, with ReadDataAt kept as the fallback.
    return int64(totalRead), ts, err
}

4. RDMA Integration Point (What Needs to Be Added)

The integration should happen in readFromChunksWithContext like this:

// readFromChunksWithContext reads up to len(buff) bytes at offset, trying the
// RDMA sidecar first and falling back to the regular chunk read path.
//
// The RDMA attempt is made only when a client is configured and reports
// healthy. Any RDMA error is logged (with its cause) and then ignored, so a
// failing sidecar never fails the read itself.
//
// It returns the number of bytes read, a timestamp, and the error from the
// fallback path (nil when the RDMA read succeeded).
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
    // ... existing code ...

    // NEW: Try RDMA acceleration first
    if fh.wfs.rdmaClient != nil && fh.wfs.rdmaClient.IsHealthy() {
        n, rdmaTs, rdmaErr := fh.tryRDMARead(ctx, buff, offset)
        if rdmaErr == nil {
            glog.V(4).Infof("RDMA read successful: %d bytes", n)
            return n, rdmaTs, nil
        }
        // Keep the cause in the log; without it a fallback is undiagnosable.
        glog.V(2).Infof("RDMA read failed, falling back to HTTP: %v", rdmaErr)
    }

    // FALLBACK: Original HTTP-based chunk reading
    totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
    return int64(totalRead), ts, err
}

🚀 RDMA Client Integration

5. RDMA Read Implementation (Already Exists)

// File: weed/mount/rdma_client.go:129
// ReadNeedle asks the RDMA sidecar for size bytes at offset within the needle
// identified by volumeID, needleID and cookie (abridged excerpt; "// ..."
// marks elided code, which is where req, data and isRDMA come from).
// It returns the data, a flag the excerpt names isRDMA, and an error.
func (c *RDMAMountClient) ReadNeedle(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) ([]byte, bool, error) {
    // Build the sidecar /read URL; every request parameter travels in the
    // query string.
    reqURL := fmt.Sprintf("http://%s/read?volume=%d&needle=%d&cookie=%d&offset=%d&size=%d",
        c.sidecarAddr, volumeID, needleID, cookie, offset, size)
    
    // Execute HTTP request to RDMA sidecar
    resp, err := c.httpClient.Do(req)
    // ...
    
    // Return data with RDMA metadata
    // NOTE(review): isRDMA presumably reports whether the sidecar actually
    // used RDMA for this read — confirm against the full source.
    return data, isRDMA, nil
}

6. RDMA Sidecar Processing

// File: seaweedfs-rdma-sidecar/cmd/demo-server/main.go:375
// readHandler is the sidecar's HTTP read endpoint (abridged excerpt; query,
// ctx, req, resp and err come from elided code). It parses the volume and
// needle IDs from the URL query, reads the needle through either the
// distributed client or the local RDMA client, and writes resp.Data to the
// response.
func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
    // Parse volume, needle, cookie from URL parameters
    // NOTE(review): the parse errors are discarded in this excerpt — confirm
    // the full handler rejects malformed parameters.
    volumeID, _ := strconv.ParseUint(query.Get("volume"), 10, 32)
    needleID, _ := strconv.ParseUint(query.Get("needle"), 10, 64)
    
    // Use distributed client for volume lookup + RDMA
    if s.useDistributed && s.distributedClient != nil {
        resp, err = s.distributedClient.ReadNeedle(ctx, req)
    } else {
        resp, err = s.rdmaClient.ReadNeedle(ctx, req)  // Local RDMA
    }
    
    // Return binary data or JSON metadata
    // NOTE(review): err is not inspected before resp is used in this
    // excerpt — confirm the elided code handles it.
    w.Write(resp.Data)
}

7. Volume Lookup & RDMA Engine

// File: seaweedfs-rdma-sidecar/pkg/seaweedfs/distributed_client.go:45
// ReadNeedle serves a needle read in three steps (abridged excerpt; start
// comes from elided code): look up the locations of req.VolumeID, pick one
// with FindBestLocation, and forward the request to it via makeRDMARequest.
func (c *DistributedRDMAClient) ReadNeedle(ctx context.Context, req *NeedleReadRequest) (*NeedleReadResponse, error) {
    // Step 1: Lookup volume location from master
    // NOTE(review): err is not checked in this excerpt — confirm the full
    // source returns early on lookup failure.
    locations, err := c.locationService.LookupVolume(ctx, req.VolumeID)
    
    // Step 2: Find best server (local preferred)
    bestLocation := c.locationService.FindBestLocation(locations)
    
    // Step 3: Make HTTP request to target server's RDMA sidecar
    return c.makeRDMARequest(ctx, req, bestLocation, start)
}

8. Rust RDMA Engine (Final Data Access)

// File: rdma-engine/src/ipc.rs:403
/// Handles a start-read IPC request (abridged excerpt; `...` marks elided
/// parameters and fields, and `rdma_context`, `transfer_size`, `local_addr`,
/// `remote_addr`, `remote_key`, `size` and `wr_id` come from elided code).
///
/// Generates a session id, allocates a zeroed buffer, registers memory with
/// the RDMA context, posts a read, polls for one completion, and returns the
/// session details. Each `?` propagates a failure of the awaited step to the
/// caller.
async fn handle_start_read(req: StartReadRequest, ...) -> RdmaResult<StartReadResponse> {
    // Create RDMA session: a random (v4) UUID string identifies it.
    let session_id = Uuid::new_v4().to_string();
    // Zero-initialised buffer of transfer_size bytes.
    let buffer = vec![0u8; transfer_size as usize];
    
    // Register memory for RDMA
    let memory_region = rdma_context.register_memory(local_addr, transfer_size).await?;
    
    // Perform RDMA read (mock implementation)
    rdma_context.post_read(local_addr, remote_addr, remote_key, size, wr_id).await?;
    // Poll for a completion of the read posted above.
    let completions = rdma_context.poll_completion(1).await?;
    
    // Return session info
    Ok(StartReadResponse { session_id, local_addr, ... })
}

🔧 Missing Integration Components

1. WFS Struct Extension

// File: weed/mount/weedfs.go (needs modification)
// WFS is the mount filesystem type (fragment; existing fields are elided).
// The only proposed change is the new rdmaClient field.
type WFS struct {
    // ... existing fields ...
    rdmaClient *RDMAMountClient  // ADD THIS: nil means RDMA is not configured
}

2. RDMA Client Initialization

// File: weed/command/mount.go (needs modification)
// runMount is the mount command entry point (fragment; existing code,
// including the declaration of err and the return statement, is elided).
// The proposed addition builds an RDMA client when RDMA is enabled and a
// sidecar address is set, then hands it to the filesystem through
// mount.Option.
func runMount(cmd *cobra.Command, args []string) bool {
    // ... existing code ...
    
    // NEW: Initialize RDMA client if enabled
    // rdmaClient stays nil unless both the flag and the address are set.
    var rdmaClient *mount.RDMAMountClient
    if *mountOptions.rdmaEnabled && *mountOptions.rdmaSidecarAddr != "" {
        rdmaClient, err = mount.NewRDMAMountClient(
            *mountOptions.rdmaSidecarAddr,
            *mountOptions.rdmaMaxConcurrent,
            *mountOptions.rdmaTimeoutMs,
        )
        if err != nil {
            // Best effort: a failed RDMA setup is logged and the mount
            // proceeds.
            // NOTE(review): assumes NewRDMAMountClient returns a nil client
            // alongside a non-nil error — confirm.
            glog.Warningf("Failed to initialize RDMA client: %v", err)
        }
    }
    
    // Pass RDMA client to WFS
    wfs := mount.NewSeaweedFileSystem(&mount.Option{
        // ... existing options ...
        RDMAClient: rdmaClient,  // ADD THIS: may be nil when RDMA is off
    })
}

3. Chunk-to-Needle Mapping

// File: weed/mount/filehandle_read.go (needs new method)
// tryRDMARead attempts to serve a read at offset through the RDMA sidecar.
//
// It picks the first chunk of the entry whose byte range contains offset,
// parses that chunk's file ID into volume, needle and cookie, and requests at
// most len(buff) bytes from the RDMA client, clamped to the end of the chunk.
//
// It returns the number of bytes copied into buff, a timestamp taken from
// time.Now().UnixNano(), and an error when the file ID cannot be parsed, the
// sidecar read fails, or no chunk covers offset.
//
// NOTE(review): only one chunk is consulted, so a request that crosses a
// chunk boundary yields fewer than len(buff) bytes — confirm the caller
// tolerates a short read here.
// NOTE(review): the first matching chunk wins; if chunks may overlap, confirm
// this selects the same data as the regular read path.
func (fh *FileHandle) tryRDMARead(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
    entry := fh.GetEntry()

    // Find which chunk contains the requested offset
    for _, chunk := range entry.GetEntry().Chunks {
        if offset < chunk.Offset || offset >= chunk.Offset+int64(chunk.Size) {
            continue
        }

        // Parse chunk.FileId to get volume, needle, cookie
        volumeID, needleID, cookie, err := ParseFileId(chunk.FileId)
        if err != nil {
            return 0, 0, fmt.Errorf("parsing file id %q: %w", chunk.FileId, err)
        }

        // Calculate offset within the chunk and never read past its end.
        chunkOffset := uint64(offset - chunk.Offset)
        readSize := uint64(min(len(buff), int(chunk.Size-chunkOffset)))

        // Make RDMA request
        data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(
            ctx, volumeID, needleID, cookie, chunkOffset, readSize)
        if err != nil {
            return 0, 0, fmt.Errorf("reading needle %s via RDMA sidecar: %w", chunk.FileId, err)
        }
        // isRDMA must be consumed: Go rejects a declared-but-unused local.
        glog.V(4).Infof("sidecar read %s: %d bytes, rdma=%v", chunk.FileId, len(data), isRDMA)

        // Copy data to buffer
        copied := copy(buff, data)
        return int64(copied), time.Now().UnixNano(), nil
    }

    return 0, 0, fmt.Errorf("chunk not found for offset %d", offset)
}

📊 Request Flow Summary

  1. User Application → read() system call
  2. FUSE Kernel → Routes to WFS.Read()
  3. WFS.Read() → Calls readDataByFileHandleWithContext()
  4. readDataByFileHandleWithContext() → Calls fh.readFromChunksWithContext()
  5. readFromChunksWithContext() → [INTEGRATION POINT] Try RDMA first
  6. tryRDMARead() → Parse chunk info, call RDMAMountClient.ReadNeedle()
  7. RDMAMountClient → HTTP request to RDMA sidecar
  8. RDMA Sidecar → Volume lookup + RDMA engine call
  9. RDMA Engine → Direct memory access via RDMA hardware
  10. Response Path → Data flows back through all layers to user

What's Working vs Missing

Already Implemented:

  • RDMAMountClient with HTTP communication
  • RDMA sidecar with volume lookup
  • Rust RDMA engine with mock hardware
  • File ID parsing utilities
  • Health checks and statistics
  • Command-line flags for RDMA options

Missing Integration:

  • RDMA client not added to WFS struct
  • RDMA client not initialized in mount command
  • tryRDMARead() method not implemented
  • Chunk-to-needle mapping logic missing
  • RDMA integration not wired into read path

🎯 Next Steps

  1. Add RDMA client to WFS struct and Option
  2. Initialize RDMA client in mount command
  3. Implement tryRDMARead() method
  4. Wire RDMA integration into readFromChunksWithContext()
  5. Test end-to-end RDMA acceleration

The architecture is sound and most components exist - only the final integration wiring is needed!