@@ -157,6 +157,31 @@ def write_with_buffer_and_s3fs(table: pa.Table, path: str, fs: s3fs.S3FileSystem
 
 # Read Methods
 
+def get_parquet_files(path: str, fs: s3fs.S3FileSystem) -> list:
+    """
+    Helper to discover all parquet files for a given path.
+
+    Args:
+        path: S3 path (file or directory)
+        fs: S3FileSystem instance
+
+    Returns:
+        List of parquet file paths
+
+    Raises:
+        ValueError: If no parquet files are found in a directory
+    """
+    if fs.isdir(path):
+        # Find all parquet files in the directory
+        files = [f for f in fs.ls(path) if f.endswith('.parquet')]
+        if not files:
+            raise ValueError(f"No parquet files found in directory: {path}")
+        return files
+    else:
+        # Single file path
+        return [path]
+
+
 def read_with_pads_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
     """Read using pads.dataset - handles both single files and directories."""
     try:
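One caveat with the helper as written: `fs.ls` lists only the top level of the directory, so a partitioned dataset (the nested `key=value/` subdirectories that `pads.write_dataset` produces when a partitioning scheme is set) would surface as "no parquet files found". Below is a minimal sketch of a recursive variant using fsspec's `find`, which walks the whole tree; this is an illustrative extension, not part of the change itself:

```python
import s3fs


def get_parquet_files_recursive(path: str, fs: s3fs.S3FileSystem) -> list:
    """Like get_parquet_files, but descends into partition subdirectories."""
    if fs.isdir(path):
        # fs.find() walks the entire tree below `path`, unlike fs.ls(),
        # so part files inside key=value/ partition dirs are picked up.
        files = [f for f in fs.find(path) if f.endswith('.parquet')]
        if not files:
            raise ValueError(f"No parquet files found under: {path}")
        return files
    # A single file path passes through unchanged.
    return [path]
```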
@@ -173,18 +198,22 @@ def read_with_pads_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str,
 def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
     """Read directly via s3fs.open() streaming."""
     try:
-        # Check if path is a directory (common with pads.write_dataset)
-        if fs.isdir(path):
-            # Find parquet files in the directory
-            files = [f for f in fs.ls(path) if f.endswith('.parquet')]
-            if not files:
-                raise ValueError(f"No parquet files found in directory: {path}")
-            # For simplicity, read the first parquet file
-            # In production, you might want to read all files and concatenate
-            path = files[0]
+        # Get all parquet files (handles both single file and directory)
+        parquet_files = get_parquet_files(path, fs)
+
+        # Read all parquet files and concatenate them
+        tables = []
+        for file_path in parquet_files:
+            with fs.open(file_path, "rb") as f:
+                table = pq.read_table(f)
+                tables.append(table)
+
+        # Concatenate all tables into one
+        if len(tables) == 1:
+            result = tables[0]
+        else:
+            result = pa.concat_tables(tables)
 
-        with fs.open(path, "rb") as f:
-            result = pq.read_table(f)
         return True, "s3fs.open+pq.read_table", result.num_rows
     except Exception as e:
         error_msg = f"s3fs.open+pq.read_table: {type(e).__name__}"
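For comparison, pyarrow can perform the same fan-in itself. Here is a minimal sketch of an equivalent read that delegates file discovery and concatenation to `pq.read_table` through its `filesystem` argument; the bucket path is hypothetical, and the error handling plus the `(ok, label, rows)` tuple convention are omitted:

```python
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# Given a filesystem, pq.read_table accepts either a single file or a
# dataset directory; for a directory it discovers the part files and
# concatenates them internally, much like the loop above.
table = pq.read_table("my-bucket/datasets/events", filesystem=fs)
print(table.num_rows)
```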
@@ -195,20 +224,24 @@ def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
 def read_buffered_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
     """Read via s3fs.open() into buffer, then pq.read_table."""
     try:
-        # Check if path is a directory (common with pads.write_dataset)
-        if fs.isdir(path):
-            # Find parquet files in the directory
-            files = [f for f in fs.ls(path) if f.endswith('.parquet')]
-            if not files:
-                raise ValueError(f"No parquet files found in directory: {path}")
-            # For simplicity, read the first parquet file
-            # In production, you might want to read all files and concatenate
-            path = files[0]
+        # Get all parquet files (handles both single file and directory)
+        parquet_files = get_parquet_files(path, fs)
+
+        # Read all parquet files and concatenate them
+        tables = []
+        for file_path in parquet_files:
+            with fs.open(file_path, "rb") as f:
+                buffer = io.BytesIO(f.read())
+                buffer.seek(0)
+                table = pq.read_table(buffer)
+                tables.append(table)
+
+        # Concatenate all tables into one
+        if len(tables) == 1:
+            result = tables[0]
+        else:
+            result = pa.concat_tables(tables)
 
-        with fs.open(path, "rb") as f:
-            buffer = io.BytesIO(f.read())
-            buffer.seek(0)
-            result = pq.read_table(buffer)
         return True, "s3fs.open+BytesIO+pq.read_table", result.num_rows
     except Exception as e:
         error_msg = f"s3fs.open+BytesIO+pq.read_table: {type(e).__name__}"
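One failure mode both loop-based readers share: `pa.concat_tables` raises `ArrowInvalid` when the part files disagree on schema, e.g. a column added partway through a backfill. Below is a minimal sketch of a more tolerant merge, assuming a recent pyarrow with the `promote_options` keyword (older releases exposed a boolean `promote` flag instead):

```python
import pyarrow as pa

# Two part files whose schemas drifted: the second gained a column.
t1 = pa.table({"id": [1, 2]})
t2 = pa.table({"id": [3], "label": ["a"]})

# promote_options="default" unifies the schemas and null-fills the
# missing column instead of raising ArrowInvalid.
merged = pa.concat_tables([t1, t2], promote_options="default")
print(merged.schema)    # id: int64, label: string
print(merged.num_rows)  # 3
```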