From 4c5f9f2b9d08f6dff9ab025c4562a598990b6d0f Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Tue, 10 Mar 2026 15:09:41 -0700 Subject: [PATCH] feat: CP10B-1 NVMe/TCP RX/TX split + CP10B-2 bench/profiling fixes RX/TX split: rxLoop reads PDUs, txLoop writes responses via respCh. Handlers refactored to void + enqueueResponse pattern. IOCCSZ fix enables inline write data (100K IOPS vs 15K before). R2T deadlock fix via completeWaiters. Shutdown cleans up pendingCapsules buffers. Bench: ParseFioMetric accepts plain/quoted numbers for aggregated medians. Profiling actions: pprof_capture, vmstat_capture, iostat_capture. 196 NVMe tests, 92 testrunner actions. Co-Authored-By: Claude Opus 4.6 --- weed/storage/blockvol/nvme/admin.go | 42 +- weed/storage/blockvol/nvme/controller.go | 457 ++++++++++++------ weed/storage/blockvol/nvme/fabric.go | 56 ++- weed/storage/blockvol/nvme/identify.go | 44 +- weed/storage/blockvol/nvme/io.go | 89 ++-- weed/storage/blockvol/nvme/nvme_test.go | 120 ++++- weed/storage/blockvol/nvme/wire.go | 11 +- .../blockvol/testrunner/actions/bench.go | 26 +- .../blockvol/testrunner/actions/bench_test.go | 49 +- .../testrunner/actions/devops_test.go | 10 +- .../blockvol/testrunner/actions/metrics.go | 155 ++++++ .../testrunner/scenarios/cp103-25g-ab.yaml | 6 +- 12 files changed, 792 insertions(+), 273 deletions(-) diff --git a/weed/storage/blockvol/nvme/admin.go b/weed/storage/blockvol/nvme/admin.go index 82505e31a..a58fa5fa7 100644 --- a/weed/storage/blockvol/nvme/admin.go +++ b/weed/storage/blockvol/nvme/admin.go @@ -5,7 +5,7 @@ import ( ) // handleSetFeatures processes SetFeatures admin commands. -func (c *Controller) handleSetFeatures(req *Request) error { +func (c *Controller) handleSetFeatures(req *Request) { fid := uint8(req.capsule.D10 & 0xFF) switch fid { @@ -31,25 +31,23 @@ func (c *Controller) handleSetFeatures(req *Request) error { // Response DW0: (NCQR-1) | ((NSQR-1) << 16) req.resp.DW0 = uint32(ncqr-1) | (uint32(nsqr-1) << 16) - return c.sendResponse(req) case fidKeepAliveTimer: // D11 contains KATO in milliseconds c.katoMs = req.capsule.D11 - return c.sendResponse(req) case fidAsyncEventConfig: // Stub: accept but don't deliver events - return c.sendResponse(req) default: req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) } + + c.enqueueResponse(&response{resp: req.resp}) } // handleGetFeatures returns stored feature values. -func (c *Controller) handleGetFeatures(req *Request) error { +func (c *Controller) handleGetFeatures(req *Request) { fid := uint8(req.capsule.D10 & 0xFF) switch fid { @@ -59,24 +57,22 @@ func (c *Controller) handleGetFeatures(req *Request) error { n = c.maxIOQueues } req.resp.DW0 = uint32(n-1) | (uint32(n-1) << 16) - return c.sendResponse(req) case fidKeepAliveTimer: req.resp.DW0 = c.katoMs - return c.sendResponse(req) case fidAsyncEventConfig: req.resp.DW0 = 0 - return c.sendResponse(req) default: req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) } + + c.enqueueResponse(&response{resp: req.resp}) } // handleGetLogPage returns log page data. -func (c *Controller) handleGetLogPage(req *Request) error { +func (c *Controller) handleGetLogPage(req *Request) { // D10 bits 7:0 = Log Page Identifier // D10 bits 27:16 and D11 bits 15:0 = Number of Dwords (NUMD) lid := uint8(req.capsule.D10 & 0xFF) @@ -87,28 +83,28 @@ func (c *Controller) handleGetLogPage(req *Request) error { switch lid { case logPageError: - return c.logPageError(req, length) + c.logPageError(req, length) case logPageSMART: - return c.logPageSMART(req, length) + c.logPageSMART(req, length) case logPageANA: - return c.logPageANA(req, length) + c.logPageANA(req, length) default: req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } } // logPageError returns an empty error log page. -func (c *Controller) logPageError(req *Request, length uint32) error { +func (c *Controller) logPageError(req *Request, length uint32) { if length > 64 { length = 64 } req.c2hData = make([]byte, length) - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // logPageSMART returns a 512-byte SMART/Health log. -func (c *Controller) logPageSMART(req *Request, length uint32) error { +func (c *Controller) logPageSMART(req *Request, length uint32) { if length > 512 { length = 512 } @@ -130,11 +126,11 @@ func (c *Controller) logPageSMART(req *Request, length uint32) error { buf[5] = 0 req.c2hData = buf[:length] - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // logPageANA returns the ANA log page with a single group. -func (c *Controller) logPageANA(req *Request, length uint32) error { +func (c *Controller) logPageANA(req *Request, length uint32) { // ANA log page format (32 bytes for single group): // [0:8] CHGCNT (uint64) // [8:10] NGRPS = 1 (uint16) @@ -167,7 +163,7 @@ func (c *Controller) logPageANA(req *Request, length uint32) error { length = anaLogSize } req.c2hData = buf[:length] - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // anaState returns the current ANA state based on the subsystem's device. @@ -192,7 +188,7 @@ func (c *Controller) anaChangeCount() uint64 { } // handleKeepAlive resets the KATO timer and returns success. -func (c *Controller) handleKeepAlive(req *Request) error { +func (c *Controller) handleKeepAlive(req *Request) { c.resetKATO() - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } diff --git a/weed/storage/blockvol/nvme/controller.go b/weed/storage/blockvol/nvme/controller.go index bb5b5eb6a..c3e2e9b31 100644 --- a/weed/storage/blockvol/nvme/controller.go +++ b/weed/storage/blockvol/nvme/controller.go @@ -1,6 +1,7 @@ package nvme import ( + "errors" "fmt" "io" "log" @@ -10,6 +11,10 @@ import ( "time" ) +// errDisconnect is a sentinel returned by handleDisconnect to signal +// the rxLoop to exit gracefully after enqueuing the disconnect response. +var errDisconnect = errors.New("disconnect") + // controllerState tracks the lifecycle of an NVMe controller session. type controllerState int @@ -31,7 +36,27 @@ type Request struct { status StatusWord } +// response represents a pending response to be written by the txLoop. +// Either a standard CapsuleResp (with optional C2H data), or an R2T PDU. +type response struct { + // Standard response fields + resp CapsuleResponse + c2hData []byte // non-nil for read responses + + // R2T variant (when r2t is non-nil, resp/c2hData are ignored) + r2t *R2THeader + r2tDone chan struct{} // closed after R2T is flushed to wire +} + // Controller handles one NVMe/TCP connection (one queue per connection). +// +// After the IC handshake, the connection is split into two goroutines: +// - rxLoop: reads PDUs from the wire, dispatches commands +// - txLoop: drains the response channel, writes responses to the wire +// +// IO commands are dispatched to goroutines for concurrent processing. +// Admin commands run synchronously in the rxLoop (infrequent, state-changing). +// All responses flow through respCh — only txLoop writes to c.out. type Controller struct { mu sync.Mutex @@ -45,8 +70,9 @@ type Controller struct { // Queue state (one queue per TCP connection) queueID uint16 queueSize uint16 - sqhd uint16 // Submission Queue Head pointer - flowCtlOff bool // CATTR bit2: SQ flow control disabled + rxSQHD uint16 // Submission Queue Head (updated by rxLoop only) + sqhdVal atomic.Uint32 // Latest SQHD for txLoop (lower 16 bits) + flowCtlOff bool // CATTR bit2: SQ flow control disabled // Controller identity cntlID uint16 @@ -63,9 +89,10 @@ type Controller struct { katoTimer *time.Timer katoMu sync.Mutex - // Async completion (IO queues) - waiting chan *Request // pre-allocated request pool - completions chan *Request // completed requests to send + // RX/TX split + respCh chan *response // responses from handlers → txLoop + done chan struct{} // closed when connection is shutting down + cmdWg sync.WaitGroup // tracks in-flight IO command goroutines // Backend subsystem *Subsystem @@ -78,11 +105,11 @@ type Controller struct { maxDataLen uint32 // C2H/H2C data chunk size (from Config) // Command interleaving: capsules received during R2T H2CData collection. - // Drained by Serve() before reading the next PDU from the wire. + // Drained by rxLoop before reading the next PDU from the wire. pendingCapsules []*Request // Lifecycle - wg sync.WaitGroup + wg sync.WaitGroup closeOnce sync.Once } @@ -94,7 +121,7 @@ func newController(conn net.Conn, server *Server) *Controller { } c := &Controller{ conn: conn, - in: NewReader(conn), + in: NewReaderSize(conn, int(maxData)+maxHeaderSize), out: NewWriterSize(conn, int(maxData)+maxHeaderSize), state: stateConnected, server: server, @@ -108,14 +135,80 @@ func newController(conn net.Conn, server *Server) *Controller { } // Serve is the main event loop for this controller connection. +// +// Phase 1: IC handshake (synchronous, direct c.out access). +// Phase 2: Start txLoop goroutine, enter rxLoop. func (c *Controller) Serve() error { defer c.shutdown() - // IC handshake timeout + // Phase 1: IC handshake with timeout (synchronous). if err := c.conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { return err } + hdr, err := c.in.Dequeue() + if err != nil { + return fmt.Errorf("IC handshake: %w", err) + } + if hdr.Type != pduICReq { + return fmt.Errorf("expected ICReq (0x%x), got 0x%x", pduICReq, hdr.Type) + } + if err := c.handleIC(); err != nil { + return fmt.Errorf("IC handshake: %w", err) + } + if err := c.conn.SetReadDeadline(time.Time{}); err != nil { + return err + } + + // Phase 2: Start TX loop, enter RX loop. + c.respCh = make(chan *response, 128) + c.done = make(chan struct{}) + + txErrCh := make(chan error, 1) + go func() { + txErrCh <- c.txLoop() + }() + + rxErr := c.rxLoop() + + // Wait for in-flight IO command goroutines to finish and enqueue responses. + c.cmdWg.Wait() + // Signal txLoop to drain remaining responses and exit. + close(c.done) + txErr := <-txErrCh + + // errDisconnect is a graceful exit, not an error. + if errors.Is(rxErr, errDisconnect) { + rxErr = nil + } + if rxErr != nil { + return rxErr + } + return txErr +} + +// handleIC processes the IC handshake. +func (c *Controller) handleIC() error { + var req ICRequest + if err := c.in.Receive(&req); err != nil { + return err + } + + resp := ICResponse{ + PDUFormatVersion: 0, + MaxH2CDataLength: c.maxDataLen, + } + if err := c.out.SendHeaderOnly(pduICResp, &resp, icBodySize); err != nil { + return err + } + + c.state = stateICComplete + return nil +} + +// rxLoop reads PDUs from the wire and dispatches commands. +// Admin commands (QID=0) run synchronously. IO commands dispatch to goroutines. +func (c *Controller) rxLoop() error { for { if c.closed.Load() { return nil @@ -125,8 +218,9 @@ func (c *Controller) Serve() error { for len(c.pendingCapsules) > 0 { req := c.pendingCapsules[0] c.pendingCapsules = c.pendingCapsules[1:] - if err := c.dispatchPending(req); err != nil { - return fmt.Errorf("pending capsule: %w", err) + c.advanceSQHD() + if err := c.dispatchFromRx(req); err != nil { + return err } } @@ -139,27 +233,24 @@ func (c *Controller) Serve() error { } switch hdr.Type { - case pduICReq: - if err := c.handleIC(); err != nil { - return fmt.Errorf("IC handshake: %w", err) - } - // Clear read deadline after successful IC - if err := c.conn.SetReadDeadline(time.Time{}); err != nil { - return err - } - case pduCapsuleCmd: - if err := c.handleCapsule(); err != nil { + if c.state < stateICComplete { + return fmt.Errorf("capsule command before IC handshake") + } + req, err := c.parseCapsule() + if err != nil { return fmt.Errorf("capsule: %w", err) } + c.advanceSQHD() + if err := c.dispatchFromRx(req); err != nil { + return err + } case pduH2CData: - // H2CData PDUs are only expected after R2T, handled inline - // by recvH2CData. If we see one here, it's unexpected. return fmt.Errorf("unexpected H2CData PDU outside R2T flow") case pduH2CTermReq: - return nil // host terminated + return nil default: return fmt.Errorf("unexpected PDU type: 0x%x", hdr.Type) @@ -167,85 +258,145 @@ func (c *Controller) Serve() error { } } -// handleIC processes the IC handshake. -func (c *Controller) handleIC() error { - var req ICRequest - if err := c.in.Receive(&req); err != nil { - return err - } - - resp := ICResponse{ - PDUFormatVersion: 0, - MaxH2CDataLength: c.maxDataLen, - } - if err := c.out.SendHeaderOnly(pduICResp, &resp, icBodySize); err != nil { - return err +// txLoop drains the response channel and writes responses to the wire. +// Only txLoop touches c.out after the IC handshake. +func (c *Controller) txLoop() error { + var firstErr error + for { + select { + case resp := <-c.respCh: + if firstErr != nil { + completeWaiters(resp) + continue // discard — connection already failed + } + if err := c.writeResponse(resp); err != nil { + firstErr = err + c.conn.Close() // force rxLoop EOF + } + case <-c.done: + // Drain remaining responses. + for { + select { + case resp := <-c.respCh: + if firstErr == nil { + if err := c.writeResponse(resp); err != nil { + firstErr = err + } + } else { + completeWaiters(resp) + } + default: + return firstErr + } + } + } } - - c.state = stateICComplete - return nil } -// handleCapsule dispatches a CapsuleCmd PDU. -func (c *Controller) handleCapsule() error { - // Reject capsule commands before IC handshake is complete. - if c.state < stateICComplete { - return fmt.Errorf("capsule command before IC handshake") - } - +// parseCapsule reads a CapsuleCmd PDU (specific header + optional inline data). +func (c *Controller) parseCapsule() (*Request, error) { var capsule CapsuleCommand if err := c.in.Receive(&capsule); err != nil { - return err + return nil, err } - // Read optional inline data var payload []byte if dataLen := c.in.Length(); dataLen > 0 { payload = getBuffer(int(dataLen)) if err := c.in.ReceiveData(payload); err != nil { putBuffer(payload) - return err + return nil, err } } - // Advance SQHD - c.sqhd++ - if c.sqhd >= c.queueSize && c.queueSize > 0 { - c.sqhd = 0 - } - req := &Request{ capsule: capsule, payload: payload, } req.resp.CID = capsule.CID req.resp.QueueID = c.queueID - // SQHD is set in sendResponse/sendC2HDataAndResponse using the - // latest c.flowCtlOff value, so Connect responses correctly get - // SQHD=0xFFFF when the host requests flowCtlOff via CATTR. req.resp.Status = uint16(StatusSuccess) + return req, nil +} + +// advanceSQHD increments the submission queue head pointer (rxLoop only). +func (c *Controller) advanceSQHD() { + sqhd := c.rxSQHD + 1 + if sqhd >= c.queueSize && c.queueSize > 0 { + sqhd = 0 + } + c.rxSQHD = sqhd + c.sqhdVal.Store(uint32(sqhd)) +} +// dispatchFromRx routes a parsed capsule to the appropriate handler. +// Admin queue: synchronous in rxLoop. IO queue: concurrent goroutine. +func (c *Controller) dispatchFromRx(req *Request) error { if c.queueID == 0 { return c.dispatchAdmin(req) } - return c.dispatchIO(req) + + // For Write commands without inline data, collect R2T data in rxLoop + // before dispatching to the goroutine. + if req.capsule.OpCode == ioWrite && len(req.payload) == 0 { + if err := c.collectR2TData(req); err != nil { + return err + } + } + + c.cmdWg.Add(1) + go func() { + defer c.cmdWg.Done() + c.dispatchIO(req) + }() + return nil } -// dispatchPending processes a capsule that was buffered during R2T data -// collection. The capsule and payload are already fully read — only -// SQHD advance and command dispatch remain. -func (c *Controller) dispatchPending(req *Request) error { - c.sqhd++ - if c.sqhd >= c.queueSize && c.queueSize > 0 { - c.sqhd = 0 +// collectR2TData handles the R2T/H2C flow in the rxLoop for Write commands +// that don't carry inline data. Sends R2T through the txLoop, then reads +// H2C Data PDUs inline. Sets req.payload with the collected data. +func (c *Controller) collectR2TData(req *Request) error { + sub := c.subsystem + if sub == nil { + // Let handleWrite deal with the nil subsystem. + return nil } - if c.queueID == 0 { - return c.dispatchAdmin(req) + + nlb := req.capsule.LbaLength() + expectedBytes := uint32(nlb) * sub.Dev.BlockSize() + + // Send R2T through txLoop and wait for it to be flushed. + r2tDone := make(chan struct{}) + select { + case c.respCh <- &response{ + r2t: &R2THeader{ + CCCID: req.capsule.CID, + TAG: 0, + DATAO: 0, + DATAL: expectedBytes, + }, + r2tDone: r2tDone, + }: + case <-c.done: + return nil + } + + select { + case <-r2tDone: + case <-c.done: + return nil } - return c.dispatchIO(req) + + // Read H2C Data PDUs from the wire. + data, err := c.recvH2CData(expectedBytes) + if err != nil { + return err + } + req.payload = data + return nil } -// dispatchAdmin handles admin queue commands synchronously. +// dispatchAdmin handles admin queue commands synchronously in the rxLoop. func (c *Controller) dispatchAdmin(req *Request) error { defer func() { if req.payload != nil { @@ -261,26 +412,26 @@ func (c *Controller) dispatchAdmin(req *Request) error { switch capsule.OpCode { case adminIdentify: - return c.handleIdentify(req) + c.handleIdentify(req) case adminSetFeatures: - return c.handleSetFeatures(req) + c.handleSetFeatures(req) case adminGetFeatures: - return c.handleGetFeatures(req) + c.handleGetFeatures(req) case adminGetLogPage: - return c.handleGetLogPage(req) + c.handleGetLogPage(req) case adminKeepAlive: - return c.handleKeepAlive(req) + c.handleKeepAlive(req) case adminAsyncEvent: - // Stub: just succeed (don't deliver events in CP10-1) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) default: req.resp.Status = uint16(StatusInvalidOpcode) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } + return nil } -// dispatchIO handles IO queue commands. -func (c *Controller) dispatchIO(req *Request) error { +// dispatchIO handles IO queue commands (runs in a goroutine). +func (c *Controller) dispatchIO(req *Request) { defer func() { if req.payload != nil { putBuffer(req.payload) @@ -291,95 +442,101 @@ func (c *Controller) dispatchIO(req *Request) error { switch capsule.OpCode { case ioRead: - return c.handleRead(req) + c.handleRead(req) case ioWrite: - return c.handleWrite(req) + c.handleWrite(req) case ioFlush: - return c.handleFlush(req) + c.handleFlush(req) case ioWriteZeros: - return c.handleWriteZeros(req) + c.handleWriteZeros(req) default: req.resp.Status = uint16(StatusInvalidOpcode) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } } -// sendC2HDataAndResponse sends C2HData PDUs followed by a CapsuleResp. -// All chunks and the final response are batched in the bufio buffer, -// then flushed to the wire in a single FlushBuf() call. -func (c *Controller) sendC2HDataAndResponse(req *Request) error { - if len(req.c2hData) > 0 { - data := req.c2hData - offset := uint32(0) - chunkSize := c.maxDataLen - - for offset < uint32(len(data)) { - end := offset + chunkSize - if end > uint32(len(data)) { - end = uint32(len(data)) - } - chunk := data[offset:end] - - hdr := C2HDataHeader{ - CCCID: req.capsule.CID, - DATAO: offset, - DATAL: uint32(len(chunk)), - } +// completeWaiters closes any synchronization channels on a discarded response. +// Must be called when txLoop drops a response after a write error, so that +// senders blocked on r2tDone (e.g. collectR2TData) are unblocked. +func completeWaiters(resp *response) { + if resp.r2tDone != nil { + close(resp.r2tDone) + } +} - flags := uint8(0) - if end >= uint32(len(data)) { - flags = c2hFlagLast - } +// enqueueResponse sends a response to the txLoop for writing. +// Safe to call from any goroutine. If the connection is shutting down, +// the response is silently discarded. +func (c *Controller) enqueueResponse(resp *response) { + select { + case c.respCh <- resp: + case <-c.done: + } +} - if err := c.out.writeHeaderAndData(pduC2HData, flags, &hdr, c2hDataHdrSize, chunk); err != nil { - return err - } - offset = end +// writeResponse writes a single response to the wire (txLoop only). +func (c *Controller) writeResponse(resp *response) error { + // R2T variant: write R2T header and signal caller. + if resp.r2t != nil { + err := c.out.SendHeaderOnly(pduR2T, resp.r2t, r2tHdrSize) + if resp.r2tDone != nil { + close(resp.r2tDone) } + return err } - // Write CapsuleResp to bufio buffer + // Standard response: set SQHD from latest value. if c.flowCtlOff { - req.resp.SQHD = 0xFFFF + resp.resp.SQHD = 0xFFFF } else { - req.resp.SQHD = c.sqhd + resp.resp.SQHD = uint16(c.sqhdVal.Load()) } c.resetKATO() - if err := c.out.writeHeaderAndData(pduCapsuleResp, 0, &req.resp, capsuleRespSize, nil); err != nil { - return err - } - // Single flush: all C2H chunks + CapsuleResp in one syscall - return c.out.FlushBuf() -} - -// sendResponse sends a CapsuleResp PDU. -// SQHD is set here (not in handleCapsule) so that flowCtlOff changes -// made during command dispatch (e.g. Fabric Connect) take effect -// on the same response. -func (c *Controller) sendResponse(req *Request) error { - if c.flowCtlOff { - req.resp.SQHD = 0xFFFF - } else { - req.resp.SQHD = c.sqhd + if len(resp.c2hData) > 0 { + return c.writeC2HAndResp(resp) } - c.resetKATO() - return c.out.SendHeaderOnly(pduCapsuleResp, &req.resp, capsuleRespSize) + return c.out.SendHeaderOnly(pduCapsuleResp, &resp.resp, capsuleRespSize) } -// ---------- R2T / H2C Data ---------- +// writeC2HAndResp writes C2H data chunks followed by CapsuleResp, batched. +func (c *Controller) writeC2HAndResp(resp *response) error { + data := resp.c2hData + offset := uint32(0) + chunkSize := c.maxDataLen + + for offset < uint32(len(data)) { + end := offset + chunkSize + if end > uint32(len(data)) { + end = uint32(len(data)) + } + chunk := data[offset:end] + + hdr := C2HDataHeader{ + CCCID: resp.resp.CID, + DATAO: offset, + DATAL: uint32(len(chunk)), + } + + flags := uint8(0) + if end >= uint32(len(data)) { + flags = c2hFlagLast + } -// sendR2T sends a Ready-to-Transfer PDU requesting data from the host. -func (c *Controller) sendR2T(cid uint16, tag uint16, offset, length uint32) error { - r2t := R2THeader{ - CCCID: cid, - TAG: tag, - DATAO: offset, - DATAL: length, + if err := c.out.writeHeaderAndData(pduC2HData, flags, &hdr, c2hDataHdrSize, chunk); err != nil { + return err + } + offset = end } - return c.out.SendHeaderOnly(pduR2T, &r2t, r2tHdrSize) + + if err := c.out.writeHeaderAndData(pduCapsuleResp, 0, &resp.resp, capsuleRespSize, nil); err != nil { + return err + } + return c.out.FlushBuf() } +// ---------- R2T / H2C Data ---------- + // recvH2CData reads H2CData PDU(s) from the wire and returns the accumulated data. // Reads exactly `totalBytes` of data, potentially across multiple H2C PDUs. // @@ -511,6 +668,16 @@ func (c *Controller) shutdown() { c.stopKATO() c.state = stateClosed c.conn.Close() + + // Release pooled buffers from interleaved capsules that were never dispatched. + for _, req := range c.pendingCapsules { + if req.payload != nil { + putBuffer(req.payload) + req.payload = nil + } + } + c.pendingCapsules = nil + if c.server != nil { if c.isAdmin && c.cntlID != 0 { c.server.unregisterAdmin(c.cntlID) diff --git a/weed/storage/blockvol/nvme/fabric.go b/weed/storage/blockvol/nvme/fabric.go index 373aaf4d9..99a376eca 100644 --- a/weed/storage/blockvol/nvme/fabric.go +++ b/weed/storage/blockvol/nvme/fabric.go @@ -5,24 +5,29 @@ import ( ) // handleFabricCommand dispatches Fabric-specific commands by FCType. +// Returns errDisconnect on Disconnect to signal the rxLoop to exit gracefully. func (c *Controller) handleFabricCommand(req *Request) error { switch req.capsule.FCType { case fcConnect: - return c.handleConnect(req) + c.handleConnect(req) + return nil case fcPropertyGet: - return c.handlePropertyGet(req) + c.handlePropertyGet(req) + return nil case fcPropertySet: - return c.handlePropertySet(req) + c.handlePropertySet(req) + return nil case fcDisconnect: return c.handleDisconnect(req) default: req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return nil } } // handleConnect processes a Fabric Connect command. -func (c *Controller) handleConnect(req *Request) error { +func (c *Controller) handleConnect(req *Request) { capsule := &req.capsule // Parse QueueID, QueueSize, KATO, CATTR from capsule dwords. @@ -38,7 +43,8 @@ func (c *Controller) handleConnect(req *Request) error { // Parse ConnectData from payload if len(req.payload) < connectDataSize { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } var cd ConnectData cd.Unmarshal(req.payload) @@ -48,7 +54,8 @@ func (c *Controller) handleConnect(req *Request) error { sub := c.server.findSubsystem(cd.SubNQN) if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } c.subsystem = sub @@ -76,7 +83,8 @@ func (c *Controller) handleConnect(req *Request) error { // Return CNTLID in DW0 req.resp.DW0 = uint32(c.cntlID) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } // IO queue connect — look up admin session from server registry. @@ -85,17 +93,20 @@ func (c *Controller) handleConnect(req *Request) error { admin := c.server.lookupAdmin(cd.CNTLID) if admin == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } // Validate SubNQN and HostNQN match the admin session. if cd.SubNQN != admin.subNQN { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } if cd.HostNQN != admin.hostNQN { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } c.cntlID = cd.CNTLID @@ -107,11 +118,11 @@ func (c *Controller) handleConnect(req *Request) error { c.state = stateIOActive req.resp.DW0 = uint32(c.cntlID) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } // handlePropertyGet returns a controller register value. -func (c *Controller) handlePropertyGet(req *Request) error { +func (c *Controller) handlePropertyGet(req *Request) { // Per NVMe-oF spec: CDW10 bits 2:0 = ATTRIB (size), CDW11 = OFST (offset) size8 := (req.capsule.D10 & 1) != 0 offset := req.capsule.D11 @@ -128,7 +139,8 @@ func (c *Controller) handlePropertyGet(req *Request) error { val = uint64(c.regCSTS) default: req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } if size8 { @@ -138,11 +150,11 @@ func (c *Controller) handlePropertyGet(req *Request) error { } else { req.resp.DW0 = uint32(val) } - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } // handlePropertySet handles controller register writes. -func (c *Controller) handlePropertySet(req *Request) error { +func (c *Controller) handlePropertySet(req *Request) { // Per NVMe-oF spec: CDW10 = ATTRIB (size), CDW11 = OFST (offset), CDW12-CDW13 = VALUE offset := req.capsule.D11 value := uint64(req.capsule.D12) | uint64(req.capsule.D13)<<32 @@ -163,16 +175,14 @@ func (c *Controller) handlePropertySet(req *Request) error { default: // Ignore writes to other registers } - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } // handleDisconnect processes a Fabric Disconnect. +// Enqueues the response and returns errDisconnect to signal rxLoop exit. func (c *Controller) handleDisconnect(req *Request) error { - if err := c.sendResponse(req); err != nil { - return err - } - c.shutdown() - return nil + c.enqueueResponse(&response{resp: req.resp}) + return errDisconnect } // ---------- Subsystem ---------- @@ -241,7 +251,7 @@ func propertySetValue(capsule *CapsuleCommand) uint64 { return uint64(capsule.D12) | uint64(capsule.D13)<<32 } -// propertyGetSize returns true if the PropertyGet requests an 8-byte value. +// propertyGetSize8 returns true if the PropertyGet requests an 8-byte value. func propertyGetSize8(capsule *CapsuleCommand) bool { return (capsule.D10 & 1) != 0 } diff --git a/weed/storage/blockvol/nvme/identify.go b/weed/storage/blockvol/nvme/identify.go index cbe0f0950..619e24dd2 100644 --- a/weed/storage/blockvol/nvme/identify.go +++ b/weed/storage/blockvol/nvme/identify.go @@ -8,32 +8,33 @@ import ( const identifySize = 4096 // handleIdentify dispatches Identify commands by CNS type. -func (c *Controller) handleIdentify(req *Request) error { +func (c *Controller) handleIdentify(req *Request) { cns := uint8(req.capsule.D10 & 0xFF) switch cns { case cnsIdentifyController: - return c.identifyController(req) + c.identifyController(req) case cnsIdentifyNamespace: - return c.identifyNamespace(req) + c.identifyNamespace(req) case cnsActiveNSList: - return c.identifyActiveNSList(req) + c.identifyActiveNSList(req) case cnsNSDescriptorList: - return c.identifyNSDescriptors(req) + c.identifyNSDescriptors(req) default: req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } } // identifyController returns the 4096-byte Identify Controller data structure. -func (c *Controller) identifyController(req *Request) error { +func (c *Controller) identifyController(req *Request) { buf := make([]byte, identifySize) sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } // VID (PCI Vendor ID) - use 0 for software target @@ -135,8 +136,11 @@ func (c *Controller) identifyController(req *Request) error { copy(buf[768:1024], sub.NQN) // buf is already zeroed → NUL-terminated // IOCCSZ (I/O Queue Command Capsule Supported Size) - offset 1792-1795 - // In 16-byte units: 64/16 = 4 - binary.LittleEndian.PutUint32(buf[1792:], 4) + // In 16-byte units: (SQE + max inline data) / 16 + // Allows host to send write data inline in the command capsule, + // avoiding R2T round-trip for small writes (e.g. 4K). + ioccsz := (64 + c.maxDataLen) / 16 + binary.LittleEndian.PutUint32(buf[1792:], ioccsz) // IORCSZ (I/O Queue Response Capsule Supported Size) - offset 1796-1799 // In 16-byte units: 16/16 = 1 @@ -158,15 +162,16 @@ func (c *Controller) identifyController(req *Request) error { binary.LittleEndian.PutUint16(buf[1804:], 0x01) req.c2hData = buf - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // identifyNamespace returns the 4096-byte Identify Namespace data for NSID=1. -func (c *Controller) identifyNamespace(req *Request) error { +func (c *Controller) identifyNamespace(req *Request) { sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } dev := sub.Dev @@ -214,25 +219,26 @@ func (c *Controller) identifyNamespace(req *Request) error { binary.LittleEndian.PutUint32(buf[92:], 1) req.c2hData = buf - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // identifyActiveNSList returns the list of active namespace IDs (just NSID=1). -func (c *Controller) identifyActiveNSList(req *Request) error { +func (c *Controller) identifyActiveNSList(req *Request) { buf := make([]byte, identifySize) // Single namespace: NSID=1 binary.LittleEndian.PutUint32(buf[0:], 1) req.c2hData = buf - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // identifyNSDescriptors returns namespace descriptor list for NSID=1. -func (c *Controller) identifyNSDescriptors(req *Request) error { +func (c *Controller) identifyNSDescriptors(req *Request) { sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } buf := make([]byte, identifySize) @@ -247,7 +253,7 @@ func (c *Controller) identifyNSDescriptors(req *Request) error { copy(buf[off:off+16], sub.NGUID[:]) req.c2hData = buf - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // copyPadded copies src into dst, padding remaining bytes with spaces. diff --git a/weed/storage/blockvol/nvme/io.go b/weed/storage/blockvol/nvme/io.go index abb38e182..f7bead786 100644 --- a/weed/storage/blockvol/nvme/io.go +++ b/weed/storage/blockvol/nvme/io.go @@ -1,11 +1,12 @@ package nvme // handleRead processes an NVMe Read command. -func (c *Controller) handleRead(req *Request) error { +func (c *Controller) handleRead(req *Request) { sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } dev := sub.Dev @@ -18,31 +19,37 @@ func (c *Controller) handleRead(req *Request) error { nsze := dev.VolumeSize() / uint64(blockSize) if lba+uint64(nlb) > nsze { req.resp.Status = uint16(StatusLBAOutOfRange) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } data, err := dev.ReadAt(lba, totalBytes) if err != nil { req.resp.Status = uint16(mapBlockError(err)) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } req.c2hData = data - return c.sendC2HDataAndResponse(req) + c.enqueueResponse(&response{resp: req.resp, c2hData: req.c2hData}) } // handleWrite processes an NVMe Write command with inline or R2T data. -func (c *Controller) handleWrite(req *Request) error { +// By the time this handler runs, req.payload always contains the write data: +// either inline from the CapsuleCmd PDU, or collected via R2T by collectR2TData. +func (c *Controller) handleWrite(req *Request) { sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } // Check ANA state (write-gating) if !c.isWriteAllowed() { req.resp.Status = uint16(StatusNSNotReady) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } dev := sub.Dev @@ -55,72 +62,63 @@ func (c *Controller) handleWrite(req *Request) error { nsze := dev.VolumeSize() / uint64(blockSize) if lba+uint64(nlb) > nsze { req.resp.Status = uint16(StatusLBAOutOfRange) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } - var writeData []byte - - if len(req.payload) > 0 { - // Inline data path: data was in the CapsuleCmd PDU. - if uint32(len(req.payload)) != expectedBytes { - req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) - } - writeData = req.payload - } else { - // R2T flow: send Ready-to-Transfer, then receive H2C Data PDUs. - if err := c.sendR2T(req.capsule.CID, 0, 0, expectedBytes); err != nil { - return err - } - data, err := c.recvH2CData(expectedBytes) - if err != nil { - return err - } - writeData = data - defer putBuffer(data) + if uint32(len(req.payload)) != expectedBytes { + req.resp.Status = uint16(StatusInvalidField) + c.enqueueResponse(&response{resp: req.resp}) + return } throttleOnWALPressure(dev) - if err := writeWithRetry(dev, lba, writeData); err != nil { + if err := writeWithRetry(dev, lba, req.payload); err != nil { req.resp.Status = uint16(mapBlockError(err)) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } // handleFlush processes an NVMe Flush command. -func (c *Controller) handleFlush(req *Request) error { +func (c *Controller) handleFlush(req *Request) { sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } if !c.isWriteAllowed() { req.resp.Status = uint16(StatusNSNotReady) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } if err := sub.Dev.SyncCache(); err != nil { req.resp.Status = uint16(mapBlockError(err)) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } // handleWriteZeros processes an NVMe Write Zeroes command. -func (c *Controller) handleWriteZeros(req *Request) error { +func (c *Controller) handleWriteZeros(req *Request) { sub := c.subsystem if sub == nil { req.resp.Status = uint16(StatusInvalidField) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } if !c.isWriteAllowed() { req.resp.Status = uint16(StatusNSNotReady) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } dev := sub.Dev @@ -133,14 +131,16 @@ func (c *Controller) handleWriteZeros(req *Request) error { nsze := dev.VolumeSize() / uint64(blockSize) if lba+uint64(nlb) > nsze { req.resp.Status = uint16(StatusLBAOutOfRange) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } // D12 bit 25: DEALLOC — if set, use Trim instead of writing zeros if req.capsule.D12&commandBitDeallocate != 0 { if err := dev.Trim(lba, totalBytes); err != nil { req.resp.Status = uint16(mapBlockError(err)) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } } else { zeroBuf := getBuffer(int(totalBytes)) @@ -152,11 +152,12 @@ func (c *Controller) handleWriteZeros(req *Request) error { putBuffer(zeroBuf) if err != nil { req.resp.Status = uint16(mapBlockError(err)) - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) + return } } - return c.sendResponse(req) + c.enqueueResponse(&response{resp: req.resp}) } // isWriteAllowed checks if the current ANA state allows writes. diff --git a/weed/storage/blockvol/nvme/nvme_test.go b/weed/storage/blockvol/nvme/nvme_test.go index 75493819f..d0d2a965b 100644 --- a/weed/storage/blockvol/nvme/nvme_test.go +++ b/weed/storage/blockvol/nvme/nvme_test.go @@ -736,10 +736,10 @@ func TestIdentify_Controller(t *testing.T) { if subNQN != nqn { t.Fatalf("SubNQN = %q, want %q", subNQN, nqn) } - // IOCCSZ + // IOCCSZ: (64 + maxDataLen) / 16 = (64 + 32768) / 16 = 2052 ioccsz := binary.LittleEndian.Uint32(data[1792:]) - if ioccsz != 4 { - t.Fatalf("IOCCSZ = %d, want 4", ioccsz) + if ioccsz != 2052 { + t.Fatalf("IOCCSZ = %d, want 2052", ioccsz) } // IORCSZ iorcsz := binary.LittleEndian.Uint32(data[1796:]) @@ -3439,3 +3439,117 @@ func TestWriteWithRetry_SharedTransientConcurrency(t *testing.T) { } } } + +// TestTxLoop_R2TDoneClosedOnError verifies that when txLoop has already +// encountered a write error, a subsequently enqueued R2T response has its +// r2tDone channel closed (not leaked), so collectR2TData does not deadlock. +func TestTxLoop_R2TDoneClosedOnError(t *testing.T) { + done := make(chan struct{}) + + r2tDone := make(chan struct{}) + resp := &response{ + r2t: &R2THeader{CCCID: 1, DATAL: 4096}, + r2tDone: r2tDone, + } + + // completeWaiters must close r2tDone — this is what txLoop calls + // when it discards a response after a write error. + completeWaiters(resp) + + // r2tDone must be closed and selectable without blocking. + select { + case <-r2tDone: + // OK — channel was closed. + case <-time.After(time.Second): + t.Fatal("r2tDone was not closed by completeWaiters — would deadlock collectR2TData") + } + _ = done +} + +// TestTxLoop_R2TDoneClosedOnError_Integration tests the full txLoop path: +// a prior write error causes a queued R2T to be discarded with r2tDone closed. +func TestTxLoop_R2TDoneClosedOnError_Integration(t *testing.T) { + // Create a controller with a broken writer (always errors). + failWriter := &failingWriter{} + serverConn, clientConn := net.Pipe() + defer serverConn.Close() + defer clientConn.Close() + c := &Controller{ + conn: serverConn, + out: NewWriter(failWriter), + respCh: make(chan *response, 16), + done: make(chan struct{}), + } + + // Enqueue a normal response that will fail to write. + c.respCh <- &response{resp: CapsuleResponse{CID: 1}} + + // Enqueue an R2T with a done channel. + r2tDone := make(chan struct{}) + c.respCh <- &response{ + r2t: &R2THeader{CCCID: 2, DATAL: 4096}, + r2tDone: r2tDone, + } + + // Signal txLoop to exit after draining. + close(c.done) + + // Run txLoop — it should exit without hanging. + errCh := make(chan error, 1) + go func() { + errCh <- c.txLoop() + }() + + select { + case <-errCh: + // txLoop exited. + case <-time.After(3 * time.Second): + t.Fatal("txLoop hung — r2tDone was not closed on discarded R2T") + } + + // Verify r2tDone was closed. + select { + case <-r2tDone: + // OK + default: + t.Fatal("r2tDone was not closed") + } +} + +// failingWriter is an io.Writer that always returns an error. +type failingWriter struct{} + +func (f *failingWriter) Write(p []byte) (int, error) { + return 0, errors.New("injected write failure") +} + +// TestShutdown_ReleasePendingCapsuleBuffers verifies that shutdown() releases +// pooled payload buffers from interleaved capsules that were never dispatched. +func TestShutdown_ReleasePendingCapsuleBuffers(t *testing.T) { + c := &Controller{ + conn: &net.TCPConn{}, // dummy, Close() will fail but closeOnce still runs + } + + // Simulate buffered interleaved capsules with pooled payloads. + buf1 := getBuffer(4096) + buf2 := getBuffer(65536) + c.pendingCapsules = []*Request{ + {payload: buf1}, + {payload: buf2}, + {payload: nil}, // no payload — should not panic + } + + c.shutdown() + + // After shutdown, pendingCapsules should be nil and payloads released. + if c.pendingCapsules != nil { + t.Fatal("pendingCapsules not cleared after shutdown") + } +} + +// TestCompleteWaiters_NilR2TDone verifies completeWaiters does not panic +// when r2tDone is nil (standard non-R2T response). +func TestCompleteWaiters_NilR2TDone(t *testing.T) { + resp := &response{resp: CapsuleResponse{CID: 1}} + completeWaiters(resp) // must not panic +} diff --git a/weed/storage/blockvol/nvme/wire.go b/weed/storage/blockvol/nvme/wire.go index 222dd42a2..b5b061f7c 100644 --- a/weed/storage/blockvol/nvme/wire.go +++ b/weed/storage/blockvol/nvme/wire.go @@ -9,7 +9,7 @@ import ( // ---------- Reader ---------- -// Reader decodes NVMe/TCP PDUs from a stream. +// Reader decodes NVMe/TCP PDUs from a buffered stream. // // Usage: // @@ -26,9 +26,14 @@ type Reader struct { padBuf [maxHeaderSize]byte // reuse for padding skip } -// NewReader wraps an io.Reader for NVMe/TCP PDU decoding. +// NewReader wraps an io.Reader with a default-sized bufio.Reader. func NewReader(r io.Reader) *Reader { - return &Reader{rd: r} + return &Reader{rd: bufio.NewReader(r)} +} + +// NewReaderSize wraps an io.Reader with a specified buffer size. +func NewReaderSize(r io.Reader, size int) *Reader { + return &Reader{rd: bufio.NewReaderSize(r, size)} } // Dequeue reads the 8-byte CommonHeader, validates bounds, and returns it. diff --git a/weed/storage/blockvol/testrunner/actions/bench.go b/weed/storage/blockvol/testrunner/actions/bench.go index df51eae9e..bf94c27c6 100644 --- a/weed/storage/blockvol/testrunner/actions/bench.go +++ b/weed/storage/blockvol/testrunner/actions/bench.go @@ -225,13 +225,31 @@ type fioLatency struct { Percentile map[string]float64 `json:"percentile"` } -// ParseFioMetric extracts a named metric from fio JSON. +// ParseFioMetric extracts a named metric from fio JSON or returns a plain +// numeric value directly. This allows bench_compare to accept either raw fio +// JSON output or pre-aggregated scalar values (e.g. from bench_stats or +// phase repeat/aggregate). +// // direction: "read", "write", or "" (auto-detect: use whichever has IOPS > 0). // Supported metrics: "iops", "bw_bytes", "bw_mb", "lat_mean_us", "lat_p50_us", "lat_p99_us", "lat_p999_us" -func ParseFioMetric(jsonStr, metric, direction string) (float64, error) { +func ParseFioMetric(input, metric, direction string) (float64, error) { + // Try plain numeric value first (from aggregation or bench_stats). + trimmed := strings.TrimSpace(input) + if v, err := strconv.ParseFloat(trimmed, 64); err == nil { + return v, nil + } + + // Try quoted numeric string (e.g. "15322.00"). + if len(trimmed) >= 2 && trimmed[0] == '"' && trimmed[len(trimmed)-1] == '"' { + if v, err := strconv.ParseFloat(trimmed[1:len(trimmed)-1], 64); err == nil { + return v, nil + } + } + + // Parse as fio JSON. var output fioOutput - if err := json.Unmarshal([]byte(jsonStr), &output); err != nil { - return 0, fmt.Errorf("parse fio JSON: %w", err) + if err := json.Unmarshal([]byte(input), &output); err != nil { + return 0, fmt.Errorf("parse fio metric: input is neither a number nor valid fio JSON: %w", err) } if len(output.Jobs) == 0 { return 0, fmt.Errorf("fio JSON has no jobs") diff --git a/weed/storage/blockvol/testrunner/actions/bench_test.go b/weed/storage/blockvol/testrunner/actions/bench_test.go index c4dd7eeb9..7a5df09da 100644 --- a/weed/storage/blockvol/testrunner/actions/bench_test.go +++ b/weed/storage/blockvol/testrunner/actions/bench_test.go @@ -183,8 +183,55 @@ func TestParseFioMetric_UnknownMetric(t *testing.T) { } } +func TestParseFioMetric_PlainNumber(t *testing.T) { + val, err := ParseFioMetric("15322.00", "iops", "") + if err != nil { + t.Fatalf("parse plain number: %v", err) + } + if val != 15322.00 { + t.Fatalf("got %f, want 15322.00", val) + } +} + +func TestParseFioMetric_PlainInteger(t *testing.T) { + val, err := ParseFioMetric("36853", "iops", "") + if err != nil { + t.Fatalf("parse plain integer: %v", err) + } + if val != 36853 { + t.Fatalf("got %f, want 36853", val) + } +} + +func TestParseFioMetric_QuotedNumber(t *testing.T) { + val, err := ParseFioMetric(`"15322.00"`, "iops", "") + if err != nil { + t.Fatalf("parse quoted number: %v", err) + } + if val != 15322.00 { + t.Fatalf("got %f, want 15322.00", val) + } +} + +func TestParseFioMetric_NumberWithWhitespace(t *testing.T) { + val, err := ParseFioMetric(" 42000.50 \n", "iops", "") + if err != nil { + t.Fatalf("parse number with whitespace: %v", err) + } + if val != 42000.50 { + t.Fatalf("got %f, want 42000.50", val) + } +} + +func TestParseFioMetric_InvalidInput(t *testing.T) { + _, err := ParseFioMetric("not a number or json", "iops", "") + if err == nil { + t.Fatal("expected error for invalid input") + } +} + func TestParseFioMetric_InvalidJSON(t *testing.T) { - _, err := ParseFioMetric("not json", "iops", "") + _, err := ParseFioMetric("{invalid json}", "iops", "") if err == nil { t.Fatal("expected error for invalid JSON") } diff --git a/weed/storage/blockvol/testrunner/actions/devops_test.go b/weed/storage/blockvol/testrunner/actions/devops_test.go index 1e27003fe..9aab7277c 100644 --- a/weed/storage/blockvol/testrunner/actions/devops_test.go +++ b/weed/storage/blockvol/testrunner/actions/devops_test.go @@ -80,8 +80,8 @@ func TestAllActions_Registration(t *testing.T) { if n := len(byTier[tr.TierCore]); n != 11 { t.Errorf("core: %d, want 11", n) } - if n := len(byTier[tr.TierBlock]); n != 52 { - t.Errorf("block: %d, want 52", n) + if n := len(byTier[tr.TierBlock]); n != 55 { + t.Errorf("block: %d, want 55", n) } if n := len(byTier[tr.TierDevOps]); n != 7 { t.Errorf("devops: %d, want 7", n) @@ -93,13 +93,13 @@ func TestAllActions_Registration(t *testing.T) { t.Errorf("k8s: %d, want 14", n) } - // Total should be 89 (85 existing + 3 pgbench + 1 bench_stats). + // Total should be 92 (89 existing + 3 profiling: pprof_capture, vmstat_capture, iostat_capture). total := 0 for _, actions := range byTier { total += len(actions) } - if total != 89 { - t.Errorf("total actions: %d, want 89", total) + if total != 92 { + t.Errorf("total actions: %d, want 92", total) } } diff --git a/weed/storage/blockvol/testrunner/actions/metrics.go b/weed/storage/blockvol/testrunner/actions/metrics.go index 8b4286fa0..6ab574dff 100644 --- a/weed/storage/blockvol/testrunner/actions/metrics.go +++ b/weed/storage/blockvol/testrunner/actions/metrics.go @@ -19,6 +19,9 @@ func RegisterMetricsActions(r *tr.Registry) { r.RegisterFunc("assert_metric_gt", tr.TierBlock, assertMetricGT) r.RegisterFunc("assert_metric_eq", tr.TierBlock, assertMetricEQ) r.RegisterFunc("assert_metric_lt", tr.TierBlock, assertMetricLT) + r.RegisterFunc("pprof_capture", tr.TierBlock, pprofCapture) + r.RegisterFunc("vmstat_capture", tr.TierBlock, vmstatCapture) + r.RegisterFunc("iostat_capture", tr.TierBlock, iostatCapture) } // scrapeMetrics fetches /metrics from a target's admin port via SSH curl. @@ -153,6 +156,158 @@ func assertMetricLT(ctx context.Context, actx *tr.ActionContext, act tr.Action) return map[string]string{"value": strconv.FormatFloat(val, 'g', -1, 64)}, nil } +// pprofCapture fetches a Go pprof profile from a target's admin port and saves +// it to a file on the target node. The profile is fetched via SSH curl to the +// admin server's /debug/pprof/ endpoint. +// +// Params: +// - target (required): target name +// - profile: pprof profile type (default: "profile" = CPU) +// Supported: "profile" (CPU), "heap", "allocs", "block", "mutex", "goroutine" +// - seconds: duration for CPU profile (default: "30") +// - output_dir: directory to save profile on target node (default: "/tmp/pprof") +// - label: filename label (default: profile type) +// +// Returns: value = remote file path of saved profile +func pprofCapture(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) { + tgt, err := getHATarget(actx, act.Target) + if err != nil { + return nil, err + } + + profile := paramDefault(act.Params, "profile", "profile") + seconds := paramDefault(act.Params, "seconds", "30") + outputDir := paramDefault(act.Params, "output_dir", "/tmp/pprof") + label := paramDefault(act.Params, "label", profile) + + // Validate profile type. + switch profile { + case "profile", "heap", "allocs", "block", "mutex", "goroutine": + // OK + default: + return nil, fmt.Errorf("pprof_capture: unsupported profile type %q", profile) + } + + // Create output directory. + if _, _, _, err := tgt.Node.RunRoot(ctx, "mkdir -p "+outputDir); err != nil { + return nil, fmt.Errorf("pprof_capture: mkdir: %w", err) + } + + outFile := fmt.Sprintf("%s/%s.pb.gz", outputDir, label) + + // Build URL. CPU profile uses ?seconds= parameter. + url := fmt.Sprintf("http://127.0.0.1:%d/debug/pprof/%s", tgt.AdminPort, profile) + if profile == "profile" { + url += "?seconds=" + seconds + } + + actx.Log(" pprof %s → %s (%ss)", profile, outFile, seconds) + + cmd := fmt.Sprintf("curl -s -o %s '%s'", outFile, url) + // CPU profile can take a while — extend timeout. + _, stderr, code, err := tgt.Node.Run(ctx, cmd) + if err != nil || code != 0 { + return nil, fmt.Errorf("pprof_capture: curl failed: code=%d stderr=%s err=%v", code, stderr, err) + } + + return map[string]string{"value": outFile}, nil +} + +// vmstatCapture runs vmstat on a node for a duration and saves the output. +// Params: +// - node (required): node to run on +// - seconds: duration (default: "30") +// - interval: vmstat interval (default: "1") +// - output_dir: directory for output (default: "/tmp/pprof") +// - label: filename label (default: "vmstat") +// +// Returns: value = remote file path +func vmstatCapture(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) { + node, err := getNode(actx, act.Node) + if err != nil { + return nil, err + } + + seconds := paramDefault(act.Params, "seconds", "30") + interval := paramDefault(act.Params, "interval", "1") + outputDir := paramDefault(act.Params, "output_dir", "/tmp/pprof") + label := paramDefault(act.Params, "label", "vmstat") + + if _, _, _, err := node.RunRoot(ctx, "mkdir -p "+outputDir); err != nil { + return nil, fmt.Errorf("vmstat_capture: mkdir: %w", err) + } + + outFile := fmt.Sprintf("%s/%s.txt", outputDir, label) + + // vmstat + count, _ := strconv.Atoi(seconds) + intv, _ := strconv.Atoi(interval) + if intv < 1 { + intv = 1 + } + iterations := count / intv + if iterations < 1 { + iterations = 1 + } + + cmd := fmt.Sprintf("vmstat %s %d > %s 2>&1", interval, iterations, outFile) + actx.Log(" vmstat %s×%d → %s", interval, iterations, outFile) + + _, _, code, err := node.Run(ctx, cmd) + if err != nil || code != 0 { + return nil, fmt.Errorf("vmstat_capture: code=%d err=%v", code, err) + } + + return map[string]string{"value": outFile}, nil +} + +// iostatCapture runs iostat -x on a node for a duration and saves the output. +// Params: +// - node (required): node to run on +// - seconds: duration (default: "30") +// - interval: iostat interval (default: "1") +// - output_dir: directory for output (default: "/tmp/pprof") +// - label: filename label (default: "iostat") +// +// Returns: value = remote file path +func iostatCapture(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) { + node, err := getNode(actx, act.Node) + if err != nil { + return nil, err + } + + seconds := paramDefault(act.Params, "seconds", "30") + interval := paramDefault(act.Params, "interval", "1") + outputDir := paramDefault(act.Params, "output_dir", "/tmp/pprof") + label := paramDefault(act.Params, "label", "iostat") + + if _, _, _, err := node.RunRoot(ctx, "mkdir -p "+outputDir); err != nil { + return nil, fmt.Errorf("iostat_capture: mkdir: %w", err) + } + + outFile := fmt.Sprintf("%s/%s.txt", outputDir, label) + + count, _ := strconv.Atoi(seconds) + intv, _ := strconv.Atoi(interval) + if intv < 1 { + intv = 1 + } + iterations := count / intv + if iterations < 1 { + iterations = 1 + } + + cmd := fmt.Sprintf("iostat -x %s %d > %s 2>&1", interval, iterations, outFile) + actx.Log(" iostat -x %s×%d → %s", interval, iterations, outFile) + + _, _, code, err := node.Run(ctx, cmd) + if err != nil || code != 0 { + return nil, fmt.Errorf("iostat_capture: code=%d err=%v", code, err) + } + + return map[string]string{"value": outFile}, nil +} + // collectArtifactsAction explicitly collects artifacts from targets. func collectArtifactsAction(ctx context.Context, actx *tr.ActionContext, act tr.Action) (map[string]string, error) { dir := act.Params["dir"] diff --git a/weed/storage/blockvol/testrunner/scenarios/cp103-25g-ab.yaml b/weed/storage/blockvol/testrunner/scenarios/cp103-25g-ab.yaml index 7b99b03db..62f12905d 100644 --- a/weed/storage/blockvol/testrunner/scenarios/cp103-25g-ab.yaml +++ b/weed/storage/blockvol/testrunner/scenarios/cp103-25g-ab.yaml @@ -16,9 +16,9 @@ targets: node: server vol_size: "1073741824" wal_size: "536870912" - iscsi_port: 3263 - nvme_port: 4420 - admin_port: 8083 + iscsi_port: 3270 + nvme_port: 4430 + admin_port: 8090 iqn_suffix: "bench-25g" nqn_suffix: "bench-25g"