#!/usr/bin/env python3
"""Concurrent-open race test for a directory supplied on the command line.

A temporary file is created in the given directory and pre-sized, then
NUM_THREADS threads open it at the same moment (synchronised with a barrier).
Each thread writes TEST_DATA at its own offset and reads it back.  Afterwards
the whole file is read through one descriptor, and a write made through one
descriptor is checked for visibility through another.

Usage: script.py <directory>
Exit status is 0 when every check passes and 1 otherwise.
"""
import os
import sys
import tempfile
import threading
import time

NUM_THREADS = 50
TEST_DATA = b"race condition test data\n"


def open_and_operate(filepath, barrier, results, index):
    """Wait at the barrier, then open the file, write, read back, and record.

    Args:
        filepath: Path of the existing file to open with O_RDWR.
        barrier: threading.Barrier that releases all workers together.
        results: Shared list; this worker stores its outcome at results[index].
        index: Slot of this worker; also selects its offset,
            index * len(TEST_DATA).

    The outcome stored in results[index] is one of:
        ("success", fd)         -- fd is left open and now owned by the caller
        ("write_error", text)   -- os.write wrote fewer bytes than requested
        ("read_error", text)    -- data read back differs from TEST_DATA
        ("exception", text)     -- any exception raised along the way

    In every non-success case the descriptor, if it was opened, is closed
    before returning.
    """
    fd = None
    try:
        # Wait for all threads to be ready
        barrier.wait()

        # All threads try to open simultaneously
        fd = os.open(filepath, os.O_RDWR)

        # Write thread-specific data at a unique offset
        offset = index * len(TEST_DATA)
        os.lseek(fd, offset, os.SEEK_SET)
        bytes_written = os.write(fd, TEST_DATA)
        if bytes_written != len(TEST_DATA):
            results[index] = (
                "write_error",
                "expected {} bytes, wrote {}".format(
                    len(TEST_DATA), bytes_written),
            )
            return

        # Read back what we wrote
        os.lseek(fd, offset, os.SEEK_SET)
        read_data = os.read(fd, len(TEST_DATA))
        if read_data != TEST_DATA:
            results[index] = (
                "read_error",
                "data mismatch at offset {}".format(offset),
            )
            return

        results[index] = ("success", fd)
        # Ownership of the descriptor has passed to the caller; make sure the
        # cleanup below leaves it open.
        fd = None
    except Exception as e:
        results[index] = ("exception", str(e))
    finally:
        if fd is not None:
            # Best-effort close on a failure path: the failure itself has
            # already been recorded in results[index], so a secondary close
            # error must not replace it or escape the thread.
            try:
                os.close(fd)
            except OSError:
                pass


def _verify_contents(fds, total_size):
    """Read the whole file through the first fd and compare to the expected image.

    Returns True when the contents match (or when there is no fd to read
    through), False after printing a diagnostic otherwise.
    """
    if not fds:
        return True
    verify_fd = fds[0]
    os.lseek(verify_fd, 0, os.SEEK_SET)
    all_data = os.read(verify_fd, total_size)
    expected_data = TEST_DATA * NUM_THREADS
    if all_data != expected_data:
        print("final verification failed: data mismatch")
        print("expected {} bytes, got {} bytes".format(
            len(expected_data), len(all_data)))
        return False
    return True


def _verify_cross_fd_visibility(fds):
    """Write through one fd and check the data is visible through another.

    Returns True when the data is visible (or when fewer than two fds are
    available), False after printing a diagnostic otherwise.
    """
    if len(fds) < 2:
        return True
    fd_a = fds[0]
    fd_b = fds[1]

    # Write new data from fd_a
    new_data = b"cross-fd visibility test\n"
    os.lseek(fd_a, 0, os.SEEK_SET)
    os.write(fd_a, new_data)

    # Read from fd_b - should see the new data
    os.lseek(fd_b, 0, os.SEEK_SET)
    read_back = os.read(fd_b, len(new_data))
    if read_back != new_data:
        print("cross-fd visibility failed: write from fd_a not visible on fd_b")
        return False
    return True


def main(argv=None):
    """Run the race test and return the process exit status.

    Args:
        argv: Argument vector; defaults to sys.argv.  argv[1] is the directory
            in which the temporary test file is created.

    Returns:
        0 if every check passed, 1 if any thread or verification step failed.

    Raises:
        SystemExit: with a usage message (exit status 1) when the directory
            argument is missing.
    """
    args = sys.argv if argv is None else argv
    if len(args) < 2:
        sys.exit("usage: {} <directory>".format(
            args[0] if args else "open_race"))

    # Create test file with enough space for all threads
    total_size = NUM_THREADS * len(TEST_DATA)
    fd, filepath = tempfile.mkstemp(dir=args[1])
    fds_to_close = []
    try:
        # Pre-allocate file to avoid issues with concurrent writes extending
        # the file
        try:
            os.ftruncate(fd, total_size)
        finally:
            os.close(fd)

        # Set up synchronization
        barrier = threading.Barrier(NUM_THREADS)
        results = [None] * NUM_THREADS

        # Create all threads first, then start them, then wait for them
        threads = [
            threading.Thread(
                target=open_and_operate,
                args=(filepath, barrier, results, i),
            )
            for i in range(NUM_THREADS)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Check results
        failed = False
        for i, result in enumerate(results):
            if result is None:
                print("thread {} returned no result".format(i))
                failed = True
            elif result[0] == "success":
                fds_to_close.append(result[1])
            else:
                print("thread {} failed: {} - {}".format(
                    i, result[0], result[1]))
                failed = True
        if failed:
            return 1

        # Verify all data is consistent by reading through one fd
        if not _verify_contents(fds_to_close, total_size):
            return 1

        # Test cross-fd visibility: write from one fd, read from another
        if not _verify_cross_fd_visibility(fds_to_close):
            return 1

        return 0
    finally:
        # Single cleanup path for success, failure and unexpected exceptions:
        # close every descriptor the workers handed back, then remove the
        # temporary file even if a close fails.
        try:
            for open_fd in fds_to_close:
                os.close(open_fd)
        finally:
            os.unlink(filepath)


if __name__ == "__main__":
    sys.exit(main())