|
|
|
@ -173,6 +173,16 @@ def read_with_pads_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, |
|
|
|
def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: |
|
|
|
"""Read directly via s3fs.open() streaming.""" |
|
|
|
try: |
|
|
|
# Check if path is a directory (common with pads.write_dataset) |
|
|
|
if fs.isdir(path): |
|
|
|
# Find parquet files in the directory |
|
|
|
files = [f for f in fs.ls(path) if f.endswith('.parquet')] |
|
|
|
if not files: |
|
|
|
raise ValueError(f"No parquet files found in directory: {path}") |
|
|
|
# For simplicity, read the first parquet file |
|
|
|
# In production, you might want to read all files and concatenate |
|
|
|
path = files[0] |
|
|
|
|
|
|
|
with fs.open(path, "rb") as f: |
|
|
|
result = pq.read_table(f) |
|
|
|
return True, "s3fs.open+pq.read_table", result.num_rows |
|
|
|
@ -185,6 +195,16 @@ def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: |
|
|
|
def read_buffered_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]: |
|
|
|
"""Read via s3fs.open() into buffer, then pq.read_table.""" |
|
|
|
try: |
|
|
|
# Check if path is a directory (common with pads.write_dataset) |
|
|
|
if fs.isdir(path): |
|
|
|
# Find parquet files in the directory |
|
|
|
files = [f for f in fs.ls(path) if f.endswith('.parquet')] |
|
|
|
if not files: |
|
|
|
raise ValueError(f"No parquet files found in directory: {path}") |
|
|
|
# For simplicity, read the first parquet file |
|
|
|
# In production, you might want to read all files and concatenate |
|
|
|
path = files[0] |
|
|
|
|
|
|
|
with fs.open(path, "rb") as f: |
|
|
|
buffer = io.BytesIO(f.read()) |
|
|
|
buffer.seek(0) |
|
|
|
|