|
@ -28,11 +28,11 @@ type RDMAMountClient struct { |
|
|
lookupFileIdFn wdclient.LookupFileIdFunctionType |
|
|
lookupFileIdFn wdclient.LookupFileIdFunctionType |
|
|
|
|
|
|
|
|
// Statistics
|
|
|
// Statistics
|
|
|
totalRequests int64 |
|
|
|
|
|
successfulReads int64 |
|
|
|
|
|
failedReads int64 |
|
|
|
|
|
totalBytesRead int64 |
|
|
|
|
|
totalLatencyNs int64 |
|
|
|
|
|
|
|
|
totalRequests atomic.Int64 |
|
|
|
|
|
successfulReads atomic.Int64 |
|
|
|
|
|
failedReads atomic.Int64 |
|
|
|
|
|
totalBytesRead atomic.Int64 |
|
|
|
|
|
totalLatencyNs atomic.Int64 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// RDMAReadRequest represents a request to read data via RDMA
|
|
|
// RDMAReadRequest represents a request to read data via RDMA
|
|
@ -178,13 +178,13 @@ func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, |
|
|
return nil, false, ctx.Err() |
|
|
return nil, false, ctx.Err() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
atomic.AddInt64(&c.totalRequests, 1) |
|
|
|
|
|
|
|
|
c.totalRequests.Add(1) |
|
|
startTime := time.Now() |
|
|
startTime := time.Now() |
|
|
|
|
|
|
|
|
// Lookup volume location using file ID directly
|
|
|
// Lookup volume location using file ID directly
|
|
|
volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID) |
|
|
volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
atomic.AddInt64(&c.failedReads, 1) |
|
|
|
|
|
|
|
|
c.failedReads.Add(1) |
|
|
return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err) |
|
|
return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -194,23 +194,23 @@ func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, |
|
|
|
|
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) |
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
atomic.AddInt64(&c.failedReads, 1) |
|
|
|
|
|
|
|
|
c.failedReads.Add(1) |
|
|
return nil, false, fmt.Errorf("failed to create RDMA request: %w", err) |
|
|
return nil, false, fmt.Errorf("failed to create RDMA request: %w", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Execute request
|
|
|
// Execute request
|
|
|
resp, err := c.httpClient.Do(req) |
|
|
resp, err := c.httpClient.Do(req) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
atomic.AddInt64(&c.failedReads, 1) |
|
|
|
|
|
|
|
|
c.failedReads.Add(1) |
|
|
return nil, false, fmt.Errorf("RDMA request failed: %w", err) |
|
|
return nil, false, fmt.Errorf("RDMA request failed: %w", err) |
|
|
} |
|
|
} |
|
|
defer resp.Body.Close() |
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
|
duration := time.Since(startTime) |
|
|
duration := time.Since(startTime) |
|
|
atomic.AddInt64(&c.totalLatencyNs, duration.Nanoseconds()) |
|
|
|
|
|
|
|
|
c.totalLatencyNs.Add(duration.Nanoseconds()) |
|
|
|
|
|
|
|
|
if resp.StatusCode != http.StatusOK { |
|
|
if resp.StatusCode != http.StatusOK { |
|
|
atomic.AddInt64(&c.failedReads, 1) |
|
|
|
|
|
|
|
|
c.failedReads.Add(1) |
|
|
body, _ := io.ReadAll(resp.Body) |
|
|
body, _ := io.ReadAll(resp.Body) |
|
|
return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body)) |
|
|
return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body)) |
|
|
} |
|
|
} |
|
@ -256,12 +256,12 @@ func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if err != nil { |
|
|
if err != nil { |
|
|
atomic.AddInt64(&c.failedReads, 1) |
|
|
|
|
|
|
|
|
c.failedReads.Add(1) |
|
|
return nil, false, fmt.Errorf("failed to read RDMA response: %w", err) |
|
|
return nil, false, fmt.Errorf("failed to read RDMA response: %w", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
atomic.AddInt64(&c.successfulReads, 1) |
|
|
|
|
|
atomic.AddInt64(&c.totalBytesRead, int64(len(data))) |
|
|
|
|
|
|
|
|
c.successfulReads.Add(1) |
|
|
|
|
|
c.totalBytesRead.Add(int64(len(data))) |
|
|
|
|
|
|
|
|
// Log successful operation
|
|
|
// Log successful operation
|
|
|
glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s", |
|
|
glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s", |
|
@ -308,11 +308,11 @@ func (c *RDMAMountClient) cleanupTempFile(tempFilePath string) { |
|
|
|
|
|
|
|
|
// GetStats returns current RDMA client statistics
|
|
|
// GetStats returns current RDMA client statistics
|
|
|
func (c *RDMAMountClient) GetStats() map[string]interface{} { |
|
|
func (c *RDMAMountClient) GetStats() map[string]interface{} { |
|
|
totalRequests := atomic.LoadInt64(&c.totalRequests) |
|
|
|
|
|
successfulReads := atomic.LoadInt64(&c.successfulReads) |
|
|
|
|
|
failedReads := atomic.LoadInt64(&c.failedReads) |
|
|
|
|
|
totalBytesRead := atomic.LoadInt64(&c.totalBytesRead) |
|
|
|
|
|
totalLatencyNs := atomic.LoadInt64(&c.totalLatencyNs) |
|
|
|
|
|
|
|
|
totalRequests := c.totalRequests.Load() |
|
|
|
|
|
successfulReads := c.successfulReads.Load() |
|
|
|
|
|
failedReads := c.failedReads.Load() |
|
|
|
|
|
totalBytesRead := c.totalBytesRead.Load() |
|
|
|
|
|
totalLatencyNs := c.totalLatencyNs.Load() |
|
|
|
|
|
|
|
|
successRate := float64(0) |
|
|
successRate := float64(0) |
|
|
avgLatencyNs := int64(0) |
|
|
avgLatencyNs := int64(0) |
|
|