#!/usr/bin/env python3
"""Create/open/read/write checks for a file in a passthrough-I/O directory.

Usage: <script> <directory>

A temporary file is created inside <directory> and exercised in three phases:

1. create + write + read on the descriptor returned by ``mkstemp``
   (passthrough.io=rw);
2. reopen of the existing file: read, append, and full read-back;
3. a second (read/write) and third (read-only) descriptor opened while the
   first one is still open, checking that each descriptor sees the data
   written through the others.

The first failed check prints a one-line message on stdout and the process
exits with status 1; a fully successful run exits with status 0.  Every
descriptor is closed and the temporary file is removed on both outcomes.
"""

import os
import sys
import tempfile
import time

# Payloads written in each phase; the file content grows by one payload per
# phase, in this order.
CREATE_DATA = b"passthrough io test data for create\n" * 100
OPEN_DATA = b"additional passthrough data for open\n" * 50
SECOND_FD_DATA = b"data from second file descriptor\n" * 25


class _CheckFailed(Exception):
    """A check failed; ``str(exc)`` is the exact line the script prints."""


def _expect_write(fd, payload, label):
    """Write *payload* at the current offset of *fd* in a single os.write.

    Raises _CheckFailed (message prefixed with *label*) when the call reports
    fewer bytes written than requested.
    """
    written = os.write(fd, payload)
    if written != len(payload):
        raise _CheckFailed(
            "{} write failed: expected {} bytes, wrote {}".format(
                label, len(payload), written
            )
        )


def _expect_content(fd, expected, message):
    """Rewind *fd* and compare one os.read of len(expected) with *expected*.

    Raises _CheckFailed(*message*) when the bytes read differ.
    """
    os.lseek(fd, 0, os.SEEK_SET)
    if os.read(fd, len(expected)) != expected:
        raise _CheckFailed(message)


def _check_create(fd):
    """Phase 1: write to the freshly created file and read the data back."""
    _expect_write(fd, CREATE_DATA, "create")
    _expect_content(fd, CREATE_DATA, "create read failed: data mismatch")


def _check_multiple_opens(filepath, fd):
    """Phase 3: open more descriptors while *fd* is still open.

    Test multiple opens of same file (single backing_id feature): when a file
    is already open with passthrough, subsequent opens must reuse the same
    backing_id rather than creating new ones.  Only the data visible through
    each descriptor is checked here.
    """
    before = CREATE_DATA + OPEN_DATA
    after = before + SECOND_FD_DATA

    fd2 = os.open(filepath, os.O_RDWR)
    try:
        # The second descriptor must see everything written so far.
        _expect_content(fd2, before, "second open read failed: data mismatch")

        # Append through the second descriptor ...
        os.lseek(fd2, 0, os.SEEK_END)
        _expect_write(fd2, SECOND_FD_DATA, "second open")

        # ... and verify the write is visible from the first descriptor.
        _expect_content(
            fd,
            after,
            "cross-fd read failed: data mismatch "
            "(writes from fd2 not visible on fd)",
        )

        # Open a third, read-only descriptor while the others are still open.
        fd3 = os.open(filepath, os.O_RDONLY)
        try:
            _expect_content(fd3, after, "third open read failed: data mismatch")
        finally:
            os.close(fd3)
    finally:
        os.close(fd2)


def _check_reopen(filepath):
    """Phase 2 (then phase 3): reopen the existing file, read, append, verify."""
    fd = os.open(filepath, os.O_RDWR)
    try:
        # Read the data written during the create phase.
        _expect_content(fd, CREATE_DATA, "open read failed: data mismatch")

        # Write more data at the end of the file.
        os.lseek(fd, 0, os.SEEK_END)
        _expect_write(fd, OPEN_DATA, "open")

        # Verify all data written so far.
        _expect_content(
            fd, CREATE_DATA + OPEN_DATA, "open final read failed: data mismatch"
        )

        _check_multiple_opens(filepath, fd)
    finally:
        os.close(fd)


def _run_checks(directory):
    """Run all phases on a temporary file created inside *directory*.

    Raises _CheckFailed on the first failed check.  The temporary file is
    unlinked whether the checks pass or fail.
    """
    fd, filepath = tempfile.mkstemp(dir=directory)
    try:
        try:
            _check_create(fd)
        finally:
            # The create-phase descriptor is closed before the file is
            # reopened, so phase 2 starts from a fresh open of the file.
            os.close(fd)
        _check_reopen(filepath)
    finally:
        os.unlink(filepath)


def main(argv=None):
    """Run the checks on the directory named by ``argv[1]``.

    Args:
        argv: Argument vector; defaults to ``sys.argv``.

    Returns:
        0 when every check passes; 1 when a check fails (its message is
        printed on stdout) or when the directory argument is missing (a usage
        line is printed on stderr).
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        prog = argv[0] if argv else "passthrough_io_test"
        print("usage: {} <directory>".format(prog), file=sys.stderr)
        return 1

    try:
        _run_checks(argv[1])
    except _CheckFailed as failure:
        print(failure)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())