#!/usr/bin/env python3 import os import sys import tempfile import errno from posix_parity import compare_calls from posix_parity import fail from posix_parity import join from posix_parity import touch def errno_name(err): if err is None: return "None" return errno.errorcode.get(err, str(err)) def format_oserror(prefix, exc): return f"{prefix}: errno={exc.errno}({errno_name(exc.errno)}) msg={exc.strerror}" def has_attr(name): def _cmp(lhs, rhs): return (name in lhs) == (name in rhs) return _cmp def main(): if len(sys.argv) != 2: print("usage: TEST_posix_xattr ", file=sys.stderr) return 1 mount = sys.argv[1] xname = "user.mergerfs.posix" xvalue = b"parity-check" with tempfile.TemporaryDirectory() as native: merge_file = join(mount, "posix-xattr/file") native_file = join(native, "posix-xattr/file") merge_missing = join(mount, "posix-xattr/missing") native_missing = join(native, "posix-xattr/missing") merge_notdir = join(mount, "posix-xattr/notdir") native_notdir = join(native, "posix-xattr/notdir") touch(merge_file, b"x") touch(native_file, b"x") touch(merge_notdir, b"x") touch(native_notdir, b"x") err = compare_calls( "setxattr success", lambda: os.setxattr(merge_file, xname, xvalue), lambda: os.setxattr(native_file, xname, xvalue), ) if err: try: os.setxattr(merge_file, xname, xvalue) except OSError as exc: err += " | " + format_oserror("mergerfs setxattr", exc) try: os.setxattr(native_file, xname, xvalue) except OSError as exc: err += " | " + format_oserror("native setxattr", exc) return fail(err) err = compare_calls( "getxattr success", lambda: os.getxattr(merge_file, xname), lambda: os.getxattr(native_file, xname), lambda lhs, rhs: lhs == rhs, ) if err: try: os.getxattr(merge_file, xname) except OSError as exc: err += " | " + format_oserror("mergerfs getxattr", exc) try: os.getxattr(native_file, xname) except OSError as exc: err += " | " + format_oserror("native getxattr", exc) return fail(err) err = compare_calls( "listxattr includes key", lambda: os.listxattr(merge_file), lambda: os.listxattr(native_file), has_attr(xname), ) if err: try: m_list = os.listxattr(merge_file) err += f" | mergerfs list={m_list!r}" except OSError as exc: err += " | " + format_oserror("mergerfs listxattr", exc) try: n_list = os.listxattr(native_file) err += f" | native list={n_list!r}" except OSError as exc: err += " | " + format_oserror("native listxattr", exc) return fail(err) err = compare_calls( "removexattr success", lambda: os.removexattr(merge_file, xname), lambda: os.removexattr(native_file, xname), ) if err: try: os.removexattr(merge_file, xname) except OSError as exc: err += " | " + format_oserror("mergerfs removexattr", exc) try: os.removexattr(native_file, xname) except OSError as exc: err += " | " + format_oserror("native removexattr", exc) return fail(err) err = compare_calls( "getxattr missing attr", lambda: os.getxattr(merge_file, xname), lambda: os.getxattr(native_file, xname), ) if err: try: os.getxattr(merge_file, xname) except OSError as exc: err += " | " + format_oserror("mergerfs getxattr missing", exc) try: os.getxattr(native_file, xname) except OSError as exc: err += " | " + format_oserror("native getxattr missing", exc) return fail(err) err = compare_calls( "setxattr ENOENT", lambda: os.setxattr(merge_missing, xname, xvalue), lambda: os.setxattr(native_missing, xname, xvalue), ) if err: return fail(err) err = compare_calls( "getxattr ENOTDIR", lambda: os.getxattr(join(merge_notdir, "child"), xname), lambda: os.getxattr(join(native_notdir, "child"), xname), ) if err: return fail(err) return 0 if __name__ == "__main__": raise SystemExit(main())