From 0d4019a3d43a00fd6fa88c19af96727bceaa2214 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 19 Nov 2025 13:16:13 -0800
Subject: [PATCH] Update test_pyarrow_native_s3.py

---
 test/s3/parquet/test_pyarrow_native_s3.py | 68 +++++++++++++++++++----
 1 file changed, 58 insertions(+), 10 deletions(-)

diff --git a/test/s3/parquet/test_pyarrow_native_s3.py b/test/s3/parquet/test_pyarrow_native_s3.py
index 812b2f448..ef3ea2d2e 100755
--- a/test/s3/parquet/test_pyarrow_native_s3.py
+++ b/test/s3/parquet/test_pyarrow_native_s3.py
@@ -205,8 +205,24 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
     table_read = pq.read_table(filename, filesystem=s3)
     if table_read.num_rows != num_rows:
         return False, f"pq.read_table: Row count mismatch (expected {num_rows}, got {table_read.num_rows})"
-    if not table_read.equals(table):
-        return False, "pq.read_table: Table contents mismatch"
+
+    # Check schema first
+    if not table_read.schema.equals(table.schema):
+        return False, f"pq.read_table: Schema mismatch (expected {table.schema}, got {table_read.schema})"
+
+    # Sort both tables by 'id' column before comparison to handle potential row order differences
+    table_sorted = table.sort_by([('id', 'ascending')])
+    table_read_sorted = table_read.sort_by([('id', 'ascending')])
+
+    if not table_read_sorted.equals(table_sorted):
+        # Provide more detailed error information
+        error_details = []
+        for col_name in table.column_names:
+            col_original = table_sorted.column(col_name)
+            col_read = table_read_sorted.column(col_name)
+            if not col_original.equals(col_read):
+                error_details.append(f"column '{col_name}' differs")
+        return False, f"pq.read_table: Table contents mismatch ({', '.join(error_details)})"
     logging.info(f" ✓ pq.read_table: {table_read.num_rows:,} rows")
 
     # Test Method 2: Read with pq.ParquetDataset
@@ -215,8 +231,17 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
     table_dataset = dataset.read()
     if table_dataset.num_rows != num_rows:
         return False, f"pq.ParquetDataset: Row count mismatch (expected {num_rows}, got {table_dataset.num_rows})"
-    if not table_dataset.equals(table):
-        return False, "pq.ParquetDataset: Table contents mismatch"
+
+    # Sort before comparison
+    table_dataset_sorted = table_dataset.sort_by([('id', 'ascending')])
+    if not table_dataset_sorted.equals(table_sorted):
+        error_details = []
+        for col_name in table.column_names:
+            col_original = table_sorted.column(col_name)
+            col_read = table_dataset_sorted.column(col_name)
+            if not col_original.equals(col_read):
+                error_details.append(f"column '{col_name}' differs")
+        return False, f"pq.ParquetDataset: Table contents mismatch ({', '.join(error_details)})"
     logging.info(f" ✓ pq.ParquetDataset: {table_dataset.num_rows:,} rows")
 
     # Test Method 3: Read with pads.dataset
@@ -225,8 +250,17 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
     table_pads = dataset_pads.to_table()
     if table_pads.num_rows != num_rows:
         return False, f"pads.dataset: Row count mismatch (expected {num_rows}, got {table_pads.num_rows})"
-    if not table_pads.equals(table):
-        return False, "pads.dataset: Table contents mismatch"
+
+    # Sort before comparison
+    table_pads_sorted = table_pads.sort_by([('id', 'ascending')])
+    if not table_pads_sorted.equals(table_sorted):
+        error_details = []
+        for col_name in table.column_names:
+            col_original = table_sorted.column(col_name)
+            col_read = table_pads_sorted.column(col_name)
+            if not col_original.equals(col_read):
+                error_details.append(f"column '{col_name}' differs")
+        return False, f"pads.dataset: Table contents mismatch ({', '.join(error_details)})"
     logging.info(f" ✓ pads.dataset: {table_pads.num_rows:,} rows")
 
     return True, "All read methods passed"
@@ -242,10 +276,24 @@ def cleanup_test_files(s3: pafs.S3FileSystem) -> None:
         test_path = f"{BUCKET_NAME}/{TEST_DIR}"
         logging.info(f"Cleaning up test directory: {test_path}")
 
-        # Delete the test directory and all its contents
-        s3.delete_dir(test_path)
-
-        logging.info("✓ Test directory cleaned up")
+        # List and delete files individually to handle implicit directories
+        try:
+            file_selector = pafs.FileSelector(test_path, recursive=True)
+            files = s3.get_file_info(file_selector)
+
+            # Delete files first (not directories)
+            for file_info in files:
+                if file_info.type == pafs.FileType.File:
+                    s3.delete_file(file_info.path)
+                    logging.debug(f" Deleted file: {file_info.path}")
+
+            logging.info("✓ Test directory cleaned up")
+        except OSError as e:
+            # Handle the case where the path doesn't exist or is inaccessible
+            if "does not exist" in str(e).lower() or "not found" in str(e).lower():
+                logging.info("✓ Test directory already clean or doesn't exist")
+            else:
+                raise
     except Exception:
         logging.exception("Failed to cleanup test directory")
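For reference, the sort-then-compare check the patch installs in all three read paths, as a minimal self-contained sketch. The tables_match helper below is hypothetical (the test inlines this logic once per read method), but the schema check, the sort_by on 'id', and the per-column narrowing mirror the patch:

import pyarrow as pa

def tables_match(expected: pa.Table, actual: pa.Table) -> tuple[bool, str]:
    # Report schema differences before comparing any rows.
    if not actual.schema.equals(expected.schema):
        return False, f"schema mismatch (expected {expected.schema}, got {actual.schema})"
    # Sort both sides on 'id' so the reader's row order cannot cause a false failure.
    expected_sorted = expected.sort_by([("id", "ascending")])
    actual_sorted = actual.sort_by([("id", "ascending")])
    if not actual_sorted.equals(expected_sorted):
        # Narrow the failure down to the offending columns for a useful message.
        bad = [
            name for name in expected.column_names
            if not expected_sorted.column(name).equals(actual_sorted.column(name))
        ]
        return False, f"contents mismatch ({', '.join(bad)})"
    return True, "ok"

# Same rows in a different order still compare equal.
a = pa.table({"id": [1, 2, 3], "value": ["x", "y", "z"]})
b = pa.table({"id": [3, 1, 2], "value": ["z", "x", "y"]})
assert tables_match(a, b) == (True, "ok")

Sorting both sides makes the equality check insensitive to whatever row order pq.read_table, pq.ParquetDataset, or pads.dataset happens to return, and the per-column pass turns a bare "contents mismatch" into an actionable message.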
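The cleanup change, sketched in isolation under stated assumptions: the endpoint, credentials, bucket/prefix, and the remove_all_files name below are placeholders for illustration, not values taken from the test.

import logging
import pyarrow.fs as pafs

def remove_all_files(s3: pafs.S3FileSystem, path: str) -> None:
    """Delete every file object under path, tolerating a missing prefix."""
    try:
        # A recursive listing surfaces every object, including objects under
        # "implicit" directories that have no directory marker of their own.
        infos = s3.get_file_info(pafs.FileSelector(path, recursive=True))
    except OSError as e:
        # pyarrow surfaces a missing prefix as OSError; treat it as already clean.
        if "does not exist" in str(e).lower() or "not found" in str(e).lower():
            logging.info("already clean: %s", path)
            return
        raise
    for info in infos:
        # Delete file entries only; implicit directories disappear with their contents.
        if info.type == pafs.FileType.File:
            s3.delete_file(info.path)

# Assumed local S3-compatible endpoint and test credentials (placeholders).
s3 = pafs.S3FileSystem(
    access_key="test_access_key",
    secret_key="test_secret_key",
    endpoint_override="localhost:8333",
    scheme="http",
)
remove_all_files(s3, "test-bucket/parquet-tests")

An alternative worth noting: pafs.FileSelector accepts allow_not_found=True, which returns an empty listing for a missing prefix instead of raising, removing the need to match on error text.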