diff --git a/weed/storage/blockvol/batchio/batchio.go b/weed/storage/blockvol/batchio/batchio.go index b7dcf25d6..693385a98 100644 --- a/weed/storage/blockvol/batchio/batchio.go +++ b/weed/storage/blockvol/batchio/batchio.go @@ -33,12 +33,15 @@ type BatchIO interface { // Op.Offset. Returns the first error encountered. PwriteBatch(fd *os.File, ops []Op) error - // Fsync issues fdatasync on the file. + // Fsync issues fdatasync on the file, flushing data to disk without + // flushing metadata that is not needed to read it back (e.g. mtime). On + // non-Linux platforms where fdatasync is unavailable, falls back to fsync. + // Both backends use identical sync semantics. Fsync(fd *os.File) error - // LinkedWriteFsync writes buf at offset then fsyncs, as an atomic pair. - // On io_uring this is a linked SQE chain (one syscall). - // On standard, this is sequential write + fdatasync. + // LinkedWriteFsync writes buf at offset then issues fdatasync as a pair. + // On io_uring this is a linked SQE chain (one io_uring_enter syscall). + // On standard, this is sequential pwrite + fdatasync. LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error // Close releases resources (io_uring ring, etc). No-op for standard. diff --git a/weed/storage/blockvol/batchio/fdatasync_linux.go b/weed/storage/blockvol/batchio/fdatasync_linux.go new file mode 100644 index 000000000..1ccd2c96a --- /dev/null +++ b/weed/storage/blockvol/batchio/fdatasync_linux.go @@ -0,0 +1,14 @@ +//go:build linux + +package batchio + +import ( + "os" + "syscall" +) + +// fdatasync flushes file data to disk without flushing metadata that is not +// needed to read it back (e.g. mtime). On Linux, this uses the fdatasync(2) syscall directly. 
+func fdatasync(fd *os.File) error { + return syscall.Fdatasync(int(fd.Fd())) +} diff --git a/weed/storage/blockvol/batchio/fdatasync_other.go b/weed/storage/blockvol/batchio/fdatasync_other.go new file mode 100644 index 000000000..167717b19 --- /dev/null +++ b/weed/storage/blockvol/batchio/fdatasync_other.go @@ -0,0 +1,11 @@ +//go:build !linux + +package batchio + +import "os" + +// fdatasync flushes file data to disk. On non-Linux platforms, fdatasync is +// not available, so this falls back to fsync via os.File.Sync(). +func fdatasync(fd *os.File) error { + return fd.Sync() +} diff --git a/weed/storage/blockvol/batchio/iouring_linux.go b/weed/storage/blockvol/batchio/iouring_linux.go index 9f24f13e0..abb53e135 100644 --- a/weed/storage/blockvol/batchio/iouring_linux.go +++ b/weed/storage/blockvol/batchio/iouring_linux.go @@ -5,7 +5,6 @@ package batchio import ( "fmt" "os" - "syscall" "github.com/iceber/iouring-go" ) @@ -13,7 +12,8 @@ import ( // ioUringBatchIO implements BatchIO using Linux io_uring. // Requires kernel 5.6+ (linked fsync: 5.10+). type ioUringBatchIO struct { - ring *iouring.IOURing + ring *iouring.IOURing + ringSize int // max SQEs per submission } // NewIOUring creates a BatchIO backed by io_uring with the given ring size. @@ -25,17 +25,32 @@ func NewIOUring(ringSize uint) (BatchIO, error) { // Kernel doesn't support io_uring — fall back silently. return NewStandard(), nil } - return &ioUringBatchIO{ring: ring}, nil + return &ioUringBatchIO{ring: ring, ringSize: int(ringSize)}, nil } func (u *ioUringBatchIO) PreadBatch(fd *os.File, ops []Op) error { if len(ops) == 0 { return nil } + // Submit in chunks that fit the ring to avoid "too many requests" rejection. 
+ for start := 0; start < len(ops); start += u.ringSize { + end := start + u.ringSize + if end > len(ops) { + end = len(ops) + } + if err := u.preadChunk(fd, ops[start:end]); err != nil { + return err + } + } + return nil +} +func (u *ioUringBatchIO) preadChunk(fd *os.File, ops []Op) error { requests := make([]iouring.PrepRequest, len(ops)) + // fd.Fd() is safe: the os.File stays live through this call. + fdInt := int(fd.Fd()) for i := range ops { - requests[i] = iouring.Pread(int(fd.Fd()), ops[i].Buf, uint64(ops[i].Offset)) + requests[i] = iouring.Pread(fdInt, ops[i].Buf, uint64(ops[i].Offset)) } results, err := u.ring.SubmitRequests(requests, nil) @@ -43,7 +58,6 @@ func (u *ioUringBatchIO) PreadBatch(fd *os.File, ops []Op) error { return fmt.Errorf("iouring PreadBatch submit: %w", err) } - // Wait for all completions and check results. for i, res := range results { <-res.Done() n, err := res.ReturnInt() @@ -61,10 +75,24 @@ func (u *ioUringBatchIO) PwriteBatch(fd *os.File, ops []Op) error { if len(ops) == 0 { return nil } + // Submit in chunks that fit the ring to avoid "too many requests" rejection. + for start := 0; start < len(ops); start += u.ringSize { + end := start + u.ringSize + if end > len(ops) { + end = len(ops) + } + if err := u.pwriteChunk(fd, ops[start:end]); err != nil { + return err + } + } + return nil +} +func (u *ioUringBatchIO) pwriteChunk(fd *os.File, ops []Op) error { requests := make([]iouring.PrepRequest, len(ops)) + fdInt := int(fd.Fd()) for i := range ops { - requests[i] = iouring.Pwrite(int(fd.Fd()), ops[i].Buf, uint64(ops[i].Offset)) + requests[i] = iouring.Pwrite(fdInt, ops[i].Buf, uint64(ops[i].Offset)) } results, err := u.ring.SubmitRequests(requests, nil) @@ -85,6 +113,10 @@ func (u *ioUringBatchIO) PwriteBatch(fd *os.File, ops []Op) error { return nil } +// Fsync issues fdatasync via io_uring. fdatasync flushes data to disk without +// flushing metadata that is not needed to read it back (e.g. mtime). 
This is sufficient for pre-allocated +// extent files where metadata does not change during writes, and matches the +// standard path behavior (see standard.go and fdatasync_linux.go). func (u *ioUringBatchIO) Fsync(fd *os.File) error { req := iouring.Fdatasync(int(fd.Fd())) result, err := u.ring.SubmitRequest(req, nil) @@ -99,34 +131,33 @@ func (u *ioUringBatchIO) Fsync(fd *os.File) error { return nil } +// LinkedWriteFsync submits a pwrite + fdatasync as a linked SQE chain. +// The kernel executes them in order in a single io_uring_enter() call. +// Requires kernel 5.10+ for linked fsync. Falls back to sequential if submission fails. func (u *ioUringBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error { - // Try linked SQE chain: pwrite → fdatasync (one io_uring_enter). - // This requires IOSQE_IO_LINK support (kernel 5.3+) and linked fsync (5.10+). - // If the ring doesn't support it, fall back to sequential. - writeReq := iouring.Pwrite(int(fd.Fd()), buf, uint64(offset)) - fsyncReq := iouring.Fdatasync(int(fd.Fd())) - - // SubmitRequests with linked flag. - results, err := u.ring.SubmitRequests( + fdInt := int(fd.Fd()) + writeReq := iouring.Pwrite(fdInt, buf, uint64(offset)) + fsyncReq := iouring.Fdatasync(fdInt) + + // SubmitLinkRequests sets IOSQE_IO_LINK on all SQEs except the last, + // ensuring the fdatasync executes only after the pwrite completes. + results, err := u.ring.SubmitLinkRequests( []iouring.PrepRequest{writeReq, fsyncReq}, nil, ) if err != nil { - // Fallback to sequential if linking not supported. - if errno, ok := err.(syscall.Errno); ok && errno == syscall.EINVAL { - if _, werr := fd.WriteAt(buf, offset); werr != nil { - return werr - } - return fd.Sync() + // Fallback to sequential if linked submission fails. 
+ if _, werr := fd.WriteAt(buf, offset); werr != nil { + return werr } - return fmt.Errorf("iouring LinkedWriteFsync submit: %w", err) + return fdatasync(fd) } for i, res := range results { <-res.Done() - _, err := res.ReturnInt() - if err != nil { - return fmt.Errorf("iouring LinkedWriteFsync op[%d]: %w", i, err) + _, rerr := res.ReturnInt() + if rerr != nil { + return fmt.Errorf("iouring LinkedWriteFsync op[%d]: %w", i, rerr) } } return nil diff --git a/weed/storage/blockvol/batchio/standard.go b/weed/storage/blockvol/batchio/standard.go index f6e2543ff..d2b4b5210 100644 --- a/weed/storage/blockvol/batchio/standard.go +++ b/weed/storage/blockvol/batchio/standard.go @@ -3,7 +3,7 @@ package batchio import "os" // standardBatchIO implements BatchIO with sequential os.File calls. -// This is functionally identical to calling ReadAt/WriteAt/Sync directly. +// This is functionally identical to calling ReadAt/WriteAt/fdatasync directly. type standardBatchIO struct{} // NewStandard returns a BatchIO that uses sequential pread/pwrite/fdatasync. @@ -30,15 +30,18 @@ func (s *standardBatchIO) PwriteBatch(fd *os.File, ops []Op) error { return nil } +// Fsync issues fdatasync to flush data to disk. Uses fdatasync(2) on Linux +// for parity with the io_uring path. Falls back to fsync on other platforms. func (s *standardBatchIO) Fsync(fd *os.File) error { - return fd.Sync() + return fdatasync(fd) } +// LinkedWriteFsync writes buf at offset then issues fdatasync, sequentially. 
func (s *standardBatchIO) LinkedWriteFsync(fd *os.File, buf []byte, offset int64) error { if _, err := fd.WriteAt(buf, offset); err != nil { return err } - return fd.Sync() + return fdatasync(fd) } func (s *standardBatchIO) Close() error { diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index f4a2f8e32..d32812b37 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -1127,8 +1127,9 @@ func (v *BlockVol) Close() error { } var flushErr error if v.flusher != nil { - v.flusher.Stop() // stop background goroutine first (no concurrent flush) + v.flusher.Stop() // stop background goroutine first (no concurrent flush) flushErr = v.flusher.FlushOnce() // then do final flush safely + v.flusher.CloseBatchIO() // release io_uring ring / kernel resources } // Close snapshot delta files. diff --git a/weed/storage/blockvol/flusher.go b/weed/storage/blockvol/flusher.go index 8ed79ad82..5ecc6ab98 100644 --- a/weed/storage/blockvol/flusher.go +++ b/weed/storage/blockvol/flusher.go @@ -434,6 +434,15 @@ func (f *Flusher) CheckpointLSN() uint64 { return f.checkpointLSN } +// CloseBatchIO releases the batch I/O backend resources (e.g. io_uring ring). +// Must be called after Stop() and the final FlushOnce(). +func (f *Flusher) CloseBatchIO() error { + if f.bio != nil { + return f.bio.Close() + } + return nil +} + // SetFD replaces the file descriptor used for extent writes. Test-only. func (f *Flusher) SetFD(fd *os.File) { f.mu.Lock()