Browse Source

fix: BatchIO review fixes — linked SQE, ring overflow, resource leak, sync parity

1. HIGH: LinkedWriteFsync now uses SubmitLinkRequests (IOSQE_IO_LINK)
   instead of SubmitRequests, ensuring write+fdatasync execute as a
   linked chain in the kernel. Falls back to sequential on error.

2. HIGH: PreadBatch/PwriteBatch chunk ops by ring capacity to prevent
   "too many requests" rejection when dirty map exceeds ring size (256).

3. MED: CloseBatchIO() added to Flusher, called in BlockVol.Close()
   after final flush to release io_uring ring / kernel resources.

4. MED: Sync parity — both standard and io_uring paths now use
   fdatasync (via platform-specific fdatasync_linux.go / fdatasync_other.go).
   Standard path previously used fsync; now matches io_uring semantics.
   On non-Linux, fdatasync falls back to fsync (only option available).

All 10 batchio tests and all blockvol tests pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 21 hours ago
parent
commit
66d5ba0a84
  1. 11
      weed/storage/blockvol/batchio/batchio.go
  2. 14
      weed/storage/blockvol/batchio/fdatasync_linux.go
  3. 11
      weed/storage/blockvol/batchio/fdatasync_other.go
  4. 79
      weed/storage/blockvol/batchio/iouring_linux.go
  5. 9
      weed/storage/blockvol/batchio/standard.go
  6. 3
      weed/storage/blockvol/blockvol.go
  7. 9
      weed/storage/blockvol/flusher.go

11
weed/storage/blockvol/batchio/batchio.go

@@ -33,12 +33,15 @@ type BatchIO interface {
// Op.Offset. Returns the first error encountered.
PwriteBatch(fd *os.File, ops []Op) error
// Fsync issues fdatasync on the file.
// Fsync issues fdatasync on the file, flushing data to disk without
// updating file metadata (mtime, size). On non-Linux platforms where
// fdatasync is unavailable, falls back to fsync. Both backends use
// identical sync semantics.
Fsync(fd *os.File) error
// LinkedWriteFsync writes buf at offset then fsyncs, as an atomic pair.
// On io_uring this is a linked SQE chain (one syscall).
// On standard, this is sequential write + fdatasync.
// LinkedWriteFsync writes buf at offset then issues fdatasync as a pair.
// On io_uring this is a linked SQE chain (one io_uring_enter syscall).
// On standard, this is sequential pwrite + fdatasync.
LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error
// Close releases resources (io_uring ring, etc). No-op for standard.

14
weed/storage/blockvol/batchio/fdatasync_linux.go

@@ -0,0 +1,14 @@
//go:build linux
package batchio
import (
"os"
"syscall"
)
// fdatasync flushes file data to disk without updating metadata (mtime, size).
// On Linux, this uses the fdatasync(2) syscall directly.
func fdatasync(fd *os.File) error {
return syscall.Fdatasync(int(fd.Fd()))
}

11
weed/storage/blockvol/batchio/fdatasync_other.go

@@ -0,0 +1,11 @@
//go:build !linux
package batchio
import "os"
// fdatasync flushes file data to disk. On non-Linux platforms, fdatasync is
// not available, so this falls back to fsync via os.File.Sync().
func fdatasync(fd *os.File) error {
return fd.Sync()
}

79
weed/storage/blockvol/batchio/iouring_linux.go

@@ -5,7 +5,6 @@ package batchio
import (
"fmt"
"os"
"syscall"
"github.com/iceber/iouring-go"
)
@@ -13,7 +12,8 @@ import (
// ioUringBatchIO implements BatchIO using Linux io_uring.
// Requires kernel 5.6+ (linked fsync: 5.10+).
type ioUringBatchIO struct {
ring *iouring.IOURing
ring *iouring.IOURing
ringSize int // max SQEs per submission
}
// NewIOUring creates a BatchIO backed by io_uring with the given ring size.
@@ -25,17 +25,32 @@ func NewIOUring(ringSize uint) (BatchIO, error) {
// Kernel doesn't support io_uring — fall back silently.
return NewStandard(), nil
}
return &ioUringBatchIO{ring: ring}, nil
return &ioUringBatchIO{ring: ring, ringSize: int(ringSize)}, nil
}
// PreadBatch reads all ops from fd via io_uring. The batch is sliced
// into ring-sized windows so a single submission never exceeds the ring
// capacity (which the kernel would reject as "too many requests").
// Returns the first error encountered.
func (u *ioUringBatchIO) PreadBatch(fd *os.File, ops []Op) error {
	if len(ops) == 0 {
		return nil
	}
	remaining := ops
	for len(remaining) > 0 {
		// Take at most ringSize ops for this submission window.
		n := u.ringSize
		if n > len(remaining) {
			n = len(remaining)
		}
		if err := u.preadChunk(fd, remaining[:n]); err != nil {
			return err
		}
		remaining = remaining[n:]
	}
	return nil
}
func (u *ioUringBatchIO) preadChunk(fd *os.File, ops []Op) error {
requests := make([]iouring.PrepRequest, len(ops))
// fd.Fd() is safe: the os.File stays live through this call.
fdInt := int(fd.Fd())
for i := range ops {
requests[i] = iouring.Pread(int(fd.Fd()), ops[i].Buf, uint64(ops[i].Offset))
requests[i] = iouring.Pread(fdInt, ops[i].Buf, uint64(ops[i].Offset))
}
results, err := u.ring.SubmitRequests(requests, nil)
@@ -43,7 +58,6 @@ func (u *ioUringBatchIO) PreadBatch(fd *os.File, ops []Op) error {
return fmt.Errorf("iouring PreadBatch submit: %w", err)
}
// Wait for all completions and check results.
for i, res := range results {
<-res.Done()
n, err := res.ReturnInt()
@@ -61,10 +75,24 @@ func (u *ioUringBatchIO) PwriteBatch(fd *os.File, ops []Op) error {
if len(ops) == 0 {
return nil
}
// Submit in chunks that fit the ring to avoid "too many requests" rejection.
for start := 0; start < len(ops); start += u.ringSize {
end := start + u.ringSize
if end > len(ops) {
end = len(ops)
}
if err := u.pwriteChunk(fd, ops[start:end]); err != nil {
return err
}
}
return nil
}
func (u *ioUringBatchIO) pwriteChunk(fd *os.File, ops []Op) error {
requests := make([]iouring.PrepRequest, len(ops))
fdInt := int(fd.Fd())
for i := range ops {
requests[i] = iouring.Pwrite(int(fd.Fd()), ops[i].Buf, uint64(ops[i].Offset))
requests[i] = iouring.Pwrite(fdInt, ops[i].Buf, uint64(ops[i].Offset))
}
results, err := u.ring.SubmitRequests(requests, nil)
@@ -85,6 +113,10 @@ func (u *ioUringBatchIO) PwriteBatch(fd *os.File, ops []Op) error {
return nil
}
// Fsync issues fdatasync via io_uring. fdatasync flushes data to disk without
// updating file metadata (size, mtime). This is sufficient for pre-allocated
// extent files where metadata does not change during writes, and matches the
// standard path behavior (see standard_linux.go).
func (u *ioUringBatchIO) Fsync(fd *os.File) error {
req := iouring.Fdatasync(int(fd.Fd()))
result, err := u.ring.SubmitRequest(req, nil)
@@ -99,34 +131,33 @@ func (u *ioUringBatchIO) Fsync(fd *os.File) error {
return nil
}
// LinkedWriteFsync submits a pwrite + fdatasync as a linked SQE chain.
// The kernel executes them in order in a single io_uring_enter() call.
// Requires kernel 5.10+ for linked fsync. Falls back to sequential on error.
func (u *ioUringBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error {
// Try linked SQE chain: pwrite → fdatasync (one io_uring_enter).
// This requires IOSQE_IO_LINK support (kernel 5.3+) and linked fsync (5.10+).
// If the ring doesn't support it, fall back to sequential.
writeReq := iouring.Pwrite(int(fd.Fd()), buf, uint64(offset))
fsyncReq := iouring.Fdatasync(int(fd.Fd()))
// SubmitRequests with linked flag.
results, err := u.ring.SubmitRequests(
fdInt := int(fd.Fd())
writeReq := iouring.Pwrite(fdInt, buf, uint64(offset))
fsyncReq := iouring.Fdatasync(fdInt)
// SubmitLinkRequests sets IOSQE_IO_LINK on all SQEs except the last,
// ensuring the fdatasync executes only after the pwrite completes.
results, err := u.ring.SubmitLinkRequests(
[]iouring.PrepRequest{writeReq, fsyncReq},
nil,
)
if err != nil {
// Fallback to sequential if linking not supported.
if errno, ok := err.(syscall.Errno); ok && errno == syscall.EINVAL {
if _, werr := fd.WriteAt(buf, offset); werr != nil {
return werr
}
return fd.Sync()
// Fallback to sequential if linked submission fails.
if _, werr := fd.WriteAt(buf, offset); werr != nil {
return werr
}
return fmt.Errorf("iouring LinkedWriteFsync submit: %w", err)
return fdatasync(fd)
}
for i, res := range results {
<-res.Done()
_, err := res.ReturnInt()
if err != nil {
return fmt.Errorf("iouring LinkedWriteFsync op[%d]: %w", i, err)
_, rerr := res.ReturnInt()
if rerr != nil {
return fmt.Errorf("iouring LinkedWriteFsync op[%d]: %w", i, rerr)
}
}
return nil

9
weed/storage/blockvol/batchio/standard.go

@@ -3,7 +3,7 @@ package batchio
import "os"
// standardBatchIO implements BatchIO with sequential os.File calls.
// This is functionally identical to calling ReadAt/WriteAt/Sync directly.
// This is functionally identical to calling ReadAt/WriteAt/fdatasync directly.
type standardBatchIO struct{}
// NewStandard returns a BatchIO that uses sequential pread/pwrite/fdatasync.
@@ -30,15 +30,18 @@ func (s *standardBatchIO) PwriteBatch(fd *os.File, ops []Op) error {
return nil
}
// Fsync issues fdatasync to flush data to disk. Uses fdatasync(2) on Linux
// for parity with the io_uring path. Falls back to fsync on other platforms.
func (s *standardBatchIO) Fsync(fd *os.File) error {
return fd.Sync()
return fdatasync(fd)
}
// LinkedWriteFsync writes buf at offset then issues fdatasync, sequentially.
func (s *standardBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error {
if _, err := fd.WriteAt(buf, offset); err != nil {
return err
}
return fd.Sync()
return fdatasync(fd)
}
func (s *standardBatchIO) Close() error {

3
weed/storage/blockvol/blockvol.go

@@ -1127,8 +1127,9 @@ func (v *BlockVol) Close() error {
}
var flushErr error
if v.flusher != nil {
v.flusher.Stop() // stop background goroutine first (no concurrent flush)
v.flusher.Stop() // stop background goroutine first (no concurrent flush)
flushErr = v.flusher.FlushOnce() // then do final flush safely
v.flusher.CloseBatchIO() // release io_uring ring / kernel resources
}
// Close snapshot delta files.

9
weed/storage/blockvol/flusher.go

@@ -434,6 +434,15 @@ func (f *Flusher) CheckpointLSN() uint64 {
return f.checkpointLSN
}
// CloseBatchIO tears down the batch I/O backend, releasing any kernel
// resources it holds (e.g. the io_uring ring). It must only be called
// after Stop() and the final FlushOnce(), so no flush can race the
// teardown. A nil backend is a no-op.
func (f *Flusher) CloseBatchIO() error {
	if f.bio == nil {
		return nil
	}
	return f.bio.Close()
}
// SetFD replaces the file descriptor used for extent writes. Test-only.
func (f *Flusher) SetFD(fd *os.File) {
f.mu.Lock()

Loading…
Cancel
Save