You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
134 lines
3.6 KiB
134 lines
3.6 KiB
#!/usr/bin/env python3
|
|
# /// script
|
|
# dependencies = [
|
|
# "pyarrow>=22",
|
|
# "boto3>=1.28.0",
|
|
# ]
|
|
# ///
|
|
|
|
"""
|
|
Simple example of using PyArrow's native S3 filesystem with SeaweedFS.
|
|
|
|
This is a minimal example demonstrating how to write and read Parquet files
|
|
using PyArrow's built-in S3FileSystem without any additional dependencies
|
|
like s3fs.
|
|
|
|
Usage:
|
|
# Set environment variables
|
|
export S3_ENDPOINT_URL=localhost:8333
|
|
export S3_ACCESS_KEY=some_access_key1
|
|
export S3_SECRET_KEY=some_secret_key1
|
|
export BUCKET_NAME=test-parquet-bucket
|
|
|
|
# Run the script
|
|
python3 example_pyarrow_native.py
|
|
|
|
# Or run with uv (if available)
|
|
uv run example_pyarrow_native.py
|
|
"""
|
|
|
|
import os
|
|
import secrets
|
|
|
|
import pyarrow as pa
|
|
import pyarrow.dataset as pads
|
|
import pyarrow.fs as pafs
|
|
import pyarrow.parquet as pq
|
|
|
|
from parquet_test_utils import create_sample_table
|
|
|
|
# Configuration (each value can be overridden via environment variables)
BUCKET_NAME = os.getenv("BUCKET_NAME", "test-parquet-bucket")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "localhost:8333")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "some_access_key1")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "some_secret_key1")

# Split an explicit http:// or https:// prefix off the endpoint.
# A bare "host:port" (the common case for a local SeaweedFS gateway)
# falls through to plain http.
for _prefix in ("http://", "https://"):
    if S3_ENDPOINT_URL.startswith(_prefix):
        scheme = _prefix[:-len("://")]
        endpoint = S3_ENDPOINT_URL[len(_prefix):]
        break
else:
    scheme = "http"  # Default to http for localhost
    endpoint = S3_ENDPOINT_URL

print(f"Connecting to S3 endpoint: {scheme}://{endpoint}")
# Build PyArrow's NATIVE S3 filesystem handle (no s3fs dependency).
# Bucket creation/deletion is enabled so the script can manage its
# own test bucket against the SeaweedFS S3 gateway.
_s3_options = {
    "access_key": S3_ACCESS_KEY,
    "secret_key": S3_SECRET_KEY,
    "endpoint_override": endpoint,
    "scheme": scheme,
    "allow_bucket_creation": True,
    "allow_bucket_deletion": True,
}
s3 = pafs.S3FileSystem(**_s3_options)

print("✓ Connected to S3 endpoint")
# Create the target bucket if it does not exist yet.  boto3 is used only
# for this administrative step; all data I/O goes through the PyArrow
# filesystem created above.  If boto3 is missing we proceed best-effort.
try:
    import boto3
    from botocore.exceptions import ClientError

    s3_client = boto3.client(
        's3',
        endpoint_url=f"{scheme}://{endpoint}",
        aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY,
        region_name='us-east-1',
    )

    try:
        s3_client.head_bucket(Bucket=BUCKET_NAME)
        print(f"✓ Bucket exists: {BUCKET_NAME}")
    except ClientError as e:
        # head_bucket reports a missing bucket as HTTP "404"; some S3
        # implementations surface the "NoSuchBucket" code instead, so
        # accept either before attempting creation.  Anything else
        # (e.g. 403 auth failures) is a real error and is re-raised.
        if e.response['Error']['Code'] in ('404', 'NoSuchBucket'):
            print(f"Creating bucket: {BUCKET_NAME}")
            s3_client.create_bucket(Bucket=BUCKET_NAME)
            print(f"✓ Bucket created: {BUCKET_NAME}")
        else:
            raise
except ImportError:
    print("Warning: boto3 not available, assuming bucket exists")
# Generate a unique object path (random hex segment) so repeated runs
# never collide; secrets.token_hex gives a collision-safe suffix.
filename = f"{BUCKET_NAME}/dataset-{secrets.token_hex(8)}/test.parquet"

# The original status line interpolated nothing into its f-string;
# report the actual destination path instead.
print(f"\nWriting Parquet dataset to: {filename}")
# Materialize the sample data and persist it as a Parquet dataset on S3.
table = create_sample_table(200_000)
pads.write_dataset(table, filename, filesystem=s3, format="parquet")

print(f"✓ Wrote {table.num_rows:,} rows")
# Read the dataset back through three different PyArrow entry points
# and report the row count from each, confirming they all see the data.
_readers = [
    ("pq.read_table", lambda: pq.read_table(filename, filesystem=s3)),
    ("pq.ParquetDataset", lambda: pq.ParquetDataset(filename, filesystem=s3).read()),
    ("pads.dataset", lambda: pads.dataset(filename, filesystem=s3).to_table()),
]
for _label, _read in _readers:
    print(f"\nReading with {_label}...")
    _result = _read()
    print(f"✓ Read {_result.num_rows:,} rows")
print("\n✅ All operations completed successfully!")
# Report the object path so the result can be inspected out-of-band
# (the original f-string had its placeholder garbled away).
print(f"\nFile written to: {filename}")
print("You can verify the file using the SeaweedFS S3 API or weed shell")