@@ -205,8 +205,24 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
     table_read = pq.read_table(filename, filesystem=s3)
     if table_read.num_rows != num_rows:
         return False, f"pq.read_table: Row count mismatch (expected {num_rows}, got {table_read.num_rows})"
-    if not table_read.equals(table):
-        return False, "pq.read_table: Table contents mismatch"
+
+    # Check schema first
+    if not table_read.schema.equals(table.schema):
+        return False, f"pq.read_table: Schema mismatch (expected {table.schema}, got {table_read.schema})"
+
+    # Sort both tables by 'id' column before comparison to handle potential row order differences
+    table_sorted = table.sort_by([('id', 'ascending')])
+    table_read_sorted = table_read.sort_by([('id', 'ascending')])
+
+    if not table_read_sorted.equals(table_sorted):
+        # Provide more detailed error information
+        error_details = []
+        for col_name in table.column_names:
+            col_original = table_sorted.column(col_name)
+            col_read = table_read_sorted.column(col_name)
+            if not col_original.equals(col_read):
+                error_details.append(f"column '{col_name}' differs")
+        return False, f"pq.read_table: Table contents mismatch ({', '.join(error_details)})"
     logging.info(f" ✓ pq.read_table: {table_read.num_rows:,} rows")

     # Test Method 2: Read with pq.ParquetDataset
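
The sort-before-compare step is the heart of this hunk: Table.equals is order-sensitive, and reads that go through the dataset machinery do not guarantee row order, so a direct equality check can fail even when the data round-tripped correctly. A minimal standalone sketch of the same technique, using illustrative tables that are not part of the test:

    import pyarrow as pa

    # Same rows, different physical order.
    a = pa.table({"id": [1, 2, 3], "value": ["x", "y", "z"]})
    b = pa.table({"id": [3, 1, 2], "value": ["z", "x", "y"]})

    assert not a.equals(b)  # equals() compares row order as well as values

    # Sorting both sides on a key column makes the comparison order-insensitive.
    a_sorted = a.sort_by([("id", "ascending")])
    b_sorted = b.sort_by([("id", "ascending")])
    assert a_sorted.equals(b_sorted)
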
@@ -215,8 +231,17 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
     table_dataset = dataset.read()
     if table_dataset.num_rows != num_rows:
         return False, f"pq.ParquetDataset: Row count mismatch (expected {num_rows}, got {table_dataset.num_rows})"
-    if not table_dataset.equals(table):
-        return False, "pq.ParquetDataset: Table contents mismatch"
+
+    # Sort before comparison
+    table_dataset_sorted = table_dataset.sort_by([('id', 'ascending')])
+    if not table_dataset_sorted.equals(table_sorted):
+        error_details = []
+        for col_name in table.column_names:
+            col_original = table_sorted.column(col_name)
+            col_read = table_dataset_sorted.column(col_name)
+            if not col_original.equals(col_read):
+                error_details.append(f"column '{col_name}' differs")
+        return False, f"pq.ParquetDataset: Table contents mismatch ({', '.join(error_details)})"
     logging.info(f" ✓ pq.ParquetDataset: {table_dataset.num_rows:,} rows")

     # Test Method 3: Read with pads.dataset

@@ -225,8 +250,17 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
     table_pads = dataset_pads.to_table()
     if table_pads.num_rows != num_rows:
         return False, f"pads.dataset: Row count mismatch (expected {num_rows}, got {table_pads.num_rows})"
-    if not table_pads.equals(table):
-        return False, "pads.dataset: Table contents mismatch"
+
+    # Sort before comparison
+    table_pads_sorted = table_pads.sort_by([('id', 'ascending')])
+    if not table_pads_sorted.equals(table_sorted):
+        error_details = []
+        for col_name in table.column_names:
+            col_original = table_sorted.column(col_name)
+            col_read = table_pads_sorted.column(col_name)
+            if not col_original.equals(col_read):
+                error_details.append(f"column '{col_name}' differs")
+        return False, f"pads.dataset: Table contents mismatch ({', '.join(error_details)})"
     logging.info(f" ✓ pads.dataset: {table_pads.num_rows:,} rows")

     return True, "All read methods passed"
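
The schema check, sort, and per-column diff are now repeated verbatim for all three read paths. If that duplication becomes a maintenance burden it could be folded into one helper; a possible sketch, not part of this patch, assuming the module's import pyarrow as pa alias and the hypothetical name _compare_read:

    def _compare_read(label: str, actual: pa.Table, expected_sorted: pa.Table, num_rows: int) -> tuple[bool, str]:
        # Order-insensitive comparison shared by the pq.read_table,
        # pq.ParquetDataset and pads.dataset checks.
        if actual.num_rows != num_rows:
            return False, f"{label}: Row count mismatch (expected {num_rows}, got {actual.num_rows})"
        if not actual.schema.equals(expected_sorted.schema):
            return False, f"{label}: Schema mismatch"
        actual_sorted = actual.sort_by([("id", "ascending")])
        if not actual_sorted.equals(expected_sorted):
            diffs = [name for name in expected_sorted.column_names
                     if not expected_sorted.column(name).equals(actual_sorted.column(name))]
            return False, f"{label}: Table contents mismatch ({', '.join(diffs)})"
        return True, f"{label}: ok"
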
@@ -242,10 +276,24 @@ def cleanup_test_files(s3: pafs.S3FileSystem) -> None:
     test_path = f"{BUCKET_NAME}/{TEST_DIR}"
     logging.info(f"Cleaning up test directory: {test_path}")

-    # Delete the test directory and all its contents
-    s3.delete_dir(test_path)
-
-    logging.info("✓ Test directory cleaned up")
+    # List and delete files individually to handle implicit directories
+    try:
+        file_selector = pafs.FileSelector(test_path, recursive=True)
+        files = s3.get_file_info(file_selector)
+
+        # Delete files first (not directories)
+        for file_info in files:
+            if file_info.type == pafs.FileType.File:
+                s3.delete_file(file_info.path)
+                logging.debug(f" Deleted file: {file_info.path}")
+
+        logging.info("✓ Test directory cleaned up")
+    except OSError as e:
+        # Handle the case where the path doesn't exist or is inaccessible
+        if "does not exist" in str(e).lower() or "not found" in str(e).lower():
+            logging.info("✓ Test directory already clean or doesn't exist")
+        else:
+            raise
+    except Exception:
+        logging.exception("Failed to cleanup test directory")
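
The per-file walk replaces s3.delete_dir because S3 "directories" often exist only implicitly through object keys, and delete_dir can raise for them. The string matching on the OSError text covers the missing-prefix case, but FileSelector also accepts allow_not_found=True, which makes get_file_info return an empty listing instead of raising when the base path does not exist. A possible alternative sketch, not part of this patch, reusing the module's pafs alias, logging setup, and BUCKET_NAME/TEST_DIR constants:

    def cleanup_test_files(s3: pafs.S3FileSystem) -> None:
        test_path = f"{BUCKET_NAME}/{TEST_DIR}"
        logging.info(f"Cleaning up test directory: {test_path}")
        try:
            # allow_not_found=True yields an empty listing when the prefix is absent,
            # so no parsing of the OSError message is needed.
            selector = pafs.FileSelector(test_path, recursive=True, allow_not_found=True)
            for file_info in s3.get_file_info(selector):
                if file_info.type == pafs.FileType.File:
                    s3.delete_file(file_info.path)
            logging.info("✓ Test directory cleaned up")
        except Exception:
            logging.exception("Failed to cleanup test directory")
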