diff --git a/weed/storage/blockvol/iscsi/session.go b/weed/storage/blockvol/iscsi/session.go index 898a869aa..2429d33fb 100644 --- a/weed/storage/blockvol/iscsi/session.go +++ b/weed/storage/blockvol/iscsi/session.go @@ -48,6 +48,11 @@ type Session struct { // Data sequencing dataInWriter *DataInWriter + // PDU queue for commands received during Data-Out collection. + // The initiator pipelines commands; we may read a SCSI Command + // while waiting for Data-Out PDUs. + pending []*PDU + // Shutdown closed atomic.Bool closeErr error @@ -80,7 +85,7 @@ func (s *Session) HandleConnection() error { defer s.close() for !s.closed.Load() { - pdu, err := ReadPDU(s.conn) + pdu, err := s.nextPDU() if err != nil { if s.closed.Load() { return nil @@ -101,6 +106,17 @@ func (s *Session) HandleConnection() error { return nil } +// nextPDU returns the next PDU to process, draining the pending queue +// (populated during Data-Out collection) before reading from the connection. +func (s *Session) nextPDU() (*PDU, error) { + if len(s.pending) > 0 { + pdu := s.pending[0] + s.pending = s.pending[1:] + return pdu, nil + } + return ReadPDU(s.conn) +} + // Close terminates the session. func (s *Session) Close() error { s.closed.Store(true) @@ -274,14 +290,17 @@ func (s *Session) collectDataOut(collector *DataOutCollector, itt uint32) error } r2tSN++ - // Read Data-Out PDUs until F-bit + // Read Data-Out PDUs until F-bit. + // The initiator may pipeline other commands; queue them for later. for { doPDU, err := ReadPDU(s.conn) if err != nil { return err } if doPDU.Opcode() != OpSCSIDataOut { - return fmt.Errorf("expected Data-Out, got %s", OpcodeName(doPDU.Opcode())) + // Not our Data-Out — queue for later dispatch + s.pending = append(s.pending, doPDU) + continue } if err := collector.AddDataOut(doPDU); err != nil { return err