|
|
|
@ -32,7 +32,6 @@ import os |
|
|
|
import secrets |
|
|
|
import sys |
|
|
|
import logging |
|
|
|
from datetime import datetime |
|
|
|
from typing import Optional |
|
|
|
|
|
|
|
import pyarrow as pa |
|
|
|
@ -171,6 +170,7 @@ def ensure_bucket_exists(s3: pafs.S3FileSystem) -> bool: |
|
|
|
logging.info(f"✓ Bucket exists: {BUCKET_NAME}") |
|
|
|
return True |
|
|
|
except Exception: |
|
|
|
# Bucket likely does not exist or is not accessible; fall back to creation. |
|
|
|
pass |
|
|
|
|
|
|
|
# Try to create the bucket |
|
|
|
@ -178,8 +178,8 @@ def ensure_bucket_exists(s3: pafs.S3FileSystem) -> bool: |
|
|
|
s3.create_dir(BUCKET_NAME) |
|
|
|
logging.info(f"✓ Bucket created: {BUCKET_NAME}") |
|
|
|
return True |
|
|
|
except Exception as e: |
|
|
|
logging.error(f"✗ Failed to create/check bucket with PyArrow: {e}") |
|
|
|
except Exception: |
|
|
|
logging.exception("✗ Failed to create/check bucket with PyArrow") |
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
@ -198,36 +198,42 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) -> |
|
|
|
filesystem=s3, |
|
|
|
format="parquet", |
|
|
|
) |
|
|
|
logging.info(f" ✓ Write completed") |
|
|
|
logging.info(" ✓ Write completed") |
|
|
|
|
|
|
|
# Test Method 1: Read with pq.read_table |
|
|
|
logging.info(f" Reading with pq.read_table...") |
|
|
|
logging.info(" Reading with pq.read_table...") |
|
|
|
table_read = pq.read_table(filename, filesystem=s3) |
|
|
|
if table_read.num_rows != num_rows: |
|
|
|
return False, f"pq.read_table: Row count mismatch (expected {num_rows}, got {table_read.num_rows})" |
|
|
|
if not table_read.equals(table): |
|
|
|
return False, "pq.read_table: Table contents mismatch" |
|
|
|
logging.info(f" ✓ pq.read_table: {table_read.num_rows:,} rows") |
|
|
|
|
|
|
|
# Test Method 2: Read with pq.ParquetDataset |
|
|
|
logging.info(f" Reading with pq.ParquetDataset...") |
|
|
|
logging.info(" Reading with pq.ParquetDataset...") |
|
|
|
dataset = pq.ParquetDataset(filename, filesystem=s3) |
|
|
|
table_dataset = dataset.read() |
|
|
|
if table_dataset.num_rows != num_rows: |
|
|
|
return False, f"pq.ParquetDataset: Row count mismatch (expected {num_rows}, got {table_dataset.num_rows})" |
|
|
|
if not table_dataset.equals(table): |
|
|
|
return False, "pq.ParquetDataset: Table contents mismatch" |
|
|
|
logging.info(f" ✓ pq.ParquetDataset: {table_dataset.num_rows:,} rows") |
|
|
|
|
|
|
|
# Test Method 3: Read with pads.dataset |
|
|
|
logging.info(f" Reading with pads.dataset...") |
|
|
|
logging.info(" Reading with pads.dataset...") |
|
|
|
dataset_pads = pads.dataset(filename, filesystem=s3) |
|
|
|
table_pads = dataset_pads.to_table() |
|
|
|
if table_pads.num_rows != num_rows: |
|
|
|
return False, f"pads.dataset: Row count mismatch (expected {num_rows}, got {table_pads.num_rows})" |
|
|
|
if not table_pads.equals(table): |
|
|
|
return False, "pads.dataset: Table contents mismatch" |
|
|
|
logging.info(f" ✓ pads.dataset: {table_pads.num_rows:,} rows") |
|
|
|
|
|
|
|
return True, "All read methods passed" |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
logging.exception(f" ✗ Test failed: {e}") |
|
|
|
return False, f"{type(e).__name__}: {str(e)}" |
|
|
|
except Exception as exc: |
|
|
|
logging.exception(" ✗ Test failed") |
|
|
|
return False, f"{type(exc).__name__}: {exc}" |
|
|
|
|
|
|
|
|
|
|
|
def cleanup_test_files(s3: pafs.S3FileSystem) -> None: |
|
|
|
@ -236,15 +242,12 @@ def cleanup_test_files(s3: pafs.S3FileSystem) -> None: |
|
|
|
test_path = f"{BUCKET_NAME}/{TEST_DIR}" |
|
|
|
logging.info(f"Cleaning up test directory: {test_path}") |
|
|
|
|
|
|
|
# Delete all files in the test directory |
|
|
|
file_info = s3.get_file_info(pafs.FileSelector(test_path, recursive=True)) |
|
|
|
for info in file_info: |
|
|
|
if info.type == pafs.FileType.File: |
|
|
|
s3.delete_file(info.path) |
|
|
|
# Delete the test directory and all its contents |
|
|
|
s3.delete_dir(test_path) |
|
|
|
|
|
|
|
logging.info("✓ Test directory cleaned up") |
|
|
|
except Exception as e: |
|
|
|
logging.warning(f"Failed to cleanup test directory: {e}") |
|
|
|
except Exception: |
|
|
|
logging.exception("Failed to cleanup test directory") |
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
|
|