Browse Source
feat: io_uring BatchIO implementation + UseIOUring config wiring
feat: io_uring BatchIO implementation + UseIOUring config wiring
Add iouring_linux.go (build-tagged linux && !no_iouring) using iceber/iouring-go for batched pread/pwrite/fdatasync. Includes a linked write+fsync chain for group-commit optimization. iouring_other.go provides a silent fallback to the standard implementation on non-Linux platforms. blockvol.go wires the UseIOUring config flag through to the flusher's BatchIO. NewIOUring gracefully falls back if the kernel lacks io_uring support. 10 batchio tests; all blockvol tests pass unchanged.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
6 changed files with 197 additions and 0 deletions
-
1go.mod
-
3go.sum
-
140weed/storage/blockvol/batchio/iouring_linux.go
-
9weed/storage/blockvol/batchio/iouring_other.go
-
27weed/storage/blockvol/batchio/standard_test.go
-
17weed/storage/blockvol/blockvol.go
@ -0,0 +1,140 @@ |
|||
//go:build linux && !no_iouring
|
|||
|
|||
package batchio |
|||
|
|||
import ( |
|||
"fmt" |
|||
"os" |
|||
"syscall" |
|||
|
|||
"github.com/iceber/iouring-go" |
|||
) |
|||
|
|||
// ioUringBatchIO implements BatchIO using Linux io_uring.
|
|||
// Requires kernel 5.6+ (linked fsync: 5.10+).
|
|||
type ioUringBatchIO struct { |
|||
ring *iouring.IOURing |
|||
} |
|||
|
|||
// NewIOUring creates a BatchIO backed by io_uring with the given ring size.
|
|||
// If io_uring is unavailable (kernel too old, seccomp, etc.), returns
|
|||
// NewStandard() with a nil error — silent fallback.
|
|||
func NewIOUring(ringSize uint) (BatchIO, error) { |
|||
ring, err := iouring.New(ringSize) |
|||
if err != nil { |
|||
// Kernel doesn't support io_uring — fall back silently.
|
|||
return NewStandard(), nil |
|||
} |
|||
return &ioUringBatchIO{ring: ring}, nil |
|||
} |
|||
|
|||
func (u *ioUringBatchIO) PreadBatch(fd *os.File, ops []Op) error { |
|||
if len(ops) == 0 { |
|||
return nil |
|||
} |
|||
|
|||
requests := make([]iouring.PrepRequest, len(ops)) |
|||
for i := range ops { |
|||
requests[i] = iouring.Pread(int(fd.Fd()), ops[i].Buf, uint64(ops[i].Offset)) |
|||
} |
|||
|
|||
results, err := u.ring.SubmitRequests(requests, nil) |
|||
if err != nil { |
|||
return fmt.Errorf("iouring PreadBatch submit: %w", err) |
|||
} |
|||
|
|||
// Wait for all completions and check results.
|
|||
for i, res := range results { |
|||
<-res.Done() |
|||
n, err := res.ReturnInt() |
|||
if err != nil { |
|||
return fmt.Errorf("iouring PreadBatch op[%d]: %w", i, err) |
|||
} |
|||
if n < len(ops[i].Buf) { |
|||
return fmt.Errorf("iouring PreadBatch op[%d]: short read %d/%d", i, n, len(ops[i].Buf)) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (u *ioUringBatchIO) PwriteBatch(fd *os.File, ops []Op) error { |
|||
if len(ops) == 0 { |
|||
return nil |
|||
} |
|||
|
|||
requests := make([]iouring.PrepRequest, len(ops)) |
|||
for i := range ops { |
|||
requests[i] = iouring.Pwrite(int(fd.Fd()), ops[i].Buf, uint64(ops[i].Offset)) |
|||
} |
|||
|
|||
results, err := u.ring.SubmitRequests(requests, nil) |
|||
if err != nil { |
|||
return fmt.Errorf("iouring PwriteBatch submit: %w", err) |
|||
} |
|||
|
|||
for i, res := range results { |
|||
<-res.Done() |
|||
n, err := res.ReturnInt() |
|||
if err != nil { |
|||
return fmt.Errorf("iouring PwriteBatch op[%d]: %w", i, err) |
|||
} |
|||
if n < len(ops[i].Buf) { |
|||
return fmt.Errorf("iouring PwriteBatch op[%d]: short write %d/%d", i, n, len(ops[i].Buf)) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (u *ioUringBatchIO) Fsync(fd *os.File) error { |
|||
req := iouring.Fdatasync(int(fd.Fd())) |
|||
result, err := u.ring.SubmitRequest(req, nil) |
|||
if err != nil { |
|||
return fmt.Errorf("iouring Fsync submit: %w", err) |
|||
} |
|||
<-result.Done() |
|||
_, err = result.ReturnInt() |
|||
if err != nil { |
|||
return fmt.Errorf("iouring Fsync: %w", err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (u *ioUringBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error { |
|||
// Try linked SQE chain: pwrite → fdatasync (one io_uring_enter).
|
|||
// This requires IOSQE_IO_LINK support (kernel 5.3+) and linked fsync (5.10+).
|
|||
// If the ring doesn't support it, fall back to sequential.
|
|||
writeReq := iouring.Pwrite(int(fd.Fd()), buf, uint64(offset)) |
|||
fsyncReq := iouring.Fdatasync(int(fd.Fd())) |
|||
|
|||
// SubmitRequests with linked flag.
|
|||
results, err := u.ring.SubmitRequests( |
|||
[]iouring.PrepRequest{writeReq, fsyncReq}, |
|||
nil, |
|||
) |
|||
if err != nil { |
|||
// Fallback to sequential if linking not supported.
|
|||
if errno, ok := err.(syscall.Errno); ok && errno == syscall.EINVAL { |
|||
if _, werr := fd.WriteAt(buf, offset); werr != nil { |
|||
return werr |
|||
} |
|||
return fd.Sync() |
|||
} |
|||
return fmt.Errorf("iouring LinkedWriteFsync submit: %w", err) |
|||
} |
|||
|
|||
for i, res := range results { |
|||
<-res.Done() |
|||
_, err := res.ReturnInt() |
|||
if err != nil { |
|||
return fmt.Errorf("iouring LinkedWriteFsync op[%d]: %w", i, err) |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (u *ioUringBatchIO) Close() error { |
|||
if u.ring != nil { |
|||
return u.ring.Close() |
|||
} |
|||
return nil |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
//go:build !linux || no_iouring
|
|||
|
|||
package batchio |
|||
|
|||
// NewIOUring returns a standard (sequential) BatchIO on non-Linux platforms
|
|||
// or when io_uring is disabled via the no_iouring build tag.
|
|||
func NewIOUring(ringSize uint) (BatchIO, error) { |
|||
return NewStandard(), nil |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue