#!/usr/bin/env python3
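
# Concurrent-open stress test: NUM_THREADS threads open the same temporary
# file at the same instant, then each writes and reads back its own slice of
# the file.  The directory to test is taken from sys.argv[1]; for this repo
# that is typically a path on the mergerfs mount under test, though any
# writable directory will do.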

import os
import sys
import tempfile
import threading
import time


NUM_THREADS = 50
TEST_DATA = b"race condition test data\n"


def open_and_operate(filepath, barrier, results, index):
    """
    Wait at barrier, then open file, write, read, and store result.
    """
    try:
        # Wait for all threads to be ready
        barrier.wait()

        # All threads try to open simultaneously
        fd = os.open(filepath, os.O_RDWR)

        # Write thread-specific data at a unique offset
        offset = index * len(TEST_DATA)
        os.lseek(fd, offset, os.SEEK_SET)
        bytes_written = os.write(fd, TEST_DATA)

        if bytes_written != len(TEST_DATA):
            results[index] = ("write_error", "expected {} bytes, wrote {}".format(len(TEST_DATA), bytes_written))
            os.close(fd)
            return

        # Read back what we wrote
        os.lseek(fd, offset, os.SEEK_SET)
        read_data = os.read(fd, len(TEST_DATA))

        if read_data != TEST_DATA:
            results[index] = ("read_error", "data mismatch at offset {}".format(offset))
            os.close(fd)
            return

        results[index] = ("success", fd)

    except Exception as e:
        results[index] = ("exception", str(e))


# Create test file with enough space for all threads
(fd, filepath) = tempfile.mkstemp(dir=sys.argv[1])

# Pre-allocate file to avoid issues with concurrent writes extending file
total_size = NUM_THREADS * len(TEST_DATA)
os.ftruncate(fd, total_size)
os.close(fd)

# Set up synchronization
barrier = threading.Barrier(NUM_THREADS)
results = [None] * NUM_THREADS
threads = []
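
# Each thread writes only its own slot in results, so the list needs no
# locking; the barrier is what makes all of the open() calls land at once.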

# Create and start all threads
for i in range(NUM_THREADS):
    t = threading.Thread(target=open_and_operate, args=(filepath, barrier, results, i))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()
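
# results[i] is ("success", fd) for a thread that opened and verified its own
# slice, or (kind, detail) describing a write_error, read_error, or exception.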

# Check results
failed = False
fds_to_close = []

for i, result in enumerate(results):
    if result is None:
        print("thread {} returned no result".format(i))
        failed = True
    elif result[0] == "success":
        fds_to_close.append(result[1])
    else:
        print("thread {} failed: {} - {}".format(i, result[0], result[1]))
        failed = True

if failed:
    for fd in fds_to_close:
        os.close(fd)
    os.unlink(filepath)
    sys.exit(1)

# Verify all data is consistent by reading through one fd
if fds_to_close:
    verify_fd = fds_to_close[0]
    os.lseek(verify_fd, 0, os.SEEK_SET)
    all_data = os.read(verify_fd, total_size)

    expected_data = TEST_DATA * NUM_THREADS
    if all_data != expected_data:
        print("final verification failed: data mismatch")
        print("expected {} bytes, got {} bytes".format(len(expected_data), len(all_data)))
        for fd in fds_to_close:
            os.close(fd)
        os.unlink(filepath)
        sys.exit(1)
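
# The next check exercises cross-fd visibility.  Whether a write made through
# one open descriptor is immediately seen through another descriptor of the
# same file can depend on how the filesystem caches data (for mergerfs,
# settings such as cache.files), which is what this check is meant to catch.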
# Test cross-fd visibility: write from one fd, read from another
if len(fds_to_close) >= 2:
    fd_a = fds_to_close[0]
    fd_b = fds_to_close[1]

    # Write new data from fd_a
    new_data = b"cross-fd visibility test\n"
    os.lseek(fd_a, 0, os.SEEK_SET)
    os.write(fd_a, new_data)

    # Read from fd_b - should see the new data
    os.lseek(fd_b, 0, os.SEEK_SET)
    read_back = os.read(fd_b, len(new_data))

    if read_back != new_data:
        print("cross-fd visibility failed: write from fd_a not visible on fd_b")
        for fd in fds_to_close:
            os.close(fd)
        os.unlink(filepath)
        sys.exit(1)

# Close all file descriptors
for fd in fds_to_close:
    os.close(fd)

# Cleanup
os.unlink(filepath)
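
# Reaching this point means every thread opened, wrote, and read back its data
# successfully; the script exits with status 0 to signal success.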