You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

170 lines
4.8 KiB

#!/usr/bin/env python3
import ctypes
import os
import sys
import tempfile
from posix_parity import compare_calls
from posix_parity import fail
from posix_parity import join
from posix_parity import touch
class Dirent(ctypes.Structure):
_fields_ = [
("d_ino", ctypes.c_ulong),
("d_off", ctypes.c_long),
("d_reclen", ctypes.c_ushort),
("d_type", ctypes.c_ubyte),
("d_name", ctypes.c_char * 256),
]
libc = ctypes.CDLL(None, use_errno=True)
libc.opendir.argtypes = [ctypes.c_char_p]
libc.opendir.restype = ctypes.c_void_p
libc.readdir.argtypes = [ctypes.c_void_p]
libc.readdir.restype = ctypes.POINTER(Dirent)
libc.telldir.argtypes = [ctypes.c_void_p]
libc.telldir.restype = ctypes.c_long
libc.seekdir.argtypes = [ctypes.c_void_p, ctypes.c_long]
libc.seekdir.restype = None
libc.rewinddir.argtypes = [ctypes.c_void_p]
libc.rewinddir.restype = None
libc.closedir.argtypes = [ctypes.c_void_p]
libc.closedir.restype = ctypes.c_int
def opendir_or_raise(path):
ctypes.set_errno(0)
dp = libc.opendir(path.encode())
if not dp:
err = ctypes.get_errno()
raise OSError(err, os.strerror(err), path)
return dp
def decode_name(ent):
return bytes(ent.d_name).split(b"\0", 1)[0].decode("utf-8", errors="surrogateescape")
def read_all_names(dp):
names = []
while True:
entp = libc.readdir(dp)
if not entp:
break
names.append(decode_name(entp.contents))
return names
def next_non_dot(dp):
while True:
entp = libc.readdir(dp)
if not entp:
return None
name = decode_name(entp.contents)
if name not in (".", ".."):
return name
def names_set(path):
dp = opendir_or_raise(path)
try:
return set(n for n in read_all_names(dp) if n not in (".", ".."))
finally:
libc.closedir(dp)
def seek_tell_consistent(path):
dp = opendir_or_raise(path)
try:
_first = next_non_dot(dp)
if _first is None:
return True
pos = libc.telldir(dp)
second = next_non_dot(dp)
if second is None:
return True
libc.seekdir(dp, pos)
again = next_non_dot(dp)
return second == again
finally:
libc.closedir(dp)
def deleted_visibility_signature(dirpath, victim_path):
dp = opendir_or_raise(dirpath)
try:
before = set(n for n in read_all_names(dp) if n not in (".", ".."))
os.unlink(victim_path)
libc.rewinddir(dp)
after = set(n for n in read_all_names(dp) if n not in (".", ".."))
return ("victim" in before, "victim" in after)
finally:
libc.closedir(dp)
def main():
if len(sys.argv) != 2:
print("usage: TEST_posix_readdir <mountpoint>", file=sys.stderr)
return 1
mount = sys.argv[1]
with tempfile.TemporaryDirectory() as native:
merge_dir = join(mount, "posix-readdir/dir")
native_dir = join(native, "posix-readdir/dir")
merge_notdir = join(mount, "posix-readdir/notdir")
native_notdir = join(native, "posix-readdir/notdir")
for idx in range(8):
touch(join(merge_dir, f"f{idx}"), b"x")
touch(join(native_dir, f"f{idx}"), b"x")
touch(merge_notdir, b"x")
touch(native_notdir, b"x")
err = compare_calls(
"opendir ENOENT",
lambda: opendir_or_raise(join(mount, "posix-readdir/missing")),
lambda: opendir_or_raise(join(native, "posix-readdir/missing")),
)
if err:
return fail(err)
err = compare_calls(
"opendir ENOTDIR",
lambda: opendir_or_raise(join(merge_notdir, "child")),
lambda: opendir_or_raise(join(native_notdir, "child")),
)
if err:
return fail(err)
merge_names = names_set(merge_dir)
native_names = names_set(native_dir)
if merge_names != native_names:
return fail(f"readdir names mismatch mergerfs={sorted(merge_names)} native={sorted(native_names)}")
merge_consistent = seek_tell_consistent(merge_dir)
native_consistent = seek_tell_consistent(native_dir)
if merge_consistent != native_consistent:
return fail(
f"seekdir/telldir consistency mismatch mergerfs={merge_consistent} native={native_consistent}"
)
touch(join(merge_dir, "victim"), b"x")
touch(join(native_dir, "victim"), b"x")
merge_sig = deleted_visibility_signature(merge_dir, join(merge_dir, "victim"))
native_sig = deleted_visibility_signature(native_dir, join(native_dir, "victim"))
if merge_sig != native_sig:
return fail(f"deleted-entry visibility mismatch mergerfs={merge_sig} native={native_sig}")
return 0
if __name__ == "__main__":
raise SystemExit(main())