#!/usr/bin/env python3 import os import sys import tempfile from posix_parity import compare_calls from posix_parity import fail from posix_parity import join from posix_parity import touch def main(): if len(sys.argv) != 2: print("usage: TEST_posix_copy_file_range ", file=sys.stderr) return 1 if not hasattr(os, "copy_file_range"): return 0 mount = sys.argv[1] with tempfile.TemporaryDirectory() as native: merge_src = join(mount, "posix-cfr/src") merge_dst = join(mount, "posix-cfr/dst") native_src = join(native, "posix-cfr/src") native_dst = join(native, "posix-cfr/dst") payload = b"0123456789abcdefghijklmnopqrstuvwxyz" touch(merge_src, payload) touch(native_src, payload) touch(merge_dst, b"") touch(native_dst, b"") msfd = os.open(merge_src, os.O_RDONLY) mdfd = os.open(merge_dst, os.O_WRONLY) nsfd = os.open(native_src, os.O_RDONLY) ndfd = os.open(native_dst, os.O_WRONLY) try: err = compare_calls( "copy_file_range success", lambda: os.copy_file_range(msfd, mdfd, 16), lambda: os.copy_file_range(nsfd, ndfd, 16), lambda a, b: a == b, ) if err: return fail(err) finally: os.close(msfd) os.close(mdfd) os.close(nsfd) os.close(ndfd) with open(merge_dst, "rb") as mf, open(native_dst, "rb") as nf: mdata = mf.read() ndata = nf.read() if mdata != ndata: return fail(f"copy_file_range dst mismatch mergerfs={mdata!r} native={ndata!r}") mdfd = os.open(merge_dst, os.O_WRONLY) ndfd = os.open(native_dst, os.O_WRONLY) try: err = compare_calls( "copy_file_range EBADF src", lambda: os.copy_file_range(-1, mdfd, 1), lambda: os.copy_file_range(-1, ndfd, 1), ) if err: return fail(err) finally: os.close(mdfd) os.close(ndfd) # Exercise very large copy lengths to validate the 64-bit copy result # path when available. Use sparse files to keep runtime and disk usage low. large_len = (5 << 30) + 4096 merge_large_src = join(mount, "posix-cfr/src-large") merge_large_dst = join(mount, "posix-cfr/dst-large") native_large_src = join(native, "posix-cfr/src-large") native_large_dst = join(native, "posix-cfr/dst-large") touch(merge_large_src, b"") touch(merge_large_dst, b"") touch(native_large_src, b"") touch(native_large_dst, b"") msfd = os.open(merge_large_src, os.O_RDWR) mdfd = os.open(merge_large_dst, os.O_RDWR) nsfd = os.open(native_large_src, os.O_RDWR) ndfd = os.open(native_large_dst, os.O_RDWR) try: # Make sparse files with one non-zero byte at the end so both sides # have comparable logical contents. os.ftruncate(msfd, large_len) os.ftruncate(nsfd, large_len) os.pwrite(msfd, b"z", large_len - 1) os.pwrite(nsfd, b"z", large_len - 1) err = compare_calls( "copy_file_range large sparse", lambda: os.copy_file_range(msfd, mdfd, large_len), lambda: os.copy_file_range(nsfd, ndfd, large_len), lambda a, b: a == b, ) if err: return fail(err) mstat = os.fstat(mdfd) nstat = os.fstat(ndfd) if mstat.st_size != nstat.st_size: return fail( "copy_file_range large sparse dst size mismatch " f"mergerfs={mstat.st_size} native={nstat.st_size}" ) finally: os.close(msfd) os.close(mdfd) os.close(nsfd) os.close(ndfd) return 0 if __name__ == "__main__": raise SystemExit(main())