Browse Source

feat: tri-state IOBackend config with explicit logging and CLI flag

Replace UseIOUring bool with IOBackend IOBackendMode (tri-state):
- "standard" (default): sequential pread/pwrite/fdatasync
- "auto": try io_uring, fall back to standard with warning log
- "io_uring": require io_uring, fail startup if unavailable

NewIOUring now returns ErrIOUringUnavailable instead of silently
falling back — callers decide whether to fail or fall back based
on the requested mode. All mode transitions are logged:
  io backend: requested=auto selected=standard reason=...
  io backend: requested=io_uring selected=io_uring

CLI: --io-backend=standard|auto|io_uring added to iscsi-target.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 2 weeks ago
parent
commit
9d0ec8efa3
  1. 10
      weed/storage/blockvol/batchio/batchio.go
  2. 6
      weed/storage/blockvol/batchio/iouring_linux.go
  3. 4
      weed/storage/blockvol/batchio/iouring_other.go
  4. 34
      weed/storage/blockvol/batchio/standard_test.go
  5. 46
      weed/storage/blockvol/blockvol.go
  6. 18
      weed/storage/blockvol/config.go
  7. 9
      weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go

10
weed/storage/blockvol/batchio/batchio.go

@ -13,7 +13,15 @@
// bio.Fsync(fd)
package batchio
import "os"
import (
"errors"
"os"
)
// ErrIOUringUnavailable is returned by NewIOUring when io_uring cannot be
// initialized (unsupported kernel, seccomp, non-Linux platform, etc.).
// Callers decide whether to fall back to standard or fail.
var ErrIOUringUnavailable = errors.New("batchio: io_uring unavailable")
// Op represents a single I/O operation: read or write buf at offset.
type Op struct {

6
weed/storage/blockvol/batchio/iouring_linux.go

@ -17,13 +17,11 @@ type ioUringBatchIO struct {
}
// NewIOUring creates a BatchIO backed by io_uring with the given ring size.
// If io_uring is unavailable (kernel too old, seccomp, etc.), returns
// NewStandard() with a nil error — silent fallback.
// Returns ErrIOUringUnavailable if io_uring cannot be initialized.
func NewIOUring(ringSize uint) (BatchIO, error) {
ring, err := iouring.New(ringSize)
if err != nil {
// Kernel doesn't support io_uring — fall back silently.
return NewStandard(), nil
return nil, fmt.Errorf("%w: %v", ErrIOUringUnavailable, err)
}
return &ioUringBatchIO{ring: ring, ringSize: int(ringSize)}, nil
}

4
weed/storage/blockvol/batchio/iouring_other.go

@ -2,8 +2,8 @@
package batchio
// NewIOUring returns a standard (sequential) BatchIO on non-Linux platforms
// NewIOUring returns ErrIOUringUnavailable on non-Linux platforms
// or when io_uring is disabled via the no_iouring build tag.
func NewIOUring(ringSize uint) (BatchIO, error) {
return NewStandard(), nil
return nil, ErrIOUringUnavailable
}

34
weed/storage/blockvol/batchio/standard_test.go

@ -2,6 +2,7 @@ package batchio
import (
"bytes"
"errors"
"os"
"path/filepath"
"testing"
@ -217,29 +218,16 @@ func TestStandard_PwriteBatch_ErrorOnReadOnly(t *testing.T) {
}
}
func TestNewIOUring_Fallback(t *testing.T) {
func TestNewIOUring_Unavailable(t *testing.T) {
// On non-Linux (or without io_uring support), NewIOUring returns
// a working BatchIO (standard fallback).
bio, err := NewIOUring(256)
if err != nil {
t.Fatalf("NewIOUring: %v", err)
}
defer bio.Close()
// Verify it works by doing a write+read cycle.
f := tempFile(t)
data := []byte("iouring fallback test")
if err := bio.PwriteBatch(f, []Op{{Buf: data, Offset: 0}}); err != nil {
t.Fatalf("PwriteBatch: %v", err)
}
if err := bio.Fsync(f); err != nil {
t.Fatalf("Fsync: %v", err)
}
got := make([]byte, len(data))
if err := bio.PreadBatch(f, []Op{{Buf: got, Offset: 0}}); err != nil {
t.Fatalf("PreadBatch: %v", err)
}
if !bytes.Equal(got, data) {
t.Errorf("got %q, want %q", got, data)
// ErrIOUringUnavailable instead of silently falling back.
_, err := NewIOUring(256)
if err == nil {
// io_uring is actually available (Linux with kernel 5.6+).
// This test validates the error path, so skip on supported systems.
t.Skip("io_uring is available on this system")
}
if !errors.Is(err, ErrIOUringUnavailable) {
t.Fatalf("expected ErrIOUringUnavailable, got: %v", err)
}
}

46
weed/storage/blockvol/blockvol.go

@ -152,6 +152,12 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
Metrics: v.Metrics,
})
go v.groupCommit.Run()
bio, _, err := newBatchIO(cfg.IOBackend, log.Default())
if err != nil {
fd.Close()
os.Remove(path)
return nil, fmt.Errorf("blockvol: %w", err)
}
v.flusher = NewFlusher(FlusherConfig{
FD: fd,
Super: &v.super,
@ -159,7 +165,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
DirtyMap: dm,
Interval: cfg.FlushInterval,
Metrics: v.Metrics,
BatchIO: newBatchIO(cfg.UseIOUring),
BatchIO: bio,
})
go v.flusher.Run()
v.walAdmission = NewWALAdmission(WALAdmissionConfig{
@ -238,6 +244,11 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
Metrics: v.Metrics,
})
go v.groupCommit.Run()
bio, _, err := newBatchIO(cfg.IOBackend, log.Default())
if err != nil {
fd.Close()
return nil, fmt.Errorf("blockvol: %w", err)
}
v.flusher = NewFlusher(FlusherConfig{
FD: fd,
Super: &v.super,
@ -245,7 +256,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
DirtyMap: dirtyMap,
Interval: cfg.FlushInterval,
Metrics: v.Metrics,
BatchIO: newBatchIO(cfg.UseIOUring),
BatchIO: bio,
})
go v.flusher.Run()
@ -1147,15 +1158,32 @@ func (v *BlockVol) Close() error {
return closeErr
}
// newBatchIO creates a BatchIO backend based on config.
// When useIOUring is true, attempts io_uring with silent fallback to standard.
func newBatchIO(useIOUring bool) batchio.BatchIO {
if useIOUring {
// newBatchIO creates a BatchIO backend based on the IOBackend config mode.
//
// Returns (backend, selectedName, error):
// - "standard": always succeeds
// - "auto": tries io_uring, falls back to standard with warning
// - "io_uring": requires io_uring, returns error if unavailable
func newBatchIO(mode IOBackendMode, logger *log.Logger) (batchio.BatchIO, string, error) {
switch mode {
case IOBackendIOUring:
bio, err := batchio.NewIOUring(256)
if err != nil {
return nil, "", fmt.Errorf("io_uring requested but unavailable: %w", err)
}
logger.Printf("io backend: requested=io_uring selected=io_uring")
return bio, "io_uring", nil
case IOBackendAuto:
bio, err := batchio.NewIOUring(256)
if err != nil {
return batchio.NewStandard()
logger.Printf("io backend: requested=auto selected=standard reason=%v", err)
return batchio.NewStandard(), "standard", nil
}
return bio
logger.Printf("io backend: requested=auto selected=io_uring")
return bio, "io_uring", nil
default: // IOBackendStandard or empty
return batchio.NewStandard(), "standard", nil
}
return batchio.NewStandard()
}

18
weed/storage/blockvol/config.go

@ -19,9 +19,25 @@ type BlockVolConfig struct {
WALSoftWatermark float64 // WAL fraction above which writes begin throttling (default 0.7)
WALHardWatermark float64 // WAL fraction above which writes block until drain (default 0.9)
WALMaxConcurrentWrites int // max concurrent writers in WAL append path (default 16)
UseIOUring bool // opt-in: use io_uring for batch flusher I/O (Linux 5.6+ only)
IOBackend IOBackendMode // flusher I/O backend: "auto", "standard", "io_uring" (default "standard")
}
// IOBackendMode selects the batch I/O backend for the flusher.
type IOBackendMode string
const (
// IOBackendStandard uses sequential pread/pwrite/fdatasync (default).
IOBackendStandard IOBackendMode = "standard"
// IOBackendAuto uses io_uring if available, falls back to standard.
// Logs which backend was selected.
IOBackendAuto IOBackendMode = "auto"
// IOBackendIOUring requires io_uring. Fails volume open/create if unavailable.
// Use for benchmarking to guarantee the io_uring path is active.
IOBackendIOUring IOBackendMode = "io_uring"
)
// DefaultConfig returns a BlockVolConfig with production defaults.
func DefaultConfig() BlockVolConfig {
return BlockVolConfig{

9
weed/storage/blockvol/iscsi/cmd/iscsi-target/main.go

@ -43,6 +43,7 @@ func main() {
nqn := flag.String("nqn", "", "NVMe NQN (defaults to nqn.2024-01.com.seaweedfs:vol.<sanitized iqn suffix>)")
walMaxCW := flag.Int("wal-max-concurrent-writes", 0, "max concurrent writers in WAL append path (0 = use default 16)")
nvmeIOQueues := flag.Int("nvme-io-queues", 0, "max NVMe IO queues (0 = use default 4)")
ioBackend := flag.String("io-backend", "standard", "flusher I/O backend: standard, auto, io_uring")
flag.Parse()
if *volPath == "" {
@ -59,14 +60,14 @@ func main() {
logger := log.New(os.Stdout, "[iscsi] ", log.LstdFlags)
// Build config with optional WAL concurrency override.
var cfgs []blockvol.BlockVolConfig
// Build config.
cfg := blockvol.DefaultConfig()
cfg.IOBackend = blockvol.IOBackendMode(*ioBackend)
if *walMaxCW > 0 {
cfg := blockvol.DefaultConfig()
cfg.WALMaxConcurrentWrites = *walMaxCW
cfgs = append(cfgs, cfg)
logger.Printf("WALMaxConcurrentWrites = %d", *walMaxCW)
}
cfgs := []blockvol.BlockVolConfig{cfg}
var vol *blockvol.BlockVol
var err error

Loading…
Cancel
Save