From cd1e0afa3b37f8b405b8e174300434fd271cf105 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Tue, 10 Mar 2026 18:11:39 -0700 Subject: [PATCH] feat: three io_uring backends for A/B/C benchmarking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split iouring_linux.go into three build-tagged implementations: 1. iouring_iceber_linux.go (-tags iouring_iceber) iceber/iouring-go library. Goroutine-based completion model. Known -72% write regression due to per-op channel overhead. 2. iouring_giouring_linux.go (-tags iouring_giouring) pawelgaczynski/giouring — direct liburing port. No goroutines, no channels. Direct SQE/CQE ring manipulation. Kernel 6.0+. 3. iouring_raw_linux.go (default on Linux, no tags needed) Raw syscall wrappers — io_uring_setup/io_uring_enter + mmap. Zero dependencies. ~444 lines. Kernel 5.6+. Build commands for benchmarking: go build -tags iouring_iceber ./... # option A go build -tags iouring_giouring ./... # option B go build ./... # option C (raw, default) go build -tags no_iouring ./... # disable all io_uring All variants implement the same BatchIO interface. Cross-compile verified for all four tag combinations. 
Co-Authored-By: Claude Opus 4.6 --- go.mod | 1 + go.sum | 2 + .../batchio/iouring_giouring_linux.go | 208 ++++++++ ...uring_linux.go => iouring_iceber_linux.go} | 2 +- .../blockvol/batchio/iouring_raw_linux.go | 444 ++++++++++++++++++ 5 files changed, 656 insertions(+), 1 deletion(-) create mode 100644 weed/storage/blockvol/batchio/iouring_giouring_linux.go rename weed/storage/blockvol/batchio/{iouring_linux.go => iouring_iceber_linux.go} (99%) create mode 100644 weed/storage/blockvol/batchio/iouring_raw_linux.go diff --git a/go.mod b/go.mod index a91f2a225..3bba53002 100644 --- a/go.mod +++ b/go.mod @@ -257,6 +257,7 @@ require ( github.com/openzipkin/zipkin-go v0.4.3 // indirect github.com/parquet-go/bitpack v1.0.0 // indirect github.com/parquet-go/jsonlite v1.0.0 // indirect + github.com/pawelgaczynski/giouring v0.0.0-20230826085535-69588b89acb9 // indirect github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741 // indirect github.com/pierrre/geohash v1.0.0 // indirect github.com/pquerna/otp v1.5.0 // indirect diff --git a/go.sum b/go.sum index f9f9c76dd..19dda8989 100644 --- a/go.sum +++ b/go.sum @@ -1680,6 +1680,8 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pawelgaczynski/giouring v0.0.0-20230826085535-69588b89acb9 h1:Cu/CW2nKeqXinVjf5Bq1FeBD4jWG/msC5UazjjgAvsU= +github.com/pawelgaczynski/giouring v0.0.0-20230826085535-69588b89acb9/go.mod h1:HwOQqYv/WE3RMp4iTQsS6ou8WP3wKO9UXD0oDqB3NPU= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.2.4 
h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= diff --git a/weed/storage/blockvol/batchio/iouring_giouring_linux.go b/weed/storage/blockvol/batchio/iouring_giouring_linux.go new file mode 100644 index 000000000..0b6149876 --- /dev/null +++ b/weed/storage/blockvol/batchio/iouring_giouring_linux.go @@ -0,0 +1,208 @@ +//go:build linux && iouring_giouring + +package batchio + +import ( + "fmt" + "os" + "unsafe" + + "github.com/pawelgaczynski/giouring" +) + +// giouringBatchIO implements BatchIO using giouring (direct liburing port). +// No goroutines or channels — direct SQE/CQE ring manipulation. +// Requires kernel 6.0+. +type giouringBatchIO struct { + ring *giouring.Ring + ringSize int +} + +// NewIOUring creates a BatchIO backed by giouring with the given ring size. +// Returns ErrIOUringUnavailable if io_uring cannot be initialized. +func NewIOUring(ringSize uint) (BatchIO, error) { + ring, err := giouring.CreateRing(uint32(ringSize)) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrIOUringUnavailable, err) + } + return &giouringBatchIO{ring: ring, ringSize: int(ringSize)}, nil +} + +func (g *giouringBatchIO) PreadBatch(fd *os.File, ops []Op) error { + if len(ops) == 0 { + return nil + } + for start := 0; start < len(ops); start += g.ringSize { + end := start + g.ringSize + if end > len(ops) { + end = len(ops) + } + if err := g.preadChunk(fd, ops[start:end]); err != nil { + return err + } + } + return nil +} + +func (g *giouringBatchIO) preadChunk(fd *os.File, ops []Op) error { + fdInt := int(fd.Fd()) + for i := range ops { + sqe := g.ring.GetSQE() + if sqe == nil { + return fmt.Errorf("iouring PreadBatch: SQ full at op %d", i) + } + sqe.PrepareRead(fdInt, uintptr(unsafe.Pointer(&ops[i].Buf[0])), uint32(len(ops[i].Buf)), uint64(ops[i].Offset)) + sqe.SetData64(uint64(i)) + } + + _, err := g.ring.SubmitAndWait(uint32(len(ops))) + if err != nil { + return fmt.Errorf("iouring PreadBatch submit: %w", err) + } + + for i := range ops { + cqe, err := 
g.ring.WaitCQE() + if err != nil { + return fmt.Errorf("iouring PreadBatch wait op[%d]: %w", i, err) + } + if cqe.Res < 0 { + g.ring.CQESeen(cqe) + return fmt.Errorf("iouring PreadBatch op[%d]: %w", i, errFromRes(cqe.Res)) + } + if int(cqe.Res) < len(ops[cqe.UserData].Buf) { + g.ring.CQESeen(cqe) + return fmt.Errorf("iouring PreadBatch op[%d]: short read %d/%d", cqe.UserData, cqe.Res, len(ops[cqe.UserData].Buf)) + } + g.ring.CQESeen(cqe) + } + return nil +} + +func (g *giouringBatchIO) PwriteBatch(fd *os.File, ops []Op) error { + if len(ops) == 0 { + return nil + } + for start := 0; start < len(ops); start += g.ringSize { + end := start + g.ringSize + if end > len(ops) { + end = len(ops) + } + if err := g.pwriteChunk(fd, ops[start:end]); err != nil { + return err + } + } + return nil +} + +func (g *giouringBatchIO) pwriteChunk(fd *os.File, ops []Op) error { + fdInt := int(fd.Fd()) + for i := range ops { + sqe := g.ring.GetSQE() + if sqe == nil { + return fmt.Errorf("iouring PwriteBatch: SQ full at op %d", i) + } + sqe.PrepareWrite(fdInt, uintptr(unsafe.Pointer(&ops[i].Buf[0])), uint32(len(ops[i].Buf)), uint64(ops[i].Offset)) + sqe.SetData64(uint64(i)) + } + + _, err := g.ring.SubmitAndWait(uint32(len(ops))) + if err != nil { + return fmt.Errorf("iouring PwriteBatch submit: %w", err) + } + + for i := range ops { + cqe, err := g.ring.WaitCQE() + if err != nil { + return fmt.Errorf("iouring PwriteBatch wait op[%d]: %w", i, err) + } + if cqe.Res < 0 { + g.ring.CQESeen(cqe) + return fmt.Errorf("iouring PwriteBatch op[%d]: %w", cqe.UserData, errFromRes(cqe.Res)) + } + if int(cqe.Res) < len(ops[cqe.UserData].Buf) { + g.ring.CQESeen(cqe) + return fmt.Errorf("iouring PwriteBatch op[%d]: short write %d/%d", cqe.UserData, cqe.Res, len(ops[cqe.UserData].Buf)) + } + g.ring.CQESeen(cqe) + } + return nil +} + +// Fsync issues fdatasync via io_uring. 
+func (g *giouringBatchIO) Fsync(fd *os.File) error { + sqe := g.ring.GetSQE() + if sqe == nil { + return fmt.Errorf("iouring Fsync: SQ full") + } + sqe.PrepareFsync(int(fd.Fd()), giouring.FsyncDatasync) + + _, err := g.ring.SubmitAndWait(1) + if err != nil { + return fmt.Errorf("iouring Fsync submit: %w", err) + } + + cqe, err := g.ring.WaitCQE() + if err != nil { + return fmt.Errorf("iouring Fsync wait: %w", err) + } + defer g.ring.CQESeen(cqe) + if cqe.Res < 0 { + return fmt.Errorf("iouring Fsync: %w", errFromRes(cqe.Res)) + } + return nil +} + +// LinkedWriteFsync submits pwrite + fdatasync as a linked SQE chain. +func (g *giouringBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error { + fdInt := int(fd.Fd()) + + // SQE 1: pwrite with IO_LINK flag + sqe1 := g.ring.GetSQE() + if sqe1 == nil { + return fmt.Errorf("iouring LinkedWriteFsync: SQ full (write)") + } + sqe1.PrepareWrite(fdInt, uintptr(unsafe.Pointer(&buf[0])), uint32(len(buf)), uint64(offset)) + sqe1.SetFlags(uint32(giouring.SqeIOLink)) + + // SQE 2: fdatasync (no link flag — last in chain) + sqe2 := g.ring.GetSQE() + if sqe2 == nil { + return fmt.Errorf("iouring LinkedWriteFsync: SQ full (fsync)") + } + sqe2.PrepareFsync(fdInt, giouring.FsyncDatasync) + + _, err := g.ring.SubmitAndWait(2) + if err != nil { + // Fallback to sequential. + if _, werr := fd.WriteAt(buf, offset); werr != nil { + return werr + } + return fdatasync(fd) + } + + // Collect both CQEs. + for i := 0; i < 2; i++ { + cqe, err := g.ring.WaitCQE() + if err != nil { + return fmt.Errorf("iouring LinkedWriteFsync wait op[%d]: %w", i, err) + } + if cqe.Res < 0 { + g.ring.CQESeen(cqe) + return fmt.Errorf("iouring LinkedWriteFsync op[%d]: %w", i, errFromRes(cqe.Res)) + } + g.ring.CQESeen(cqe) + } + return nil +} + +func (g *giouringBatchIO) Close() error { + if g.ring != nil { + g.ring.QueueExit() + } + return nil +} + +// errFromRes converts a negative io_uring result code to a Go error. 
+func errFromRes(res int32) error { + return fmt.Errorf("errno %d", -res) +} diff --git a/weed/storage/blockvol/batchio/iouring_linux.go b/weed/storage/blockvol/batchio/iouring_iceber_linux.go similarity index 99% rename from weed/storage/blockvol/batchio/iouring_linux.go rename to weed/storage/blockvol/batchio/iouring_iceber_linux.go index b927945a6..6b821f5b5 100644 --- a/weed/storage/blockvol/batchio/iouring_linux.go +++ b/weed/storage/blockvol/batchio/iouring_iceber_linux.go @@ -1,4 +1,4 @@ -//go:build linux && !no_iouring +//go:build linux && iouring_iceber package batchio diff --git a/weed/storage/blockvol/batchio/iouring_raw_linux.go b/weed/storage/blockvol/batchio/iouring_raw_linux.go new file mode 100644 index 000000000..d877f762f --- /dev/null +++ b/weed/storage/blockvol/batchio/iouring_raw_linux.go @@ -0,0 +1,444 @@ +//go:build linux && !no_iouring && !iouring_iceber && !iouring_giouring + +package batchio + +import ( + "fmt" + "os" + "sync" + "syscall" + "unsafe" +) + +// Raw io_uring syscall numbers. +const ( + sysIOUringSetup = 425 + sysIOUringEnter = 426 + sysIOUringRegister = 427 +) + +// io_uring opcodes. +const ( + opNop = 0 + opReadv = 1 + opWritev = 2 + opFsync = 3 + opRead = 22 + opWrite = 23 +) + +// io_uring SQE flags. +const ( + sqeFlagIOLink = 1 << 2 +) + +// io_uring fsync flags. +const ( + fsyncDatasync = 1 << 0 +) + +// io_uring_enter flags. +const ( + enterGetEvents = 1 << 0 +) + +// io_uring setup offsets (from kernel include/uapi/linux/io_uring.h). +const ( + offSQDropped = 0 + offSQFlags = 4 + offSQArrayOff = 8 +) + +// sqe is the submission queue entry (64 bytes). +type sqe struct { + opcode uint8 + flags uint8 + ioprio uint16 + fd int32 + off uint64 + addr uint64 + len uint32 + opcFlags uint32 + userData uint64 + bufIG uint16 + personality uint16 + spliceFdIn int32 + addr3 uint64 + _pad [8]byte +} + +// cqe is the completion queue entry (16 bytes). 
+type cqe struct { + userData uint64 + res int32 + flags uint32 +} + +// ioUringParams is passed to io_uring_setup. +type ioUringParams struct { + sqEntries uint32 + cqEntries uint32 + flags uint32 + sqThreadCPU uint32 + sqThreadIdle uint32 + features uint32 + wqFd uint32 + resv [3]uint32 + sqOff sqRingOffsets + cqOff cqRingOffsets +} + +type sqRingOffsets struct { + head uint32 + tail uint32 + ringMask uint32 + ringEntries uint32 + flags uint32 + dropped uint32 + array uint32 + resv1 uint32 + userAddr uint64 +} + +type cqRingOffsets struct { + head uint32 + tail uint32 + ringMask uint32 + ringEntries uint32 + overflow uint32 + cqes uint32 + flags uint32 + resv1 uint32 + userAddr uint64 +} + +// rawRing is a minimal io_uring ring for batch I/O. +type rawRing struct { + fd int + ringSize int + + // SQ ring mapped memory + sqRingPtr uintptr + sqRingLen int + sqHead *uint32 + sqTail *uint32 + sqMask uint32 + sqArray *uint32 // sqArray[0] through sqArray[entries-1] + + // SQE array + sqePtr uintptr + sqeLen int + sqes *sqe // base of SQE array + + // CQ ring mapped memory + cqRingPtr uintptr + cqRingLen int + cqHead *uint32 + cqTail *uint32 + cqMask uint32 + cqes *cqe // base of CQE array + + mu sync.Mutex // serializes submit+wait cycles +} + +// rawBatchIO implements BatchIO using raw io_uring syscalls. +// No external dependencies. ~200 LOC of direct kernel interaction. +type rawBatchIO struct { + ring *rawRing + ringSize int +} + +// NewIOUring creates a BatchIO backed by raw io_uring syscalls. +// Returns ErrIOUringUnavailable if io_uring cannot be initialized. 
+func NewIOUring(ringSize uint) (BatchIO, error) { + ring, err := newRawRing(int(ringSize)) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrIOUringUnavailable, err) + } + return &rawBatchIO{ring: ring, ringSize: int(ringSize)}, nil +} + +func newRawRing(entries int) (*rawRing, error) { + var params ioUringParams + fd, _, errno := syscall.Syscall(sysIOUringSetup, uintptr(entries), uintptr(unsafe.Pointer(¶ms)), 0) + if errno != 0 { + return nil, fmt.Errorf("io_uring_setup: %v", errno) + } + + r := &rawRing{ + fd: int(fd), + ringSize: int(params.sqEntries), + } + + // Map SQ ring. + sqRingSize := int(params.sqOff.array + params.sqEntries*4) + sqRingPtr, _, errno := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(sqRingSize), + syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_POPULATE, + fd, 0) + if errno != 0 { + syscall.Close(r.fd) + return nil, fmt.Errorf("mmap sq ring: %v", errno) + } + r.sqRingPtr = sqRingPtr + r.sqRingLen = sqRingSize + r.sqHead = (*uint32)(unsafe.Pointer(sqRingPtr + uintptr(params.sqOff.head))) + r.sqTail = (*uint32)(unsafe.Pointer(sqRingPtr + uintptr(params.sqOff.tail))) + r.sqMask = *(*uint32)(unsafe.Pointer(sqRingPtr + uintptr(params.sqOff.ringMask))) + r.sqArray = (*uint32)(unsafe.Pointer(sqRingPtr + uintptr(params.sqOff.array))) + + // Map SQE array. + sqeSize := int(params.sqEntries) * int(unsafe.Sizeof(sqe{})) + sqePtr, _, errno := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(sqeSize), + syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_POPULATE, + fd, 0x10000000) // IORING_OFF_SQES + if errno != 0 { + syscall.Munmap(unsafeSlice(sqRingPtr, sqRingSize)) + syscall.Close(r.fd) + return nil, fmt.Errorf("mmap sqes: %v", errno) + } + r.sqePtr = sqePtr + r.sqeLen = sqeSize + r.sqes = (*sqe)(unsafe.Pointer(sqePtr)) + + // Map CQ ring. 
+ cqRingSize := int(params.cqOff.cqes + params.cqEntries*uint32(unsafe.Sizeof(cqe{}))) + cqRingPtr, _, errno := syscall.Syscall6(syscall.SYS_MMAP, 0, uintptr(cqRingSize), + syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_POPULATE, + fd, 0x8000000) // IORING_OFF_CQ_RING + if errno != 0 { + syscall.Munmap(unsafeSlice(sqePtr, sqeSize)) + syscall.Munmap(unsafeSlice(sqRingPtr, sqRingSize)) + syscall.Close(r.fd) + return nil, fmt.Errorf("mmap cq ring: %v", errno) + } + r.cqRingPtr = cqRingPtr + r.cqRingLen = cqRingSize + r.cqHead = (*uint32)(unsafe.Pointer(cqRingPtr + uintptr(params.cqOff.head))) + r.cqTail = (*uint32)(unsafe.Pointer(cqRingPtr + uintptr(params.cqOff.tail))) + r.cqMask = *(*uint32)(unsafe.Pointer(cqRingPtr + uintptr(params.cqOff.ringMask))) + r.cqes = (*cqe)(unsafe.Pointer(cqRingPtr + uintptr(params.cqOff.cqes))) + + return r, nil +} + +func (r *rawRing) close() { + syscall.Munmap(unsafeSlice(r.cqRingPtr, r.cqRingLen)) + syscall.Munmap(unsafeSlice(r.sqePtr, r.sqeLen)) + syscall.Munmap(unsafeSlice(r.sqRingPtr, r.sqRingLen)) + syscall.Close(r.fd) +} + +// getSQE returns a pointer to the next SQE slot, or nil if full. +func (r *rawRing) getSQE(idx int) *sqe { + return (*sqe)(unsafe.Pointer(r.sqePtr + uintptr(idx)*unsafe.Sizeof(sqe{}))) +} + +// sqArraySlot returns a pointer to sqArray[idx]. +func (r *rawRing) sqArraySlot(idx int) *uint32 { + return (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(r.sqArray)) + uintptr(idx)*4)) +} + +// getCQE returns a pointer to cqes[idx]. +func (r *rawRing) getCQE(idx uint32) *cqe { + return (*cqe)(unsafe.Pointer(uintptr(unsafe.Pointer(r.cqes)) + uintptr(idx)*unsafe.Sizeof(cqe{}))) +} + +// submitAndWait submits n SQEs and waits for n CQEs. Returns CQE results. +func (r *rawRing) submitAndWait(n int) ([]cqe, error) { + // Set SQ array indices and advance tail. 
+ tail := *r.sqTail + for i := 0; i < n; i++ { + *r.sqArraySlot(int(tail+uint32(i)) & int(r.sqMask)) = (tail + uint32(i)) & r.sqMask + } + // Memory barrier: ensure SQE writes are visible before updating tail. + *r.sqTail = tail + uint32(n) + + // io_uring_enter: submit and wait. + _, _, errno := syscall.Syscall6(sysIOUringEnter, uintptr(r.fd), + uintptr(n), uintptr(n), enterGetEvents, 0, 0) + if errno != 0 { + return nil, fmt.Errorf("io_uring_enter: %v", errno) + } + + // Read CQEs. + results := make([]cqe, n) + head := *r.cqHead + for i := 0; i < n; i++ { + c := r.getCQE(head & r.cqMask) + results[i] = *c + head++ + } + *r.cqHead = head + + return results, nil +} + +func (b *rawBatchIO) PreadBatch(fd *os.File, ops []Op) error { + if len(ops) == 0 { + return nil + } + b.ring.mu.Lock() + defer b.ring.mu.Unlock() + + for start := 0; start < len(ops); start += b.ringSize { + end := start + b.ringSize + if end > len(ops) { + end = len(ops) + } + if err := b.preadChunk(fd, ops[start:end]); err != nil { + return err + } + } + return nil +} + +func (b *rawBatchIO) preadChunk(fd *os.File, ops []Op) error { + fdInt := int(fd.Fd()) + for i := range ops { + s := b.ring.getSQE(i) + *s = sqe{} // zero + s.opcode = opRead + s.fd = int32(fdInt) + s.addr = uint64(uintptr(unsafe.Pointer(&ops[i].Buf[0]))) + s.len = uint32(len(ops[i].Buf)) + s.off = uint64(ops[i].Offset) + s.userData = uint64(i) + } + + results, err := b.ring.submitAndWait(len(ops)) + if err != nil { + return fmt.Errorf("iouring PreadBatch: %w", err) + } + + for _, r := range results { + if r.res < 0 { + return fmt.Errorf("iouring PreadBatch op[%d]: errno %d", r.userData, -r.res) + } + idx := r.userData + if int(r.res) < len(ops[idx].Buf) { + return fmt.Errorf("iouring PreadBatch op[%d]: short read %d/%d", idx, r.res, len(ops[idx].Buf)) + } + } + return nil +} + +func (b *rawBatchIO) PwriteBatch(fd *os.File, ops []Op) error { + if len(ops) == 0 { + return nil + } + b.ring.mu.Lock() + defer b.ring.mu.Unlock() + + for 
start := 0; start < len(ops); start += b.ringSize { + end := start + b.ringSize + if end > len(ops) { + end = len(ops) + } + if err := b.pwriteChunk(fd, ops[start:end]); err != nil { + return err + } + } + return nil +} + +func (b *rawBatchIO) pwriteChunk(fd *os.File, ops []Op) error { + fdInt := int(fd.Fd()) + for i := range ops { + s := b.ring.getSQE(i) + *s = sqe{} + s.opcode = opWrite + s.fd = int32(fdInt) + s.addr = uint64(uintptr(unsafe.Pointer(&ops[i].Buf[0]))) + s.len = uint32(len(ops[i].Buf)) + s.off = uint64(ops[i].Offset) + s.userData = uint64(i) + } + + results, err := b.ring.submitAndWait(len(ops)) + if err != nil { + return fmt.Errorf("iouring PwriteBatch: %w", err) + } + + for _, r := range results { + if r.res < 0 { + return fmt.Errorf("iouring PwriteBatch op[%d]: errno %d", r.userData, -r.res) + } + idx := r.userData + if int(r.res) < len(ops[idx].Buf) { + return fmt.Errorf("iouring PwriteBatch op[%d]: short write %d/%d", idx, r.res, len(ops[idx].Buf)) + } + } + return nil +} + +func (b *rawBatchIO) Fsync(fd *os.File) error { + b.ring.mu.Lock() + defer b.ring.mu.Unlock() + + s := b.ring.getSQE(0) + *s = sqe{} + s.opcode = opFsync + s.fd = int32(fd.Fd()) + s.opcFlags = fsyncDatasync + + results, err := b.ring.submitAndWait(1) + if err != nil { + return fmt.Errorf("iouring Fsync: %w", err) + } + if results[0].res < 0 { + return fmt.Errorf("iouring Fsync: errno %d", -results[0].res) + } + return nil +} + +func (b *rawBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error { + b.ring.mu.Lock() + defer b.ring.mu.Unlock() + + fdInt := int32(fd.Fd()) + + // SQE 0: pwrite with IO_LINK + s0 := b.ring.getSQE(0) + *s0 = sqe{} + s0.opcode = opWrite + s0.flags = sqeFlagIOLink + s0.fd = fdInt + s0.addr = uint64(uintptr(unsafe.Pointer(&buf[0]))) + s0.len = uint32(len(buf)) + s0.off = uint64(offset) + + // SQE 1: fdatasync + s1 := b.ring.getSQE(1) + *s1 = sqe{} + s1.opcode = opFsync + s1.fd = fdInt + s1.opcFlags = fsyncDatasync + + results, err := 
b.ring.submitAndWait(2) + if err != nil { + // Fallback to sequential. + if _, werr := fd.WriteAt(buf, offset); werr != nil { + return werr + } + return fdatasync(fd) + } + + for i, r := range results { + if r.res < 0 { + return fmt.Errorf("iouring LinkedWriteFsync op[%d]: errno %d", i, -r.res) + } + } + return nil +} + +func (b *rawBatchIO) Close() error { + if b.ring != nil { + b.ring.close() + } + return nil +} + +// unsafeSlice creates a byte slice from a pointer and length for munmap. +func unsafeSlice(ptr uintptr, length int) []byte { + return unsafe.Slice((*byte)(unsafe.Pointer(ptr)), length) +}