#!/usr/bin/env python3
"""Check that file locking on a mounted path matches a native temp directory.

The script takes one argument, the mount point.  It creates the same small
file under the mount and under a fresh temporary directory, then issues the
same ``flock`` and ``fcntl`` record-lock calls against both and hands each
pair to ``posix_parity.compare_calls``.
"""

import fcntl
import os
import struct
import sys
import tempfile

from posix_parity import compare_calls
from posix_parity import fail
from posix_parity import join
from posix_parity import touch


def pack_flock(l_type, l_whence=0, l_start=0, l_len=0, l_pid=0):
    """Pack ``struct flock`` fields into bytes suitable for ``fcntl.fcntl``.

    Args:
        l_type: Lock type, e.g. ``fcntl.F_WRLCK`` or ``fcntl.F_UNLCK``.
        l_whence: How ``l_start`` is interpreted (default 0).
        l_start: Starting offset of the locked range (default 0).
        l_len: Length of the locked range (default 0).
        l_pid: Process id field (default 0).

    Returns:
        The packed structure as ``bytes``.
    """
    # struct flock (common glibc x86_64 layout); the format uses native
    # alignment, so it is platform-specific.
    return struct.pack("hhqqi", l_type, l_whence, l_start, l_len, l_pid)


def _first_mismatch(cases):
    """Run ``compare_calls`` over *cases* in order and stop at the first error.

    Args:
        cases: Iterable of ``(label, merge_call, native_call)`` tuples.

    Returns:
        The first truthy value returned by ``compare_calls``, or ``None`` when
        every case came back falsy.
    """
    for label, merge_call, native_call in cases:
        err = compare_calls(label, merge_call, native_call)
        if err:
            return err
    return None


def main():
    """Run the locking comparison and return the process exit status.

    Returns:
        1 when the command line is malformed, the result of ``fail(err)`` for
        the first comparison that reports an error, and 0 otherwise.
    """
    if len(sys.argv) != 2:
        print("usage: TEST_posix_locking <mountpoint>", file=sys.stderr)
        return 1

    mount = sys.argv[1]

    with tempfile.TemporaryDirectory() as native:
        merge_file = join(mount, "posix-locking/file")
        native_file = join(native, "posix-locking/file")
        touch(merge_file, b"x")
        touch(native_file, b"x")

        # Descriptors are collected one at a time inside the try so that a
        # failing open cannot leak the ones that were already opened.
        fds = []
        try:
            for path in (merge_file, merge_file, native_file, native_file):
                fds.append(os.open(path, os.O_RDWR))
            mfd1, mfd2, nfd1, nfd2 = fds

            # POSIX record lock via fcntl
            wrlk = pack_flock(fcntl.F_WRLCK)
            unlck = pack_flock(fcntl.F_UNLCK)

            # Order matters: later cases depend on the lock state left behind
            # by earlier ones.
            lock_cases = [
                (
                    "flock EX nonblock",
                    lambda: fcntl.flock(mfd1, fcntl.LOCK_EX | fcntl.LOCK_NB),
                    lambda: fcntl.flock(nfd1, fcntl.LOCK_EX | fcntl.LOCK_NB),
                ),
                (
                    "flock SH nonblock second fd",
                    lambda: fcntl.flock(mfd2, fcntl.LOCK_SH | fcntl.LOCK_NB),
                    lambda: fcntl.flock(nfd2, fcntl.LOCK_SH | fcntl.LOCK_NB),
                ),
                (
                    "flock unlock",
                    lambda: fcntl.flock(mfd1, fcntl.LOCK_UN),
                    lambda: fcntl.flock(nfd1, fcntl.LOCK_UN),
                ),
                (
                    "flock unlock second fd",
                    lambda: fcntl.flock(mfd2, fcntl.LOCK_UN),
                    lambda: fcntl.flock(nfd2, fcntl.LOCK_UN),
                ),
                (
                    "fcntl F_SETLK write lock",
                    lambda: fcntl.fcntl(mfd1, fcntl.F_SETLK, wrlk),
                    lambda: fcntl.fcntl(nfd1, fcntl.F_SETLK, wrlk),
                ),
                (
                    "fcntl F_SETLK write lock second fd",
                    lambda: fcntl.fcntl(mfd2, fcntl.F_SETLK, wrlk),
                    lambda: fcntl.fcntl(nfd2, fcntl.F_SETLK, wrlk),
                ),
                (
                    "fcntl F_SETLK unlock",
                    lambda: fcntl.fcntl(mfd1, fcntl.F_SETLK, unlck),
                    lambda: fcntl.fcntl(nfd1, fcntl.F_SETLK, unlck),
                ),
            ]
            err = _first_mismatch(lock_cases)
            if err:
                return fail(err)

            # Obtain two descriptor numbers that are closed by the time they
            # are used, so the final case exercises a stale descriptor.
            bad_m = os.open(merge_file, os.O_RDONLY)
            try:
                bad_n = os.open(native_file, os.O_RDONLY)
                os.close(bad_n)
            finally:
                os.close(bad_m)

            err = compare_calls(
                "fcntl EBADF",
                lambda: fcntl.fcntl(bad_m, fcntl.F_SETLK, wrlk),
                lambda: fcntl.fcntl(bad_n, fcntl.F_SETLK, wrlk),
            )
            if err:
                return fail(err)
        finally:
            for fd in fds:
                os.close(fd)

    return 0


if __name__ == "__main__":
    raise SystemExit(main())