diff --git a/test/s3/parquet/Makefile b/test/s3/parquet/Makefile
index bd79d1747..fa91cfeaa 100644
--- a/test/s3/parquet/Makefile
+++ b/test/s3/parquet/Makefile
@@ -30,7 +30,7 @@ GREEN := \033[0;32m
 YELLOW := \033[1;33m
 NC := \033[0m # No Color
 
-.PHONY: all build-weed check-binary check-python ci-test clean debug-logs debug-status help manual-start manual-stop setup-python start-seaweedfs start-seaweedfs-ci stop-seaweedfs stop-seaweedfs-safe test test-implicit-dir test-implicit-dir-with-server test-native-s3 test-native-s3-with-server test-native-s3-with-sse test-quick test-sse-s3-compat test-with-server
+.PHONY: all build-weed check-binary check-python ci-test clean debug-logs debug-status help manual-start manual-stop setup-python start-seaweedfs start-seaweedfs-ci stop-seaweedfs stop-seaweedfs-safe test test-cross-fs test-cross-fs-with-server test-implicit-dir test-implicit-dir-with-server test-native-s3 test-native-s3-with-server test-native-s3-with-sse test-quick test-sse-s3-compat test-with-server
 
 all: test
 
@@ -52,6 +52,8 @@ help:
	@echo " test-native-s3 - Test PyArrow's native S3 filesystem (assumes server running)"
	@echo " test-native-s3-with-server - Test PyArrow's native S3 filesystem with server management"
	@echo " test-native-s3-with-sse - Test PyArrow's native S3 with SSE-S3 encryption enabled"
+	@echo " test-cross-fs - Test cross-filesystem compatibility (s3fs ↔ PyArrow native)"
+	@echo " test-cross-fs-with-server - Test cross-filesystem compatibility with server management"
	@echo " test-sse-s3-compat - Comprehensive SSE-S3 compatibility test (multipart uploads)"
	@echo " setup-python - Setup Python virtual environment and install dependencies"
	@echo " check-python - Check if Python and required packages are available"
@@ -401,6 +403,37 @@ test-native-s3-with-server: build-weed setup-python
		exit 1; \
	fi
 
+# Test cross-filesystem compatibility (s3fs ↔ PyArrow native S3)
+test-cross-fs: setup-python
+	@echo "$(YELLOW)Running cross-filesystem compatibility tests...$(NC)"
+	@echo "$(YELLOW)Assuming SeaweedFS is already running on localhost:$(S3_PORT)$(NC)"
+	@S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \
+	S3_ACCESS_KEY=$(ACCESS_KEY) \
+	S3_SECRET_KEY=$(SECRET_KEY) \
+	BUCKET_NAME=$(BUCKET_NAME) \
+	$(VENV_DIR)/bin/$(PYTHON) test_cross_filesystem_compatibility.py
+
+# Test cross-filesystem compatibility with automatic server management
+test-cross-fs-with-server: build-weed setup-python
+	@echo "πŸš€ Starting cross-filesystem compatibility tests with automated server management..."
+	@echo "Starting SeaweedFS cluster..."
+	@if $(MAKE) start-seaweedfs-ci > weed-test.log 2>&1; then \
+		echo "βœ… SeaweedFS cluster started successfully"; \
+		echo "Running cross-filesystem compatibility tests..."; \
+		trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \
+		S3_ENDPOINT_URL=http://localhost:$(S3_PORT) \
+		S3_ACCESS_KEY=$(ACCESS_KEY) \
+		S3_SECRET_KEY=$(SECRET_KEY) \
+		BUCKET_NAME=$(BUCKET_NAME) \
+		$(VENV_DIR)/bin/$(PYTHON) test_cross_filesystem_compatibility.py || exit 1; \
+		echo "βœ… All tests completed successfully"; \
+	else \
+		echo "❌ Failed to start SeaweedFS cluster"; \
+		echo "=== Server startup logs ==="; \
+		tail -100 weed-test.log 2>/dev/null || echo "No startup log available"; \
+		exit 1; \
+	fi
+
 # Test PyArrow's native S3 filesystem compatibility with SSE-S3 enabled backend
 # (For encryption-specific validation, use test-sse-s3-compat)
 test-native-s3-with-sse: build-weed setup-python
diff --git a/test/s3/parquet/README.md b/test/s3/parquet/README.md
index ed65e4cbb..ce111f052 100644
--- a/test/s3/parquet/README.md
+++ b/test/s3/parquet/README.md
@@ -44,6 +44,9 @@ make test-implicit-dir-with-server
 # Run PyArrow native S3 filesystem tests
 make test-native-s3-with-server
 
+# Run cross-filesystem compatibility tests (s3fs ↔ PyArrow native)
+make test-cross-fs-with-server
+
 # Run SSE-S3 encryption tests
 make test-sse-s3-compat
 
@@ -128,6 +131,15 @@ dataset = pads.dataset('bucket/dataset', filesystem=s3) # βœ…
  - Verifies multipart upload encryption works correctly
  - All tests pass βœ…
 
+### Cross-Filesystem Compatibility Tests
+- **`test_cross_filesystem_compatibility.py`** - Verifies cross-compatibility between s3fs and PyArrow native S3
+  - Tests write with s3fs β†’ read with PyArrow native S3
+  - Tests write with PyArrow native S3 β†’ read with s3fs
+  - Tests 2 directions Γ— 3 read methods Γ— 2 dataset sizes = 12 scenarios
+  - Validates that files written by either filesystem can be read by the other
+  - **All tests pass** βœ…
+  - See **`CROSS_FILESYSTEM_COMPATIBILITY.md`** for detailed test results and analysis
+
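+The round-trip exercised by these tests looks roughly like the following (an
+illustrative sketch assuming the default test endpoint, credentials, and an
+existing `test-parquet-bucket` bucket; see `test_cross_filesystem_compatibility.py`
+for the full logic and error handling):
+
+```python
+import pyarrow as pa
+import pyarrow.dataset as pads
+import pyarrow.fs as pafs
+import pyarrow.parquet as pq
+import s3fs
+
+endpoint = "http://localhost:8333"  # default SeaweedFS S3 endpoint used by these tests
+key, secret = "some_access_key1", "some_secret_key1"
+table = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})
+
+# Write a small dataset through s3fs...
+fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint})
+pads.write_dataset(table, "test-parquet-bucket/demo", format="parquet", filesystem=fs)
+
+# ...then read the same path back through PyArrow's native S3 filesystem.
+s3 = pafs.S3FileSystem(access_key=key, secret_key=secret,
+                       endpoint_override="localhost:8333", scheme="http")
+print(pq.read_table("test-parquet-bucket/demo", filesystem=s3).num_rows)
+```
+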
 ### Implicit Directory Tests
 - **`test_implicit_directory_fix.py`** - Specific tests for the implicit directory fix
   - Tests HEAD request behavior
@@ -159,6 +171,11 @@ dataset = pads.dataset('bucket/dataset', filesystem=s3) # βœ…
   - How the implicit directory fix works
   - Performance considerations
 
+- **`CROSS_FILESYSTEM_COMPATIBILITY.md`** - Cross-filesystem compatibility test results βœ… **NEW**
+  - Validates s3fs ↔ PyArrow native S3 interoperability
+  - Confirms files written by either can be read by the other
+  - Test methodology and detailed results
+
 - **`MINIO_DIRECTORY_HANDLING.md`** - Comparison with MinIO
   - How MinIO handles directory markers
   - Differences in implementation approaches
@@ -202,6 +219,8 @@ make test-quick             # Run quick tests with small files only (assumes serve
 make test-implicit-dir-with-server  # Run implicit directory tests with server
 make test-native-s3                 # Run PyArrow native S3 tests (assumes server is running)
 make test-native-s3-with-server     # Run PyArrow native S3 tests with server management
+make test-cross-fs                  # Run cross-filesystem compatibility tests (assumes server is running)
+make test-cross-fs-with-server      # Run cross-filesystem compatibility tests with server management
 make test-sse-s3-compat             # Run comprehensive SSE-S3 encryption compatibility tests
 
 # Server Management
@@ -222,8 +241,9 @@ The tests are automatically run in GitHub Actions on every push/PR that affects
 **Test Matrix**:
 - Python versions: 3.9, 3.11, 3.12
 - PyArrow integration tests (s3fs): 20 test combinations
-- PyArrow native S3 tests: 6 test scenarios βœ… **NEW**
-- SSE-S3 encryption tests: 5 file sizes βœ… **NEW**
+- PyArrow native S3 tests: 6 test scenarios βœ…
+- Cross-filesystem compatibility tests: 12 test scenarios βœ… **NEW**
+- SSE-S3 encryption tests: 5 file sizes βœ…
 - Implicit directory fix tests: 6 test scenarios
 - Go unit tests: 17 test cases
 
@@ -231,9 +251,10 @@ The tests are automatically run in GitHub Actions on every push/PR that affects
 1. Build SeaweedFS
 2. Run PyArrow Parquet integration tests (`make test-with-server`)
 3. Run implicit directory fix tests (`make test-implicit-dir-with-server`)
-4. Run PyArrow native S3 filesystem tests (`make test-native-s3-with-server`) βœ… **NEW**
-5. Run SSE-S3 encryption compatibility tests (`make test-sse-s3-compat`) βœ… **NEW**
-6. Run Go unit tests for implicit directory handling
+4. Run PyArrow native S3 filesystem tests (`make test-native-s3-with-server`)
+5. Run cross-filesystem compatibility tests (`make test-cross-fs-with-server`) βœ… **NEW**
+6. Run SSE-S3 encryption compatibility tests (`make test-sse-s3-compat`)
+7. Run Go unit tests for implicit directory handling
 
 **Triggers**:
 - Push/PR to master (when `weed/s3api/**` or `weed/filer/**` changes)
diff --git a/test/s3/parquet/test_cross_filesystem_compatibility.py b/test/s3/parquet/test_cross_filesystem_compatibility.py
new file mode 100644
index 000000000..b2fc03831
--- /dev/null
+++ b/test/s3/parquet/test_cross_filesystem_compatibility.py
@@ -0,0 +1,446 @@
+#!/usr/bin/env python3
+"""
+Cross-filesystem compatibility tests for PyArrow Parquet files.
+
+This test verifies that Parquet files written using one filesystem implementation
+(s3fs or PyArrow native S3) can be correctly read using the other implementation.
+
+Test Matrix:
+- Write with s3fs β†’ Read with PyArrow native S3
+- Write with PyArrow native S3 β†’ Read with s3fs
+
+Requirements:
+    - pyarrow>=22.0.0
+    - s3fs>=2024.12.0
+    - boto3>=1.40.0
+
+Environment Variables:
+    S3_ENDPOINT_URL: S3 endpoint (default: http://localhost:8333)
+    S3_ACCESS_KEY: S3 access key (default: some_access_key1)
+    S3_SECRET_KEY: S3 secret key (default: some_secret_key1)
+    BUCKET_NAME: S3 bucket name (default: test-parquet-bucket)
+    TEST_QUICK: Run only small/quick tests (default: 0, set to 1 for quick mode)
+
+Usage:
+    # Run with default environment variables
+    python3 test_cross_filesystem_compatibility.py
+
+    # Run with custom environment variables
+    S3_ENDPOINT_URL=http://localhost:8333 \
+    S3_ACCESS_KEY=mykey \
+    S3_SECRET_KEY=mysecret \
+    BUCKET_NAME=mybucket \
+    python3 test_cross_filesystem_compatibility.py
+"""
+
+import os
+import secrets
+import sys
+import logging
+from typing import Optional, Tuple
+
+import pyarrow as pa
+import pyarrow.dataset as pads
+import pyarrow.fs as pafs
+import pyarrow.parquet as pq
+import s3fs
+
+try:
+    import boto3
+    from botocore.exceptions import ClientError
+    HAS_BOTO3 = True
+except ImportError:
+    HAS_BOTO3 = False
+
+from parquet_test_utils import create_sample_table
+
+logging.basicConfig(level=logging.INFO, format="%(message)s")
+
+# Configuration from environment variables with defaults
+S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "http://localhost:8333")
+S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "some_access_key1")
+S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "some_secret_key1")
+BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket")
+TEST_QUICK = os.getenv("TEST_QUICK", "0") == "1"
+
+# Create randomized test directory
+TEST_RUN_ID = secrets.token_hex(8)
+TEST_DIR = f"parquet-cross-fs-tests/{TEST_RUN_ID}"
+
+# Test file sizes
+TEST_SIZES = {
+    "small": 5,
+    "large": 200_000,  # This will create multiple row groups
+}
+
+# Filter to only small tests if quick mode is enabled
+if TEST_QUICK:
+    TEST_SIZES = {"small": TEST_SIZES["small"]}
+    logging.info("Quick test mode enabled - running only small tests")
+
+
+def init_s3fs() -> Optional[s3fs.S3FileSystem]:
+    """Initialize s3fs filesystem."""
+    try:
+        logging.info("Initializing s3fs...")
+        fs = s3fs.S3FileSystem(
+            client_kwargs={"endpoint_url": S3_ENDPOINT_URL},
+            key=S3_ACCESS_KEY,
+            secret=S3_SECRET_KEY,
+            use_listings_cache=False,
+        )
+        logging.info("βœ“ s3fs initialized successfully")
+        return fs
+    except Exception:
+        logging.exception("βœ— Failed to initialize s3fs")
+        return None
+
+
+def init_pyarrow_s3() -> Tuple[Optional[pafs.S3FileSystem], str, str]:
+    """Initialize PyArrow's native S3 filesystem.
+
+    Returns:
+        tuple: (S3FileSystem instance, scheme, endpoint)
+    """
+    try:
+        logging.info("Initializing PyArrow S3FileSystem...")
+
+        # Determine scheme from endpoint
+        if S3_ENDPOINT_URL.startswith("http://"):
+            scheme = "http"
+            endpoint = S3_ENDPOINT_URL[7:]  # Remove http://
+        elif S3_ENDPOINT_URL.startswith("https://"):
+            scheme = "https"
+            endpoint = S3_ENDPOINT_URL[8:]  # Remove https://
+        else:
+            # Default to http for localhost
+            scheme = "http"
+            endpoint = S3_ENDPOINT_URL
+
+        # Enable bucket creation and deletion for testing
+        s3 = pafs.S3FileSystem(
+            access_key=S3_ACCESS_KEY,
+            secret_key=S3_SECRET_KEY,
+            endpoint_override=endpoint,
+            scheme=scheme,
+            allow_bucket_creation=True,
+            allow_bucket_deletion=True,
+        )
+
+        logging.info("βœ“ PyArrow S3FileSystem initialized successfully")
+        return s3, scheme, endpoint
+    except Exception:
+        logging.exception("βœ— Failed to initialize PyArrow S3FileSystem")
+        return None, "", ""
+
+
+def ensure_bucket_exists(s3fs_fs: s3fs.S3FileSystem, pyarrow_s3: pafs.S3FileSystem) -> bool:
+    """Ensure the test bucket exists using s3fs."""
+    try:
+        if not s3fs_fs.exists(BUCKET_NAME):
+            logging.info(f"Creating bucket: {BUCKET_NAME}")
+            try:
+                s3fs_fs.mkdir(BUCKET_NAME)
+                logging.info(f"βœ“ Bucket created: {BUCKET_NAME}")
+            except FileExistsError:
+                # Bucket was created between the check and mkdir call
+                logging.info(f"βœ“ Bucket exists: {BUCKET_NAME}")
+        else:
+            logging.info(f"βœ“ Bucket exists: {BUCKET_NAME}")
+        return True
+    except Exception:
+        logging.exception("βœ— Failed to create/check bucket")
+        return False
+
+
+def write_with_s3fs(table: pa.Table, path: str, s3fs_fs: s3fs.S3FileSystem) -> bool:
+    """Write Parquet file using s3fs filesystem."""
+    try:
+        pads.write_dataset(table, path, format="parquet", filesystem=s3fs_fs)
+        return True
+    except Exception:
+        logging.exception("βœ— Failed to write with s3fs")
+        return False
+
+
+def write_with_pyarrow_s3(table: pa.Table, path: str, pyarrow_s3: pafs.S3FileSystem) -> bool:
+    """Write Parquet file using PyArrow native S3 filesystem."""
+    try:
+        pads.write_dataset(table, path, format="parquet", filesystem=pyarrow_s3)
+        return True
+    except Exception:
+        logging.exception("βœ— Failed to write with PyArrow S3")
+        return False
+
+
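+# Each read helper below tries three read paths in turn (pq.read_table,
+# pq.ParquetDataset, pads.dataset) and returns the first one that succeeds,
+# so the summary can report which API worked for each direction.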
errors.append(f"pq.read_table: {type(e).__name__}: {e}") + + # Try pq.ParquetDataset + try: + dataset = pq.ParquetDataset(path, filesystem=pyarrow_s3) + table = dataset.read() + return True, table, "pq.ParquetDataset" + except Exception as e: + errors.append(f"pq.ParquetDataset: {type(e).__name__}: {e}") + + # Try pads.dataset + try: + dataset = pads.dataset(path, filesystem=pyarrow_s3) + table = dataset.to_table() + return True, table, "pads.dataset" + except Exception as e: + errors.append(f"pads.dataset: {type(e).__name__}: {e}") + + return False, None, " | ".join(errors) + + +def verify_table_integrity(original: pa.Table, read: pa.Table) -> Tuple[bool, str]: + """Verify that read table matches the original table.""" + # Check row count + if read.num_rows != original.num_rows: + return False, f"Row count mismatch: expected {original.num_rows}, got {read.num_rows}" + + # Check schema + if not read.schema.equals(original.schema): + return False, f"Schema mismatch: expected {original.schema}, got {read.schema}" + + # Sort both tables by 'id' column before comparison to handle potential row order differences + original_sorted = original.sort_by([('id', 'ascending')]) + read_sorted = read.sort_by([('id', 'ascending')]) + + # Check data equality + if not read_sorted.equals(original_sorted): + # Provide detailed error information + error_details = [] + for col_name in original.column_names: + col_original = original_sorted.column(col_name) + col_read = read_sorted.column(col_name) + if not col_original.equals(col_read): + error_details.append(f"column '{col_name}' differs") + return False, f"Data mismatch: {', '.join(error_details)}" + + return True, "Data verified successfully" + + +def test_write_s3fs_read_pyarrow( + test_name: str, + num_rows: int, + s3fs_fs: s3fs.S3FileSystem, + pyarrow_s3: pafs.S3FileSystem +) -> Tuple[bool, str]: + """Test: Write with s3fs, read with PyArrow native S3.""" + try: + table = create_sample_table(num_rows) + path = f"{BUCKET_NAME}/{TEST_DIR}/{test_name}/data.parquet" + + # Write with s3fs + logging.info(f" Writing {num_rows:,} rows with s3fs to {path}...") + if not write_with_s3fs(table, path, s3fs_fs): + return False, "Write with s3fs failed" + logging.info(" βœ“ Write completed") + + # Read with PyArrow native S3 + logging.info(" Reading with PyArrow native S3...") + success, read_table, method = read_with_pyarrow_s3(path, pyarrow_s3) + if not success: + return False, f"Read with PyArrow S3 failed: {method}" + logging.info(f" βœ“ Read {read_table.num_rows:,} rows using {method}") + + # Verify data integrity + verify_success, verify_msg = verify_table_integrity(table, read_table) + if not verify_success: + return False, f"Verification failed: {verify_msg}" + logging.info(f" βœ“ {verify_msg}") + + return True, f"s3fsβ†’PyArrow: {method}" + + except Exception as e: + logging.exception(" βœ— Test failed") + return False, f"{type(e).__name__}: {e}" + + +def test_write_pyarrow_read_s3fs( + test_name: str, + num_rows: int, + s3fs_fs: s3fs.S3FileSystem, + pyarrow_s3: pafs.S3FileSystem +) -> Tuple[bool, str]: + """Test: Write with PyArrow native S3, read with s3fs.""" + try: + table = create_sample_table(num_rows) + path = f"{BUCKET_NAME}/{TEST_DIR}/{test_name}/data.parquet" + + # Write with PyArrow native S3 + logging.info(f" Writing {num_rows:,} rows with PyArrow native S3 to {path}...") + if not write_with_pyarrow_s3(table, path, pyarrow_s3): + return False, "Write with PyArrow S3 failed" + logging.info(" βœ“ Write completed") + + # Read with s3fs + 
logging.info(" Reading with s3fs...") + success, read_table, method = read_with_s3fs(path, s3fs_fs) + if not success: + return False, f"Read with s3fs failed: {method}" + logging.info(f" βœ“ Read {read_table.num_rows:,} rows using {method}") + + # Verify data integrity + verify_success, verify_msg = verify_table_integrity(table, read_table) + if not verify_success: + return False, f"Verification failed: {verify_msg}" + logging.info(f" βœ“ {verify_msg}") + + return True, f"PyArrowβ†’s3fs: {method}" + + except Exception as e: + logging.exception(" βœ— Test failed") + return False, f"{type(e).__name__}: {e}" + + +def cleanup_test_files(s3fs_fs: s3fs.S3FileSystem) -> None: + """Clean up test files from S3.""" + try: + test_path = f"{BUCKET_NAME}/{TEST_DIR}" + if s3fs_fs.exists(test_path): + logging.info(f"Cleaning up test directory: {test_path}") + s3fs_fs.rm(test_path, recursive=True) + logging.info("βœ“ Test directory cleaned up") + except Exception: + logging.exception("Failed to cleanup test directory") + + +def main(): + """Run cross-filesystem compatibility tests.""" + print("=" * 80) + print("Cross-Filesystem Compatibility Tests for PyArrow Parquet") + print("Testing: s3fs ↔ PyArrow Native S3 Filesystem") + if TEST_QUICK: + print("*** QUICK TEST MODE - Small files only ***") + print("=" * 80 + "\n") + + print("Configuration:") + print(f" S3 Endpoint: {S3_ENDPOINT_URL}") + print(f" Access Key: {S3_ACCESS_KEY}") + print(f" Bucket: {BUCKET_NAME}") + print(f" Test Directory: {TEST_DIR}") + print(f" Quick Mode: {'Yes (small files only)' if TEST_QUICK else 'No (all file sizes)'}") + print(f" PyArrow Version: {pa.__version__}") + print() + + # Initialize both filesystems + s3fs_fs = init_s3fs() + if s3fs_fs is None: + print("Cannot proceed without s3fs connection") + return 1 + + pyarrow_s3, scheme, endpoint = init_pyarrow_s3() + if pyarrow_s3 is None: + print("Cannot proceed without PyArrow S3 connection") + return 1 + + print() + + # Ensure bucket exists + if not ensure_bucket_exists(s3fs_fs, pyarrow_s3): + print("Cannot proceed without bucket") + return 1 + + print() + + results = [] + + # Test all file sizes + for size_name, num_rows in TEST_SIZES.items(): + print(f"\n{'='*80}") + print(f"Testing with {size_name} files ({num_rows:,} rows)") + print(f"{'='*80}\n") + + # Test 1: Write with s3fs, read with PyArrow native S3 + test_name = f"{size_name}_s3fs_to_pyarrow" + print(f"Test: Write with s3fs β†’ Read with PyArrow native S3") + success, message = test_write_s3fs_read_pyarrow( + test_name, num_rows, s3fs_fs, pyarrow_s3 + ) + results.append((test_name, success, message)) + status = "βœ“ PASS" if success else "βœ— FAIL" + print(f"{status}: {message}\n") + + # Test 2: Write with PyArrow native S3, read with s3fs + test_name = f"{size_name}_pyarrow_to_s3fs" + print(f"Test: Write with PyArrow native S3 β†’ Read with s3fs") + success, message = test_write_pyarrow_read_s3fs( + test_name, num_rows, s3fs_fs, pyarrow_s3 + ) + results.append((test_name, success, message)) + status = "βœ“ PASS" if success else "βœ— FAIL" + print(f"{status}: {message}\n") + + # Summary + print("\n" + "=" * 80) + print("SUMMARY") + print("=" * 80) + passed = sum(1 for _, success, _ in results if success) + total = len(results) + print(f"\nTotal: {passed}/{total} passed\n") + + for test_name, success, message in results: + status = "βœ“" if success else "βœ—" + print(f" {status} {test_name}: {message}") + + print("\n" + "=" * 80) + if passed == total: + print("βœ“ ALL CROSS-FILESYSTEM TESTS PASSED!") + 
print("\nConclusion: Files written with s3fs and PyArrow native S3 are") + print("fully compatible and can be read by either filesystem implementation.") + else: + print(f"βœ— {total - passed} test(s) failed") + + print("=" * 80 + "\n") + + # Cleanup + cleanup_test_files(s3fs_fs) + + return 0 if passed == total else 1 + + +if __name__ == "__main__": + sys.exit(main()) +