Browse Source

refactoring

pull/7508/head
chrislu 2 weeks ago
parent
commit
1c650c6466
  1. 14
      test/s3/parquet/example_pyarrow_native.py
  2. 41
      test/s3/parquet/parquet_test_utils.py
  3. 23
      test/s3/parquet/test_pyarrow_native_s3.py
  4. 14
      test/s3/parquet/test_sse_s3_compatibility.py

14
test/s3/parquet/example_pyarrow_native.py

@@ -35,6 +35,8 @@ import pyarrow.dataset as pads
import pyarrow.fs as pafs
import pyarrow.parquet as pq
from parquet_test_utils import create_sample_table
# Configuration
BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "localhost:8333")
@@ -67,18 +69,6 @@ s3 = pafs.S3FileSystem(
print("✓ Connected to S3 endpoint")
def create_sample_table(num_rows: int = 5) -> pa.Table:
    """Create a sample PyArrow table for testing.

    Args:
        num_rows: Number of rows to generate (default: 5).

    Returns:
        pa.Table with four columns:
        id (int64, 0..num_rows-1), name (string, "user_<i>"),
        value (float64, i * 1.5), flag (bool, True for even ids).
    """
    return pa.table(
        {
            "id": pa.array(range(num_rows), type=pa.int64()),
            "name": pa.array([f"user_{i}" for i in range(num_rows)], type=pa.string()),
            "value": pa.array([float(i) * 1.5 for i in range(num_rows)], type=pa.float64()),
            "flag": pa.array([i % 2 == 0 for i in range(num_rows)], type=pa.bool_()),
        }
    )
# Create bucket if needed (using boto3)
try:
    import boto3

41
test/s3/parquet/parquet_test_utils.py

@@ -0,0 +1,41 @@
"""
Shared utility functions for PyArrow Parquet tests.
This module provides common test utilities used across multiple test scripts
to avoid code duplication and ensure consistency.
"""
import pyarrow as pa
def create_sample_table(num_rows: int = 5) -> pa.Table:
    """Generate a deterministic PyArrow table used as a shared test fixture.

    Args:
        num_rows: How many rows to produce (default: 5)

    Returns:
        A PyArrow Table with four columns of test data:
            - id: int64 sequential IDs (0 to num_rows-1)
            - name: string user names (user_0, user_1, ...)
            - value: float64 values (id * 1.5)
            - flag: bool alternating True/False based on even/odd id

    Example:
        >>> table = create_sample_table(3)
        >>> print(table)
        pyarrow.Table
        id: int64
        name: string
        value: double
        flag: bool
    """
    ids = range(num_rows)
    columns = {
        "id": pa.array(ids, type=pa.int64()),
        "name": pa.array([f"user_{i}" for i in ids], type=pa.string()),
        "value": pa.array([i * 1.5 for i in ids], type=pa.float64()),
        "flag": pa.array([i % 2 == 0 for i in ids], type=pa.bool_()),
    }
    return pa.table(columns)

23
test/s3/parquet/test_pyarrow_native_s3.py

@@ -46,6 +46,8 @@ try:
except ImportError:
    HAS_BOTO3 = False
from parquet_test_utils import create_sample_table
logging.basicConfig(level=logging.INFO, format="%(message)s")
# Configuration from environment variables with defaults
@@ -71,18 +73,6 @@ if TEST_QUICK:
    logging.info("Quick test mode enabled - running only small tests")
def create_sample_table(num_rows: int = 5) -> pa.Table:
    """Create a sample PyArrow table for testing.

    Args:
        num_rows: Number of rows to generate (default: 5).

    Returns:
        pa.Table with four columns:
        id (int64, 0..num_rows-1), name (string, "user_<i>"),
        value (float64, i * 1.5), flag (bool, True for even ids).
    """
    return pa.table(
        {
            "id": pa.array(range(num_rows), type=pa.int64()),
            "name": pa.array([f"user_{i}" for i in range(num_rows)], type=pa.string()),
            "value": pa.array([float(i) * 1.5 for i in range(num_rows)], type=pa.float64()),
            "flag": pa.array([i % 2 == 0 for i in range(num_rows)], type=pa.bool_()),
        }
    )
def init_s3_filesystem() -> tuple[Optional[pafs.S3FileSystem], str, str]:
    """Initialize PyArrow's native S3 filesystem.
@@ -271,7 +261,14 @@ def test_write_and_read(s3: pafs.S3FileSystem, test_name: str, num_rows: int) ->
def cleanup_test_files(s3: pafs.S3FileSystem) -> None:
"""Clean up test files from S3."""
"""Clean up test files from S3.
Note: We cannot use s3.delete_dir() directly because SeaweedFS uses implicit
directories (path prefixes without physical directory objects). PyArrow's
delete_dir() attempts to delete the directory marker itself, which fails with
"INTERNAL_FAILURE" on SeaweedFS. Instead, we list and delete files individually,
letting implicit directories disappear automatically.
"""
    try:
        test_path = f"{BUCKET_NAME}/{TEST_DIR}"
        logging.info(f"Cleaning up test directory: {test_path}")

14
test/s3/parquet/test_sse_s3_compatibility.py

@@ -44,6 +44,8 @@ except ImportError:
    logging.exception("boto3 is required for this test")
    sys.exit(1)
from parquet_test_utils import create_sample_table
logging.basicConfig(level=logging.INFO, format="%(message)s")
# Configuration
@@ -66,18 +68,6 @@ TEST_SIZES = {
}
def create_sample_table(num_rows: int = 5) -> pa.Table:
    """Create a sample PyArrow table for testing.

    Args:
        num_rows: Number of rows to generate (default: 5).

    Returns:
        pa.Table with four columns:
        id (int64, 0..num_rows-1), name (string, "user_<i>"),
        value (float64, i * 1.5), flag (bool, True for even ids).
    """
    return pa.table(
        {
            "id": pa.array(range(num_rows), type=pa.int64()),
            "name": pa.array([f"user_{i}" for i in range(num_rows)], type=pa.string()),
            "value": pa.array([float(i) * 1.5 for i in range(num_rows)], type=pa.float64()),
            "flag": pa.array([i % 2 == 0 for i in range(num_rows)], type=pa.bool_()),
        }
    )
def init_s3_filesystem() -> tuple[Optional[pafs.S3FileSystem], str, str]:
    """Initialize PyArrow's native S3 filesystem."""
    try:

Loading…
Cancel
Save