Browse Source

feat: CP10B-1 NVMe/TCP RX/TX split + CP10B-2 bench/profiling fixes

RX/TX split: rxLoop reads PDUs, txLoop writes responses via respCh.
Handlers refactored to void + enqueueResponse pattern. IOCCSZ fix
enables inline write data (100K IOPS vs 15K before). R2T deadlock
fix via completeWaiters. Shutdown cleans up pendingCapsules buffers.

Bench: ParseFioMetric accepts plain/quoted numbers for aggregated
medians. Profiling actions: pprof_capture, vmstat_capture, iostat_capture.

196 NVMe tests, 92 testrunner actions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
4c5f9f2b9d
  1. 42
      weed/storage/blockvol/nvme/admin.go
  2. 457
      weed/storage/blockvol/nvme/controller.go
  3. 56
      weed/storage/blockvol/nvme/fabric.go
  4. 44
      weed/storage/blockvol/nvme/identify.go
  5. 89
      weed/storage/blockvol/nvme/io.go
  6. 120
      weed/storage/blockvol/nvme/nvme_test.go
  7. 11
      weed/storage/blockvol/nvme/wire.go
  8. 26
      weed/storage/blockvol/testrunner/actions/bench.go
  9. 49
      weed/storage/blockvol/testrunner/actions/bench_test.go
  10. 10
      weed/storage/blockvol/testrunner/actions/devops_test.go
  11. 155
      weed/storage/blockvol/testrunner/actions/metrics.go
  12. 6
      weed/storage/blockvol/testrunner/scenarios/cp103-25g-ab.yaml

42
weed/storage/blockvol/nvme/admin.go

@ -5,7 +5,7 @@ import (
)
// handleSetFeatures processes SetFeatures admin commands.
func (c *Controller) handleSetFeatures(req *Request) error {
func (c *Controller) handleSetFeatures(req *Request) {
fid := uint8(req.capsule.D10 & 0xFF)
switch fid {
@ -31,25 +31,23 @@ func (c *Controller) handleSetFeatures(req *Request) error {
// Response DW0: (NCQR-1) | ((NSQR-1) << 16)
req.resp.DW0 = uint32(ncqr-1) | (uint32(nsqr-1) << 16)
return c.sendResponse(req)
case fidKeepAliveTimer:
// D11 contains KATO in milliseconds
c.katoMs = req.capsule.D11
return c.sendResponse(req)
case fidAsyncEventConfig:
// Stub: accept but don't deliver events
return c.sendResponse(req)
default:
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
}
c.enqueueResponse(&response{resp: req.resp})
}
// handleGetFeatures returns stored feature values.
func (c *Controller) handleGetFeatures(req *Request) error {
func (c *Controller) handleGetFeatures(req *Request) {
fid := uint8(req.capsule.D10 & 0xFF)
switch fid {
@ -59,24 +57,22 @@ func (c *Controller) handleGetFeatures(req *Request) error {
n = c.maxIOQueues
}
req.resp.DW0 = uint32(n-1) | (uint32(n-1) << 16)
return c.sendResponse(req)
case fidKeepAliveTimer:
req.resp.DW0 = c.katoMs
return c.sendResponse(req)
case fidAsyncEventConfig:
req.resp.DW0 = 0
return c.sendResponse(req)
default:
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
}
c.enqueueResponse(&response{resp: req.resp})
}
// handleGetLogPage returns log page data.
func (c *Controller) handleGetLogPage(req *Request) error {
func (c *Controller) handleGetLogPage(req *Request) {
// D10 bits 7:0 = Log Page Identifier
// D10 bits 27:16 and D11 bits 15:0 = Number of Dwords (NUMD)
lid := uint8(req.capsule.D10 & 0xFF)
@ -87,28 +83,28 @@ func (c *Controller) handleGetLogPage(req *Request) error {
switch lid {
case logPageError:
return c.logPageError(req, length)
c.logPageError(req, length)
case logPageSMART:
return c.logPageSMART(req, length)
c.logPageSMART(req, length)
case logPageANA:
return c.logPageANA(req, length)
c.logPageANA(req, length)
default:
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
}
// logPageError returns an empty error log page.
func (c *Controller) logPageError(req *Request, length uint32) error {
func (c *Controller) logPageError(req *Request, length uint32) {
if length > 64 {
length = 64
}
req.c2hData = make([]byte, length)
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// logPageSMART returns a 512-byte SMART/Health log.
func (c *Controller) logPageSMART(req *Request, length uint32) error {
func (c *Controller) logPageSMART(req *Request, length uint32) {
if length > 512 {
length = 512
}
@ -130,11 +126,11 @@ func (c *Controller) logPageSMART(req *Request, length uint32) error {
buf[5] = 0
req.c2hData = buf[:length]
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// logPageANA returns the ANA log page with a single group.
func (c *Controller) logPageANA(req *Request, length uint32) error {
func (c *Controller) logPageANA(req *Request, length uint32) {
// ANA log page format (32 bytes for single group):
// [0:8] CHGCNT (uint64)
// [8:10] NGRPS = 1 (uint16)
@ -167,7 +163,7 @@ func (c *Controller) logPageANA(req *Request, length uint32) error {
length = anaLogSize
}
req.c2hData = buf[:length]
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// anaState returns the current ANA state based on the subsystem's device.
@ -192,7 +188,7 @@ func (c *Controller) anaChangeCount() uint64 {
}
// handleKeepAlive resets the KATO timer and returns success.
func (c *Controller) handleKeepAlive(req *Request) error {
func (c *Controller) handleKeepAlive(req *Request) {
c.resetKATO()
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}

457
weed/storage/blockvol/nvme/controller.go

@ -1,6 +1,7 @@
package nvme
import (
"errors"
"fmt"
"io"
"log"
@ -10,6 +11,10 @@ import (
"time"
)
// errDisconnect is a sentinel returned by handleDisconnect to signal
// the rxLoop to exit gracefully after enqueuing the disconnect response.
var errDisconnect = errors.New("disconnect")
// controllerState tracks the lifecycle of an NVMe controller session.
type controllerState int
@ -31,7 +36,27 @@ type Request struct {
status StatusWord
}
// response represents a pending response to be written by the txLoop.
// Either a standard CapsuleResp (with optional C2H data), or an R2T PDU.
//
// Handlers build one value per completed command and hand it to the txLoop
// over Controller.respCh; writeResponse checks r2t first to pick the variant.
type response struct {
// Standard response fields
// resp is the completion capsule; its SQHD field is filled in by
// writeResponse at write time, not by the handler.
resp CapsuleResponse
// c2hData, when non-empty, is sent as C2HData PDU(s) ahead of the
// CapsuleResp (read data, Identify buffers, log pages). Empty means the
// CapsuleResp is sent on its own.
c2hData []byte // non-nil for read responses
// R2T variant (when r2t is non-nil, resp/c2hData are ignored)
r2t *R2THeader
// r2tDone, if non-nil, is closed by writeResponse once the R2T send has
// been attempted (success or failure), or by completeWaiters when the
// response is discarded after an earlier write error.
r2tDone chan struct{} // closed after R2T is flushed to wire
}
// Controller handles one NVMe/TCP connection (one queue per connection).
//
// After the IC handshake, the connection is split into two goroutines:
// - rxLoop: reads PDUs from the wire, dispatches commands
// - txLoop: drains the response channel, writes responses to the wire
//
// IO commands are dispatched to goroutines for concurrent processing.
// Admin commands run synchronously in the rxLoop (infrequent, state-changing).
// All responses flow through respCh — only txLoop writes to c.out.
type Controller struct {
mu sync.Mutex
@ -45,8 +70,9 @@ type Controller struct {
// Queue state (one queue per TCP connection)
queueID uint16
queueSize uint16
sqhd uint16 // Submission Queue Head pointer
flowCtlOff bool // CATTR bit2: SQ flow control disabled
rxSQHD uint16 // Submission Queue Head (updated by rxLoop only)
sqhdVal atomic.Uint32 // Latest SQHD for txLoop (lower 16 bits)
flowCtlOff bool // CATTR bit2: SQ flow control disabled
// Controller identity
cntlID uint16
@ -63,9 +89,10 @@ type Controller struct {
katoTimer *time.Timer
katoMu sync.Mutex
// Async completion (IO queues)
waiting chan *Request // pre-allocated request pool
completions chan *Request // completed requests to send
// RX/TX split
respCh chan *response // responses from handlers → txLoop
done chan struct{} // closed when connection is shutting down
cmdWg sync.WaitGroup // tracks in-flight IO command goroutines
// Backend
subsystem *Subsystem
@ -78,11 +105,11 @@ type Controller struct {
maxDataLen uint32 // C2H/H2C data chunk size (from Config)
// Command interleaving: capsules received during R2T H2CData collection.
// Drained by Serve() before reading the next PDU from the wire.
// Drained by rxLoop before reading the next PDU from the wire.
pendingCapsules []*Request
// Lifecycle
wg sync.WaitGroup
wg sync.WaitGroup
closeOnce sync.Once
}
@ -94,7 +121,7 @@ func newController(conn net.Conn, server *Server) *Controller {
}
c := &Controller{
conn: conn,
in: NewReader(conn),
in: NewReaderSize(conn, int(maxData)+maxHeaderSize),
out: NewWriterSize(conn, int(maxData)+maxHeaderSize),
state: stateConnected,
server: server,
@ -108,14 +135,80 @@ func newController(conn net.Conn, server *Server) *Controller {
}
// Serve is the main event loop for this controller connection.
//
// Phase 1: IC handshake (synchronous, direct c.out access).
// Phase 2: Start txLoop goroutine, enter rxLoop.
//
// Teardown order once rxLoop returns: wait on cmdWg for in-flight IO command
// goroutines, close c.done, then collect txLoop's result. c.shutdown runs
// last through the deferred call, on every return path.
//
// Returns a handshake error if Phase 1 fails. Otherwise returns the rxLoop
// error if there is one (errDisconnect is mapped to nil), else the txLoop
// error.
func (c *Controller) Serve() error {
defer c.shutdown()
// Phase 1: IC handshake with timeout (synchronous).
// The first PDU must arrive within 10 seconds and must be an ICReq.
if err := c.conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
return err
}
hdr, err := c.in.Dequeue()
if err != nil {
return fmt.Errorf("IC handshake: %w", err)
}
if hdr.Type != pduICReq {
return fmt.Errorf("expected ICReq (0x%x), got 0x%x", pduICReq, hdr.Type)
}
if err := c.handleIC(); err != nil {
return fmt.Errorf("IC handshake: %w", err)
}
// Handshake done: clear the read deadline so later reads do not time out.
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
// Phase 2: Start TX loop, enter RX loop.
// respCh and done are created here, after the handshake, so they are only
// valid once Phase 2 has started.
c.respCh = make(chan *response, 128)
c.done = make(chan struct{})
// Buffered by one so the txLoop goroutine can deliver its result and exit
// without waiting for the receive below.
txErrCh := make(chan error, 1)
go func() {
txErrCh <- c.txLoop()
}()
rxErr := c.rxLoop()
// Wait for in-flight IO command goroutines to finish and enqueue responses.
// txLoop is still running at this point, so those enqueues can make progress.
c.cmdWg.Wait()
// Signal txLoop to drain remaining responses and exit.
close(c.done)
txErr := <-txErrCh
// errDisconnect is a graceful exit, not an error.
if errors.Is(rxErr, errDisconnect) {
rxErr = nil
}
// NOTE(review): txLoop closes the connection on its first write error to
// unblock rxLoop, so rxErr may be a consequence of txErr and hide it here —
// confirm whether callers need the original write error.
if rxErr != nil {
return rxErr
}
return txErr
}
// handleIC finishes the Initialize Connection exchange.
//
// It reads the ICReq body from c.in (the caller has already dequeued the PDU
// header), replies with an ICResp advertising c.maxDataLen as the maximum
// H2C data length, and on success moves the controller to stateICComplete.
// The request contents are not inspected. Any receive or send error is
// returned unchanged and the state is left untouched.
func (c *Controller) handleIC() error {
	var icReq ICRequest
	if recvErr := c.in.Receive(&icReq); recvErr != nil {
		return recvErr
	}

	icResp := ICResponse{
		MaxH2CDataLength: c.maxDataLen,
		PDUFormatVersion: 0,
	}
	if sendErr := c.out.SendHeaderOnly(pduICResp, &icResp, icBodySize); sendErr != nil {
		return sendErr
	}

	// Only flip the state once the ICResp has been handed to the writer.
	c.state = stateICComplete
	return nil
}
// rxLoop reads PDUs from the wire and dispatches commands.
// Admin commands (QID=0) run synchronously. IO commands dispatch to goroutines.
func (c *Controller) rxLoop() error {
for {
if c.closed.Load() {
return nil
@ -125,8 +218,9 @@ func (c *Controller) Serve() error {
for len(c.pendingCapsules) > 0 {
req := c.pendingCapsules[0]
c.pendingCapsules = c.pendingCapsules[1:]
if err := c.dispatchPending(req); err != nil {
return fmt.Errorf("pending capsule: %w", err)
c.advanceSQHD()
if err := c.dispatchFromRx(req); err != nil {
return err
}
}
@ -139,27 +233,24 @@ func (c *Controller) Serve() error {
}
switch hdr.Type {
case pduICReq:
if err := c.handleIC(); err != nil {
return fmt.Errorf("IC handshake: %w", err)
}
// Clear read deadline after successful IC
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
case pduCapsuleCmd:
if err := c.handleCapsule(); err != nil {
if c.state < stateICComplete {
return fmt.Errorf("capsule command before IC handshake")
}
req, err := c.parseCapsule()
if err != nil {
return fmt.Errorf("capsule: %w", err)
}
c.advanceSQHD()
if err := c.dispatchFromRx(req); err != nil {
return err
}
case pduH2CData:
// H2CData PDUs are only expected after R2T, handled inline
// by recvH2CData. If we see one here, it's unexpected.
return fmt.Errorf("unexpected H2CData PDU outside R2T flow")
case pduH2CTermReq:
return nil // host terminated
return nil
default:
return fmt.Errorf("unexpected PDU type: 0x%x", hdr.Type)
@ -167,85 +258,145 @@ func (c *Controller) Serve() error {
}
}
// handleIC processes the IC handshake.
func (c *Controller) handleIC() error {
var req ICRequest
if err := c.in.Receive(&req); err != nil {
return err
}
resp := ICResponse{
PDUFormatVersion: 0,
MaxH2CDataLength: c.maxDataLen,
}
if err := c.out.SendHeaderOnly(pduICResp, &resp, icBodySize); err != nil {
return err
// txLoop drains the response channel and writes responses to the wire.
// Only txLoop touches c.out after the IC handshake.
func (c *Controller) txLoop() error {
var firstErr error
for {
select {
case resp := <-c.respCh:
if firstErr != nil {
completeWaiters(resp)
continue // discard — connection already failed
}
if err := c.writeResponse(resp); err != nil {
firstErr = err
c.conn.Close() // force rxLoop EOF
}
case <-c.done:
// Drain remaining responses.
for {
select {
case resp := <-c.respCh:
if firstErr == nil {
if err := c.writeResponse(resp); err != nil {
firstErr = err
}
} else {
completeWaiters(resp)
}
default:
return firstErr
}
}
}
}
c.state = stateICComplete
return nil
}
// handleCapsule dispatches a CapsuleCmd PDU.
func (c *Controller) handleCapsule() error {
// Reject capsule commands before IC handshake is complete.
if c.state < stateICComplete {
return fmt.Errorf("capsule command before IC handshake")
}
// parseCapsule reads a CapsuleCmd PDU (specific header + optional inline data).
func (c *Controller) parseCapsule() (*Request, error) {
var capsule CapsuleCommand
if err := c.in.Receive(&capsule); err != nil {
return err
return nil, err
}
// Read optional inline data
var payload []byte
if dataLen := c.in.Length(); dataLen > 0 {
payload = getBuffer(int(dataLen))
if err := c.in.ReceiveData(payload); err != nil {
putBuffer(payload)
return err
return nil, err
}
}
// Advance SQHD
c.sqhd++
if c.sqhd >= c.queueSize && c.queueSize > 0 {
c.sqhd = 0
}
req := &Request{
capsule: capsule,
payload: payload,
}
req.resp.CID = capsule.CID
req.resp.QueueID = c.queueID
// SQHD is set in sendResponse/sendC2HDataAndResponse using the
// latest c.flowCtlOff value, so Connect responses correctly get
// SQHD=0xFFFF when the host requests flowCtlOff via CATTR.
req.resp.Status = uint16(StatusSuccess)
return req, nil
}
// advanceSQHD moves the submission queue head forward by one entry and
// publishes the new value for the txLoop (rxLoop only).
//
// rxSQHD is the rxLoop-private copy; sqhdVal is the atomic mirror read when
// a response is written. The head wraps to 0 on reaching queueSize, but only
// when a non-zero queue size is set — with queueSize == 0 it simply counts
// up and relies on uint16 overflow.
func (c *Controller) advanceSQHD() {
	next := c.rxSQHD + 1
	if limit := c.queueSize; limit != 0 && next >= limit {
		next = 0
	}
	c.rxSQHD = next
	c.sqhdVal.Store(uint32(next))
}
// dispatchFromRx routes a parsed capsule to the appropriate handler.
// Admin queue: synchronous in rxLoop. IO queue: concurrent goroutine.
func (c *Controller) dispatchFromRx(req *Request) error {
if c.queueID == 0 {
return c.dispatchAdmin(req)
}
return c.dispatchIO(req)
// For Write commands without inline data, collect R2T data in rxLoop
// before dispatching to the goroutine.
if req.capsule.OpCode == ioWrite && len(req.payload) == 0 {
if err := c.collectR2TData(req); err != nil {
return err
}
}
c.cmdWg.Add(1)
go func() {
defer c.cmdWg.Done()
c.dispatchIO(req)
}()
return nil
}
// dispatchPending processes a capsule that was buffered during R2T data
// collection. The capsule and payload are already fully read — only
// SQHD advance and command dispatch remain.
func (c *Controller) dispatchPending(req *Request) error {
c.sqhd++
if c.sqhd >= c.queueSize && c.queueSize > 0 {
c.sqhd = 0
// collectR2TData handles the R2T/H2C flow in the rxLoop for Write commands
// that don't carry inline data. Sends R2T through the txLoop, then reads
// H2C Data PDUs inline. Sets req.payload with the collected data.
func (c *Controller) collectR2TData(req *Request) error {
sub := c.subsystem
if sub == nil {
// Let handleWrite deal with the nil subsystem.
return nil
}
if c.queueID == 0 {
return c.dispatchAdmin(req)
nlb := req.capsule.LbaLength()
expectedBytes := uint32(nlb) * sub.Dev.BlockSize()
// Send R2T through txLoop and wait for it to be flushed.
r2tDone := make(chan struct{})
select {
case c.respCh <- &response{
r2t: &R2THeader{
CCCID: req.capsule.CID,
TAG: 0,
DATAO: 0,
DATAL: expectedBytes,
},
r2tDone: r2tDone,
}:
case <-c.done:
return nil
}
select {
case <-r2tDone:
case <-c.done:
return nil
}
return c.dispatchIO(req)
// Read H2C Data PDUs from the wire.
data, err := c.recvH2CData(expectedBytes)
if err != nil {
return err
}
req.payload = data
return nil
}
// dispatchAdmin handles admin queue commands synchronously.
// dispatchAdmin handles admin queue commands synchronously in the rxLoop.
func (c *Controller) dispatchAdmin(req *Request) error {
defer func() {
if req.payload != nil {
@ -261,26 +412,26 @@ func (c *Controller) dispatchAdmin(req *Request) error {
switch capsule.OpCode {
case adminIdentify:
return c.handleIdentify(req)
c.handleIdentify(req)
case adminSetFeatures:
return c.handleSetFeatures(req)
c.handleSetFeatures(req)
case adminGetFeatures:
return c.handleGetFeatures(req)
c.handleGetFeatures(req)
case adminGetLogPage:
return c.handleGetLogPage(req)
c.handleGetLogPage(req)
case adminKeepAlive:
return c.handleKeepAlive(req)
c.handleKeepAlive(req)
case adminAsyncEvent:
// Stub: just succeed (don't deliver events in CP10-1)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
default:
req.resp.Status = uint16(StatusInvalidOpcode)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
return nil
}
// dispatchIO handles IO queue commands.
func (c *Controller) dispatchIO(req *Request) error {
// dispatchIO handles IO queue commands (runs in a goroutine).
func (c *Controller) dispatchIO(req *Request) {
defer func() {
if req.payload != nil {
putBuffer(req.payload)
@ -291,95 +442,101 @@ func (c *Controller) dispatchIO(req *Request) error {
switch capsule.OpCode {
case ioRead:
return c.handleRead(req)
c.handleRead(req)
case ioWrite:
return c.handleWrite(req)
c.handleWrite(req)
case ioFlush:
return c.handleFlush(req)
c.handleFlush(req)
case ioWriteZeros:
return c.handleWriteZeros(req)
c.handleWriteZeros(req)
default:
req.resp.Status = uint16(StatusInvalidOpcode)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
}
// sendC2HDataAndResponse sends C2HData PDUs followed by a CapsuleResp.
// All chunks and the final response are batched in the bufio buffer,
// then flushed to the wire in a single FlushBuf() call.
func (c *Controller) sendC2HDataAndResponse(req *Request) error {
if len(req.c2hData) > 0 {
data := req.c2hData
offset := uint32(0)
chunkSize := c.maxDataLen
for offset < uint32(len(data)) {
end := offset + chunkSize
if end > uint32(len(data)) {
end = uint32(len(data))
}
chunk := data[offset:end]
hdr := C2HDataHeader{
CCCID: req.capsule.CID,
DATAO: offset,
DATAL: uint32(len(chunk)),
}
// completeWaiters releases anyone blocked on a response that will never be
// written. txLoop must call it for every response it drops after a write
// error; otherwise a sender waiting on r2tDone (e.g. collectR2TData) would
// block forever. Responses without an r2tDone channel need no action.
func completeWaiters(resp *response) {
	done := resp.r2tDone
	if done == nil {
		return
	}
	close(done)
}
flags := uint8(0)
if end >= uint32(len(data)) {
flags = c2hFlagLast
}
// enqueueResponse hands a response to the txLoop for writing.
// Safe to call from any goroutine. It blocks until either the response is
// accepted on respCh or the done channel is closed; in the latter case the
// response is dropped without notice to the caller.
func (c *Controller) enqueueResponse(resp *response) {
	select {
	case <-c.done:
		// Connection is shutting down: discard.
		return
	case c.respCh <- resp:
		// Queued for txLoop.
		return
	}
}
if err := c.out.writeHeaderAndData(pduC2HData, flags, &hdr, c2hDataHdrSize, chunk); err != nil {
return err
}
offset = end
// writeResponse writes a single response to the wire (txLoop only).
func (c *Controller) writeResponse(resp *response) error {
// R2T variant: write R2T header and signal caller.
if resp.r2t != nil {
err := c.out.SendHeaderOnly(pduR2T, resp.r2t, r2tHdrSize)
if resp.r2tDone != nil {
close(resp.r2tDone)
}
return err
}
// Write CapsuleResp to bufio buffer
// Standard response: set SQHD from latest value.
if c.flowCtlOff {
req.resp.SQHD = 0xFFFF
resp.resp.SQHD = 0xFFFF
} else {
req.resp.SQHD = c.sqhd
resp.resp.SQHD = uint16(c.sqhdVal.Load())
}
c.resetKATO()
if err := c.out.writeHeaderAndData(pduCapsuleResp, 0, &req.resp, capsuleRespSize, nil); err != nil {
return err
}
// Single flush: all C2H chunks + CapsuleResp in one syscall
return c.out.FlushBuf()
}
// sendResponse sends a CapsuleResp PDU.
// SQHD is set here (not in handleCapsule) so that flowCtlOff changes
// made during command dispatch (e.g. Fabric Connect) take effect
// on the same response.
func (c *Controller) sendResponse(req *Request) error {
if c.flowCtlOff {
req.resp.SQHD = 0xFFFF
} else {
req.resp.SQHD = c.sqhd
if len(resp.c2hData) > 0 {
return c.writeC2HAndResp(resp)
}
c.resetKATO()
return c.out.SendHeaderOnly(pduCapsuleResp, &req.resp, capsuleRespSize)
return c.out.SendHeaderOnly(pduCapsuleResp, &resp.resp, capsuleRespSize)
}
// ---------- R2T / H2C Data ----------
// writeC2HAndResp writes C2H data chunks followed by CapsuleResp, batched.
func (c *Controller) writeC2HAndResp(resp *response) error {
data := resp.c2hData
offset := uint32(0)
chunkSize := c.maxDataLen
for offset < uint32(len(data)) {
end := offset + chunkSize
if end > uint32(len(data)) {
end = uint32(len(data))
}
chunk := data[offset:end]
hdr := C2HDataHeader{
CCCID: resp.resp.CID,
DATAO: offset,
DATAL: uint32(len(chunk)),
}
flags := uint8(0)
if end >= uint32(len(data)) {
flags = c2hFlagLast
}
// sendR2T sends a Ready-to-Transfer PDU requesting data from the host.
func (c *Controller) sendR2T(cid uint16, tag uint16, offset, length uint32) error {
r2t := R2THeader{
CCCID: cid,
TAG: tag,
DATAO: offset,
DATAL: length,
if err := c.out.writeHeaderAndData(pduC2HData, flags, &hdr, c2hDataHdrSize, chunk); err != nil {
return err
}
offset = end
}
return c.out.SendHeaderOnly(pduR2T, &r2t, r2tHdrSize)
if err := c.out.writeHeaderAndData(pduCapsuleResp, 0, &resp.resp, capsuleRespSize, nil); err != nil {
return err
}
return c.out.FlushBuf()
}
// ---------- R2T / H2C Data ----------
// recvH2CData reads H2CData PDU(s) from the wire and returns the accumulated data.
// Reads exactly `totalBytes` of data, potentially across multiple H2C PDUs.
//
@ -511,6 +668,16 @@ func (c *Controller) shutdown() {
c.stopKATO()
c.state = stateClosed
c.conn.Close()
// Release pooled buffers from interleaved capsules that were never dispatched.
for _, req := range c.pendingCapsules {
if req.payload != nil {
putBuffer(req.payload)
req.payload = nil
}
}
c.pendingCapsules = nil
if c.server != nil {
if c.isAdmin && c.cntlID != 0 {
c.server.unregisterAdmin(c.cntlID)

56
weed/storage/blockvol/nvme/fabric.go

@ -5,24 +5,29 @@ import (
)
// handleFabricCommand dispatches Fabric-specific commands by FCType.
// Returns errDisconnect on Disconnect to signal the rxLoop to exit gracefully.
func (c *Controller) handleFabricCommand(req *Request) error {
switch req.capsule.FCType {
case fcConnect:
return c.handleConnect(req)
c.handleConnect(req)
return nil
case fcPropertyGet:
return c.handlePropertyGet(req)
c.handlePropertyGet(req)
return nil
case fcPropertySet:
return c.handlePropertySet(req)
c.handlePropertySet(req)
return nil
case fcDisconnect:
return c.handleDisconnect(req)
default:
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return nil
}
}
// handleConnect processes a Fabric Connect command.
func (c *Controller) handleConnect(req *Request) error {
func (c *Controller) handleConnect(req *Request) {
capsule := &req.capsule
// Parse QueueID, QueueSize, KATO, CATTR from capsule dwords.
@ -38,7 +43,8 @@ func (c *Controller) handleConnect(req *Request) error {
// Parse ConnectData from payload
if len(req.payload) < connectDataSize {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
var cd ConnectData
cd.Unmarshal(req.payload)
@ -48,7 +54,8 @@ func (c *Controller) handleConnect(req *Request) error {
sub := c.server.findSubsystem(cd.SubNQN)
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
c.subsystem = sub
@ -76,7 +83,8 @@ func (c *Controller) handleConnect(req *Request) error {
// Return CNTLID in DW0
req.resp.DW0 = uint32(c.cntlID)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
// IO queue connect — look up admin session from server registry.
@ -85,17 +93,20 @@ func (c *Controller) handleConnect(req *Request) error {
admin := c.server.lookupAdmin(cd.CNTLID)
if admin == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
// Validate SubNQN and HostNQN match the admin session.
if cd.SubNQN != admin.subNQN {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
if cd.HostNQN != admin.hostNQN {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
c.cntlID = cd.CNTLID
@ -107,11 +118,11 @@ func (c *Controller) handleConnect(req *Request) error {
c.state = stateIOActive
req.resp.DW0 = uint32(c.cntlID)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
// handlePropertyGet returns a controller register value.
func (c *Controller) handlePropertyGet(req *Request) error {
func (c *Controller) handlePropertyGet(req *Request) {
// Per NVMe-oF spec: CDW10 bits 2:0 = ATTRIB (size), CDW11 = OFST (offset)
size8 := (req.capsule.D10 & 1) != 0
offset := req.capsule.D11
@ -128,7 +139,8 @@ func (c *Controller) handlePropertyGet(req *Request) error {
val = uint64(c.regCSTS)
default:
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
if size8 {
@ -138,11 +150,11 @@ func (c *Controller) handlePropertyGet(req *Request) error {
} else {
req.resp.DW0 = uint32(val)
}
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
// handlePropertySet handles controller register writes.
func (c *Controller) handlePropertySet(req *Request) error {
func (c *Controller) handlePropertySet(req *Request) {
// Per NVMe-oF spec: CDW10 = ATTRIB (size), CDW11 = OFST (offset), CDW12-CDW13 = VALUE
offset := req.capsule.D11
value := uint64(req.capsule.D12) | uint64(req.capsule.D13)<<32
@ -163,16 +175,14 @@ func (c *Controller) handlePropertySet(req *Request) error {
default:
// Ignore writes to other registers
}
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
// handleDisconnect processes a Fabric Disconnect.
// Enqueues the response and returns errDisconnect to signal rxLoop exit.
func (c *Controller) handleDisconnect(req *Request) error {
if err := c.sendResponse(req); err != nil {
return err
}
c.shutdown()
return nil
c.enqueueResponse(&response{resp: req.resp})
return errDisconnect
}
// ---------- Subsystem ----------
@ -241,7 +251,7 @@ func propertySetValue(capsule *CapsuleCommand) uint64 {
return uint64(capsule.D12) | uint64(capsule.D13)<<32
}
// propertyGetSize returns true if the PropertyGet requests an 8-byte value.
// propertyGetSize8 returns true if the PropertyGet requests an 8-byte value.
func propertyGetSize8(capsule *CapsuleCommand) bool {
return (capsule.D10 & 1) != 0
}

44
weed/storage/blockvol/nvme/identify.go

@ -8,32 +8,33 @@ import (
const identifySize = 4096
// handleIdentify dispatches Identify commands by CNS type.
func (c *Controller) handleIdentify(req *Request) error {
func (c *Controller) handleIdentify(req *Request) {
cns := uint8(req.capsule.D10 & 0xFF)
switch cns {
case cnsIdentifyController:
return c.identifyController(req)
c.identifyController(req)
case cnsIdentifyNamespace:
return c.identifyNamespace(req)
c.identifyNamespace(req)
case cnsActiveNSList:
return c.identifyActiveNSList(req)
c.identifyActiveNSList(req)
case cnsNSDescriptorList:
return c.identifyNSDescriptors(req)
c.identifyNSDescriptors(req)
default:
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
}
// identifyController returns the 4096-byte Identify Controller data structure.
func (c *Controller) identifyController(req *Request) error {
func (c *Controller) identifyController(req *Request) {
buf := make([]byte, identifySize)
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
// VID (PCI Vendor ID) - use 0 for software target
@ -135,8 +136,11 @@ func (c *Controller) identifyController(req *Request) error {
copy(buf[768:1024], sub.NQN) // buf is already zeroed → NUL-terminated
// IOCCSZ (I/O Queue Command Capsule Supported Size) - offset 1792-1795
// In 16-byte units: 64/16 = 4
binary.LittleEndian.PutUint32(buf[1792:], 4)
// In 16-byte units: (SQE + max inline data) / 16
// Allows host to send write data inline in the command capsule,
// avoiding R2T round-trip for small writes (e.g. 4K).
ioccsz := (64 + c.maxDataLen) / 16
binary.LittleEndian.PutUint32(buf[1792:], ioccsz)
// IORCSZ (I/O Queue Response Capsule Supported Size) - offset 1796-1799
// In 16-byte units: 16/16 = 1
@ -158,15 +162,16 @@ func (c *Controller) identifyController(req *Request) error {
binary.LittleEndian.PutUint16(buf[1804:], 0x01)
req.c2hData = buf
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// identifyNamespace returns the 4096-byte Identify Namespace data for NSID=1.
func (c *Controller) identifyNamespace(req *Request) error {
func (c *Controller) identifyNamespace(req *Request) {
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
dev := sub.Dev
@ -214,25 +219,26 @@ func (c *Controller) identifyNamespace(req *Request) error {
binary.LittleEndian.PutUint32(buf[92:], 1)
req.c2hData = buf
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// identifyActiveNSList returns the list of active namespace IDs (just NSID=1).
func (c *Controller) identifyActiveNSList(req *Request) error {
func (c *Controller) identifyActiveNSList(req *Request) {
buf := make([]byte, identifySize)
// Single namespace: NSID=1
binary.LittleEndian.PutUint32(buf[0:], 1)
req.c2hData = buf
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// identifyNSDescriptors returns namespace descriptor list for NSID=1.
func (c *Controller) identifyNSDescriptors(req *Request) error {
func (c *Controller) identifyNSDescriptors(req *Request) {
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
buf := make([]byte, identifySize)
@ -247,7 +253,7 @@ func (c *Controller) identifyNSDescriptors(req *Request) error {
copy(buf[off:off+16], sub.NGUID[:])
req.c2hData = buf
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// copyPadded copies src into dst, padding remaining bytes with spaces.

89
weed/storage/blockvol/nvme/io.go

@ -1,11 +1,12 @@
package nvme
// handleRead processes an NVMe Read command.
func (c *Controller) handleRead(req *Request) error {
func (c *Controller) handleRead(req *Request) {
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
dev := sub.Dev
@ -18,31 +19,37 @@ func (c *Controller) handleRead(req *Request) error {
nsze := dev.VolumeSize() / uint64(blockSize)
if lba+uint64(nlb) > nsze {
req.resp.Status = uint16(StatusLBAOutOfRange)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
data, err := dev.ReadAt(lba, totalBytes)
if err != nil {
req.resp.Status = uint16(mapBlockError(err))
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
req.c2hData = data
return c.sendC2HDataAndResponse(req)
c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData})
}
// handleWrite processes an NVMe Write command with inline or R2T data.
func (c *Controller) handleWrite(req *Request) error {
// By the time this handler runs, req.payload always contains the write data:
// either inline from the CapsuleCmd PDU, or collected via R2T by collectR2TData.
func (c *Controller) handleWrite(req *Request) {
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
// Check ANA state (write-gating)
if !c.isWriteAllowed() {
req.resp.Status = uint16(StatusNSNotReady)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
dev := sub.Dev
@ -55,72 +62,63 @@ func (c *Controller) handleWrite(req *Request) error {
nsze := dev.VolumeSize() / uint64(blockSize)
if lba+uint64(nlb) > nsze {
req.resp.Status = uint16(StatusLBAOutOfRange)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
var writeData []byte
if len(req.payload) > 0 {
// Inline data path: data was in the CapsuleCmd PDU.
if uint32(len(req.payload)) != expectedBytes {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
}
writeData = req.payload
} else {
// R2T flow: send Ready-to-Transfer, then receive H2C Data PDUs.
if err := c.sendR2T(req.capsule.CID, 0, 0, expectedBytes); err != nil {
return err
}
data, err := c.recvH2CData(expectedBytes)
if err != nil {
return err
}
writeData = data
defer putBuffer(data)
if uint32(len(req.payload)) != expectedBytes {
req.resp.Status = uint16(StatusInvalidField)
c.enqueueResponse(&response{resp: req.resp})
return
}
throttleOnWALPressure(dev)
if err := writeWithRetry(dev, lba, writeData); err != nil {
if err := writeWithRetry(dev, lba, req.payload); err != nil {
req.resp.Status = uint16(mapBlockError(err))
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
// handleFlush processes an NVMe Flush command.
func (c *Controller) handleFlush(req *Request) error {
func (c *Controller) handleFlush(req *Request) {
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
if !c.isWriteAllowed() {
req.resp.Status = uint16(StatusNSNotReady)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
if err := sub.Dev.SyncCache(); err != nil {
req.resp.Status = uint16(mapBlockError(err))
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
// handleWriteZeros processes an NVMe Write Zeroes command.
func (c *Controller) handleWriteZeros(req *Request) error {
func (c *Controller) handleWriteZeros(req *Request) {
sub := c.subsystem
if sub == nil {
req.resp.Status = uint16(StatusInvalidField)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
if !c.isWriteAllowed() {
req.resp.Status = uint16(StatusNSNotReady)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
dev := sub.Dev
@ -133,14 +131,16 @@ func (c *Controller) handleWriteZeros(req *Request) error {
nsze := dev.VolumeSize() / uint64(blockSize)
if lba+uint64(nlb) > nsze {
req.resp.Status = uint16(StatusLBAOutOfRange)
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
// D12 bit 25: DEALLOC — if set, use Trim instead of writing zeros
if req.capsule.D12&commandBitDeallocate != 0 {
if err := dev.Trim(lba, totalBytes); err != nil {
req.resp.Status = uint16(mapBlockError(err))
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
} else {
zeroBuf := getBuffer(int(totalBytes))
@ -152,11 +152,12 @@ func (c *Controller) handleWriteZeros(req *Request) error {
putBuffer(zeroBuf)
if err != nil {
req.resp.Status = uint16(mapBlockError(err))
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
return
}
}
return c.sendResponse(req)
c.enqueueResponse(&response{resp: req.resp})
}
// isWriteAllowed checks if the current ANA state allows writes.

120
weed/storage/blockvol/nvme/nvme_test.go

@ -736,10 +736,10 @@ func TestIdentify_Controller(t *testing.T) {
if subNQN != nqn {
t.Fatalf("SubNQN = %q, want %q", subNQN, nqn)
}
// IOCCSZ
// IOCCSZ: (64 + maxDataLen) / 16 = (64 + 32768) / 16 = 2052
ioccsz := binary.LittleEndian.Uint32(data[1792:])
if ioccsz != 4 {
t.Fatalf("IOCCSZ = %d, want 4", ioccsz)
if ioccsz != 2052 {
t.Fatalf("IOCCSZ = %d, want 2052", ioccsz)
}
// IORCSZ
iorcsz := binary.LittleEndian.Uint32(data[1796:])
@ -3439,3 +3439,117 @@ func TestWriteWithRetry_SharedTransientConcurrency(t *testing.T) {
}
}
}
// TestTxLoop_R2TDoneClosedOnError verifies that completeWaiters closes the
// r2tDone channel attached to an R2T response. txLoop relies on this when it
// discards a queued response after a write error, so that a goroutine waiting
// on r2tDone (collectR2TData) is released instead of deadlocking.
func TestTxLoop_R2TDoneClosedOnError(t *testing.T) {
	r2tDone := make(chan struct{})
	resp := &response{
		r2t:     &R2THeader{CCCID: 1, DATAL: 4096},
		r2tDone: r2tDone,
	}

	// completeWaiters must close r2tDone — this is what txLoop calls
	// when it discards a response after a write error.
	completeWaiters(resp)

	// A closed channel is immediately receivable; the timeout only guards
	// against the regression where the channel is left open.
	select {
	case <-r2tDone:
		// OK — channel was closed.
	case <-time.After(time.Second):
		t.Fatal("r2tDone was not closed by completeWaiters — would deadlock collectR2TData")
	}
}
// TestTxLoop_R2TDoneClosedOnError_Integration tests the full txLoop path:
// a prior write error causes a queued R2T to be discarded with r2tDone closed.
func TestTxLoop_R2TDoneClosedOnError_Integration(t *testing.T) {
// Create a controller with a broken writer (always errors).
failWriter := &failingWriter{}
serverConn, clientConn := net.Pipe()
defer serverConn.Close()
defer clientConn.Close()
c := &Controller{
conn: serverConn,
out: NewWriter(failWriter),
respCh: make(chan *response, 16),
done: make(chan struct{}),
}
// Enqueue a normal response that will fail to write.
c.respCh <- &response{resp: CapsuleResponse{CID: 1}}
// Enqueue an R2T with a done channel.
r2tDone := make(chan struct{})
c.respCh <- &response{
r2t: &R2THeader{CCCID: 2, DATAL: 4096},
r2tDone: r2tDone,
}
// Signal txLoop to exit after draining.
close(c.done)
// Run txLoop — it should exit without hanging.
errCh := make(chan error, 1)
go func() {
errCh <- c.txLoop()
}()
select {
case <-errCh:
// txLoop exited.
case <-time.After(3 * time.Second):
t.Fatal("txLoop hung — r2tDone was not closed on discarded R2T")
}
// Verify r2tDone was closed.
select {
case <-r2tDone:
// OK
default:
t.Fatal("r2tDone was not closed")
}
}
// failingWriter is an io.Writer test double: every Write reports zero bytes
// written together with a fixed "injected write failure" error.
type failingWriter struct{}

// Write ignores its input and always fails.
func (*failingWriter) Write([]byte) (n int, err error) {
	err = errors.New("injected write failure")
	return n, err
}
// TestShutdown_ReleasePendingCapsuleBuffers verifies that shutdown() releases
// pooled payload buffers from interleaved capsules that were never dispatched.
func TestShutdown_ReleasePendingCapsuleBuffers(t *testing.T) {
c := &Controller{
conn: &net.TCPConn{}, // dummy, Close() will fail but closeOnce still runs
}
// Simulate buffered interleaved capsules with pooled payloads.
buf1 := getBuffer(4096)
buf2 := getBuffer(65536)
c.pendingCapsules = []*Request{
{payload: buf1},
{payload: buf2},
{payload: nil}, // no payload — should not panic
}
c.shutdown()
// After shutdown, pendingCapsules should be nil and payloads released.
if c.pendingCapsules != nil {
t.Fatal("pendingCapsules not cleared after shutdown")
}
}
// TestCompleteWaiters_NilR2TDone guards the common case of a response that
// carries no R2T: its r2tDone field is nil and completeWaiters must handle it
// without panicking.
func TestCompleteWaiters_NilR2TDone(t *testing.T) {
	plain := response{resp: CapsuleResponse{CID: 1}}
	completeWaiters(&plain) // must not panic
}

11
weed/storage/blockvol/nvme/wire.go

@ -9,7 +9,7 @@ import (
// ---------- Reader ----------
// Reader decodes NVMe/TCP PDUs from a stream.
// Reader decodes NVMe/TCP PDUs from a buffered stream.
//
// Usage:
//
@ -26,9 +26,14 @@ type Reader struct {
padBuf [maxHeaderSize]byte // reuse for padding skip
}
// NewReader wraps an io.Reader for NVMe/TCP PDU decoding.
// NewReader wraps an io.Reader with a default-sized bufio.Reader.
func NewReader(r io.Reader) *Reader {
return &Reader{rd: r}
return &Reader{rd: bufio.NewReader(r)}
}
// NewReaderSize returns a Reader that decodes PDUs from r through a
// bufio.Reader created with the requested buffer size.
func NewReaderSize(r io.Reader, size int) *Reader {
	buffered := bufio.NewReaderSize(r, size)
	return &Reader{rd: buffered}
}
// Dequeue reads the 8-byte CommonHeader, validates bounds, and returns it.

26
weed/storage/blockvol/testrunner/actions/bench.go

@ -225,13 +225,31 @@ type fioLatency struct {
Percentile map[string]float64 `json:"percentile"`
}
// ParseFioMetric extracts a named metric from fio JSON.
// ParseFioMetric extracts a named metric from fio JSON or returns a plain
// numeric value directly. This allows bench_compare to accept either raw fio
// JSON output or pre-aggregated scalar values (e.g. from bench_stats or
// phase repeat/aggregate).
//
// direction: "read", "write", or "" (auto-detect: use whichever has IOPS > 0).
// Supported metrics: "iops", "bw_bytes", "bw_mb", "lat_mean_us", "lat_p50_us", "lat_p99_us", "lat_p999_us"
func ParseFioMetric(jsonStr, metric, direction string) (float64, error) {
func ParseFioMetric(input, metric, direction string) (float64, error) {
// Try plain numeric value first (from aggregation or bench_stats).
trimmed := strings.TrimSpace(input)
if v, err := strconv.ParseFloat(trimmed, 64); err == nil {
return v, nil
}
// Try quoted numeric string (e.g. "15322.00").
if len(trimmed) >= 2 && trimmed[0] == '"' && trimmed[len(trimmed)-1] == '"' {
if v, err := strconv.ParseFloat(trimmed[1:len(trimmed)-1], 64); err == nil {
return v, nil
}
}
// Parse as fio JSON.
var output fioOutput
if err := json.Unmarshal([]byte(jsonStr), &output); err != nil {
return 0, fmt.Errorf("parse fio JSON: %w", err)
if err := json.Unmarshal([]byte(input), &output); err != nil {
return 0, fmt.Errorf("parse fio metric: input is neither a number nor valid fio JSON: %w", err)
}
if len(output.Jobs) == 0 {
return 0, fmt.Errorf("fio JSON has no jobs")

49
weed/storage/blockvol/testrunner/actions/bench_test.go

@ -183,8 +183,55 @@ func TestParseFioMetric_UnknownMetric(t *testing.T) {
}
}
// TestParseFioMetric_PlainNumber checks that a bare decimal string is accepted
// by ParseFioMetric and returned as its numeric value.
func TestParseFioMetric_PlainNumber(t *testing.T) {
	const want = 15322.00

	got, err := ParseFioMetric("15322.00", "iops", "")
	switch {
	case err != nil:
		t.Fatalf("parse plain number: %v", err)
	case got != want:
		t.Fatalf("got %f, want 15322.00", got)
	}
}
// TestParseFioMetric_PlainInteger checks that an integer string without a
// fractional part is accepted by ParseFioMetric.
func TestParseFioMetric_PlainInteger(t *testing.T) {
	const want = 36853

	got, err := ParseFioMetric("36853", "iops", "")
	switch {
	case err != nil:
		t.Fatalf("parse plain integer: %v", err)
	case got != want:
		t.Fatalf("got %f, want 36853", got)
	}
}
// TestParseFioMetric_QuotedNumber checks that a number wrapped in double
// quotes is unwrapped and parsed by ParseFioMetric.
func TestParseFioMetric_QuotedNumber(t *testing.T) {
	const want = 15322.00

	got, err := ParseFioMetric(`"15322.00"`, "iops", "")
	switch {
	case err != nil:
		t.Fatalf("parse quoted number: %v", err)
	case got != want:
		t.Fatalf("got %f, want 15322.00", got)
	}
}
// TestParseFioMetric_NumberWithWhitespace checks that leading and trailing
// whitespace (including a newline) around a number is ignored.
func TestParseFioMetric_NumberWithWhitespace(t *testing.T) {
	const want = 42000.50

	got, err := ParseFioMetric(" 42000.50 \n", "iops", "")
	switch {
	case err != nil:
		t.Fatalf("parse number with whitespace: %v", err)
	case got != want:
		t.Fatalf("got %f, want 42000.50", got)
	}
}
// TestParseFioMetric_InvalidInput checks that input which is neither a number
// nor JSON makes ParseFioMetric return an error.
func TestParseFioMetric_InvalidInput(t *testing.T) {
	if _, err := ParseFioMetric("not a number or json", "iops", ""); err == nil {
		t.Fatal("expected error for invalid input")
	}
}
func TestParseFioMetric_InvalidJSON(t *testing.T) {
_, err := ParseFioMetric("not json", "iops", "")
_, err := ParseFioMetric("{invalid json}", "iops", "")
if err == nil {
t.Fatal("expected error for invalid JSON")
}

10
weed/storage/blockvol/testrunner/actions/devops_test.go

@ -80,8 +80,8 @@ func TestAllActions_Registration(t *testing.T) {
if n := len(byTier[tr.TierCore]); n != 11 {
t.Errorf("core: %d, want 11", n)
}
if n := len(byTier[tr.TierBlock]); n != 52 {
t.Errorf("block: %d, want 52", n)
if n := len(byTier[tr.TierBlock]); n != 55 {
t.Errorf("block: %d, want 55", n)
}
if n := len(byTier[tr.TierDevOps]); n != 7 {
t.Errorf("devops: %d, want 7", n)
@ -93,13 +93,13 @@ func TestAllActions_Registration(t *testing.T) {
t.Errorf("k8s: %d, want 14", n)
}
// Total should be 89 (85 existing + 3 pgbench + 1 bench_stats).
// Total should be 92 (89 existing + 3 profiling: pprof_capture, vmstat_capture, iostat_capture).
total := 0
for _, actions := range byTier {
total += len(actions)
}
if total != 89 {
t.Errorf("total actions: %d, want 89", total)
if total != 92 {
t.Errorf("total actions: %d, want 92", total)
}
}

155
weed/storage/blockvol/testrunner/actions/metrics.go

@ -19,6 +19,9 @@ func RegisterMetricsActions(r *tr.Registry) {
r.RegisterFunc("assert_metric_gt", tr.TierBlock, assertMetricGT)
r.RegisterFunc("assert_metric_eq", tr.TierBlock, assertMetricEQ)
r.RegisterFunc("assert_metric_lt", tr.TierBlock, assertMetricLT)
r.RegisterFunc("pprof_capture", tr.TierBlock, pprofCapture)
r.RegisterFunc("vmstat_capture", tr.TierBlock, vmstatCapture)
r.RegisterFunc("iostat_capture", tr.TierBlock, iostatCapture)
}
// scrapeMetrics fetches /metrics from a target's admin port via SSH curl.
@ -153,6 +156,158 @@ func assertMetricLT(ctx context.Context, actx *tr.ActionContext, act tr.Action)
return map[string]string{"value": strconv.FormatFloat(val, 'g', -1, 64)}, nil
}
// pprofCapture fetches a Go pprof profile from a target's admin port and saves
// it to a file on the target node. The profile is fetched by running curl on
// the target node against the admin server's /debug/pprof/ endpoint.
//
// Params:
//   - target (required): target name
//   - profile: pprof profile type (default: "profile" = CPU)
//     Supported: "profile" (CPU), "heap", "allocs", "block", "mutex", "goroutine"
//   - seconds: duration for CPU profile, a positive integer (default: "30").
//     Only validated and sent for the CPU profile.
//   - output_dir: directory to save profile on target node (default: "/tmp/pprof")
//   - label: filename label (default: profile type)
//
// Returns: value = remote file path of saved profile.
//
// An error is returned for an unknown target, an unsupported profile type, a
// non-numeric or non-positive seconds value (CPU profile), a failed mkdir, or
// a failed curl (including an HTTP error status from the admin server).
func pprofCapture(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) {
	tgt, err := getHATarget(actx, act.Target)
	if err != nil {
		return nil, err
	}

	profile := paramDefault(act.Params, "profile", "profile")
	seconds := paramDefault(act.Params, "seconds", "30")
	outputDir := paramDefault(act.Params, "output_dir", "/tmp/pprof")
	label := paramDefault(act.Params, "label", profile)

	// Validate profile type.
	switch profile {
	case "profile", "heap", "allocs", "block", "mutex", "goroutine":
		// OK
	default:
		return nil, fmt.Errorf("pprof_capture: unsupported profile type %q", profile)
	}

	// Build URL. CPU profile uses ?seconds= parameter. The value ends up inside
	// a single-quoted shell argument, so only a plain positive integer is
	// accepted; anything else could break out of the quoting.
	url := fmt.Sprintf("http://127.0.0.1:%d/debug/pprof/%s", tgt.AdminPort, profile)
	if profile == "profile" {
		secs, convErr := strconv.Atoi(seconds)
		if convErr != nil || secs < 1 {
			return nil, fmt.Errorf("pprof_capture: invalid seconds %q: must be a positive integer", seconds)
		}
		url += "?seconds=" + strconv.Itoa(secs)
	}

	// Create output directory.
	// NOTE(review): outputDir and label are interpolated into shell commands
	// unquoted; they are assumed to come from trusted scenario files.
	if _, _, _, err := tgt.Node.RunRoot(ctx, "mkdir -p "+outputDir); err != nil {
		return nil, fmt.Errorf("pprof_capture: mkdir: %w", err)
	}

	outFile := fmt.Sprintf("%s/%s.pb.gz", outputDir, label)

	actx.Log(" pprof %s → %s (%ss)", profile, outFile, seconds)

	// -f makes curl exit non-zero on an HTTP error status instead of saving the
	// error page as if it were a profile; -S keeps the error message on stderr
	// despite -s.
	cmd := fmt.Sprintf("curl -sSf -o %s '%s'", outFile, url)

	// A CPU profile blocks for the requested duration. No timeout is extended
	// here, so ctx must allow at least that long.
	_, stderr, code, err := tgt.Node.Run(ctx, cmd)
	if err != nil || code != 0 {
		return nil, fmt.Errorf("pprof_capture: curl failed: code=%d stderr=%s err=%v", code, stderr, err)
	}

	return map[string]string{"value": outFile}, nil
}
// vmstatCapture runs vmstat on a node for a duration and saves the output.
//
// Params:
//   - node (required): node to run on
//   - seconds: total duration, an integer (default: "30")
//   - interval: vmstat interval in seconds, an integer; values below 1 are
//     treated as 1 (default: "1")
//   - output_dir: directory for output (default: "/tmp/pprof")
//   - label: filename label (default: "vmstat")
//
// The number of samples is seconds/interval, with a minimum of one.
//
// Returns: value = remote file path.
//
// An error is returned for an unknown node, a non-integer seconds or interval,
// a failed mkdir, or a non-zero vmstat exit.
func vmstatCapture(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) {
	node, err := getNode(actx, act.Node)
	if err != nil {
		return nil, err
	}

	seconds := paramDefault(act.Params, "seconds", "30")
	interval := paramDefault(act.Params, "interval", "1")
	outputDir := paramDefault(act.Params, "output_dir", "/tmp/pprof")
	label := paramDefault(act.Params, "label", "vmstat")

	// Parse before touching the node so bad parameters fail without side
	// effects. Only the parsed integers reach the shell command below.
	count, err := strconv.Atoi(seconds)
	if err != nil {
		return nil, fmt.Errorf("vmstat_capture: invalid seconds %q: %w", seconds, err)
	}
	intv, err := strconv.Atoi(interval)
	if err != nil {
		return nil, fmt.Errorf("vmstat_capture: invalid interval %q: %w", interval, err)
	}
	if intv < 1 {
		intv = 1
	}
	iterations := count / intv
	if iterations < 1 {
		iterations = 1
	}

	// NOTE(review): outputDir and label are interpolated into shell commands
	// unquoted; they are assumed to come from trusted scenario files.
	if _, _, _, err := node.RunRoot(ctx, "mkdir -p "+outputDir); err != nil {
		return nil, fmt.Errorf("vmstat_capture: mkdir: %w", err)
	}

	outFile := fmt.Sprintf("%s/%s.txt", outputDir, label)

	// vmstat <interval> <count>. The clamped interval is used both here and in
	// the iteration count, so the two always agree.
	cmd := fmt.Sprintf("vmstat %d %d > %s 2>&1", intv, iterations, outFile)

	actx.Log(" vmstat %d×%d → %s", intv, iterations, outFile)

	_, _, code, err := node.Run(ctx, cmd)
	if err != nil || code != 0 {
		return nil, fmt.Errorf("vmstat_capture: code=%d err=%v", code, err)
	}

	return map[string]string{"value": outFile}, nil
}
// iostatCapture runs iostat -x on a node for a duration and saves the output.
//
// Params:
//   - node (required): node to run on
//   - seconds: total duration, an integer (default: "30")
//   - interval: iostat interval in seconds, an integer; values below 1 are
//     treated as 1 (default: "1")
//   - output_dir: directory for output (default: "/tmp/pprof")
//   - label: filename label (default: "iostat")
//
// The number of reports is seconds/interval, with a minimum of one.
//
// Returns: value = remote file path.
//
// An error is returned for an unknown node, a non-integer seconds or interval,
// a failed mkdir, or a non-zero iostat exit.
func iostatCapture(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) {
	node, err := getNode(actx, act.Node)
	if err != nil {
		return nil, err
	}

	seconds := paramDefault(act.Params, "seconds", "30")
	interval := paramDefault(act.Params, "interval", "1")
	outputDir := paramDefault(act.Params, "output_dir", "/tmp/pprof")
	label := paramDefault(act.Params, "label", "iostat")

	// Parse before touching the node so bad parameters fail without side
	// effects. Only the parsed integers reach the shell command below.
	count, err := strconv.Atoi(seconds)
	if err != nil {
		return nil, fmt.Errorf("iostat_capture: invalid seconds %q: %w", seconds, err)
	}
	intv, err := strconv.Atoi(interval)
	if err != nil {
		return nil, fmt.Errorf("iostat_capture: invalid interval %q: %w", interval, err)
	}
	if intv < 1 {
		intv = 1
	}
	iterations := count / intv
	if iterations < 1 {
		iterations = 1
	}

	// NOTE(review): outputDir and label are interpolated into shell commands
	// unquoted; they are assumed to come from trusted scenario files.
	if _, _, _, err := node.RunRoot(ctx, "mkdir -p "+outputDir); err != nil {
		return nil, fmt.Errorf("iostat_capture: mkdir: %w", err)
	}

	outFile := fmt.Sprintf("%s/%s.txt", outputDir, label)

	// iostat -x <interval> <count>. The clamped interval is used both here and
	// in the iteration count, so the two always agree.
	cmd := fmt.Sprintf("iostat -x %d %d > %s 2>&1", intv, iterations, outFile)

	actx.Log(" iostat -x %d×%d → %s", intv, iterations, outFile)

	_, _, code, err := node.Run(ctx, cmd)
	if err != nil || code != 0 {
		return nil, fmt.Errorf("iostat_capture: code=%d err=%v", code, err)
	}

	return map[string]string{"value": outFile}, nil
}
// collectArtifactsAction explicitly collects artifacts from targets.
func collectArtifactsAction(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) {
dir := act.Params["dir"]

6
weed/storage/blockvol/testrunner/scenarios/cp103-25g-ab.yaml

@ -16,9 +16,9 @@ targets:
node: server
vol_size: "1073741824"
wal_size: "536870912"
iscsi_port: 3263
nvme_port: 4420
admin_port: 8083
iscsi_port: 3270
nvme_port: 4430
admin_port: 8090
iqn_suffix: "bench-25g"
nqn_suffix: "bench-25g"

Loading…
Cancel
Save