diff --git a/test/s3/parquet/s3_parquet_test.py b/test/s3/parquet/s3_parquet_test.py
index e41b2040b..629cbf890 100755
--- a/test/s3/parquet/s3_parquet_test.py
+++ b/test/s3/parquet/s3_parquet_test.py
@@ -157,6 +157,31 @@ def write_with_buffer_and_s3fs(table: pa.Table, path: str, fs: s3fs.S3FileSystem
 # Read Methods
 
 
+def get_parquet_files(path: str, fs: s3fs.S3FileSystem) -> list:
+    """
+    Helper to discover all parquet files for a given path.
+
+    Args:
+        path: S3 path (file or directory)
+        fs: S3FileSystem instance
+
+    Returns:
+        List of parquet file paths
+
+    Raises:
+        ValueError: If no parquet files are found in a directory
+    """
+    if fs.isdir(path):
+        # Find all parquet files in the directory
+        files = [f for f in fs.ls(path) if f.endswith('.parquet')]
+        if not files:
+            raise ValueError(f"No parquet files found in directory: {path}")
+        return files
+    else:
+        # Single file path
+        return [path]
+
+
 def read_with_pads_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
     """Read using pads.dataset - handles both single files and directories."""
     try:
@@ -173,18 +198,22 @@ def read_with_pads_dataset(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str,
 def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
     """Read directly via s3fs.open() streaming."""
     try:
-        # Check if path is a directory (common with pads.write_dataset)
-        if fs.isdir(path):
-            # Find parquet files in the directory
-            files = [f for f in fs.ls(path) if f.endswith('.parquet')]
-            if not files:
-                raise ValueError(f"No parquet files found in directory: {path}")
-            # For simplicity, read the first parquet file
-            # In production, you might want to read all files and concatenate
-            path = files[0]
+        # Get all parquet files (handles both single file and directory)
+        parquet_files = get_parquet_files(path, fs)
+
+        # Read all parquet files and concatenate them
+        tables = []
+        for file_path in parquet_files:
+            with fs.open(file_path, "rb") as f:
+                table = pq.read_table(f)
+                tables.append(table)
+
+        # Concatenate all tables into one
+        if len(tables) == 1:
+            result = tables[0]
+        else:
+            result = pa.concat_tables(tables)
 
-        with fs.open(path, "rb") as f:
-            result = pq.read_table(f)
         return True, "s3fs.open+pq.read_table", result.num_rows
     except Exception as e:
         error_msg = f"s3fs.open+pq.read_table: {type(e).__name__}"
@@ -195,20 +224,24 @@ def read_direct_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
 def read_buffered_s3fs(path: str, fs: s3fs.S3FileSystem) -> Tuple[bool, str, int]:
     """Read via s3fs.open() into buffer, then pq.read_table."""
    try:
-        # Check if path is a directory (common with pads.write_dataset)
-        if fs.isdir(path):
-            # Find parquet files in the directory
-            files = [f for f in fs.ls(path) if f.endswith('.parquet')]
-            if not files:
-                raise ValueError(f"No parquet files found in directory: {path}")
-            # For simplicity, read the first parquet file
-            # In production, you might want to read all files and concatenate
-            path = files[0]
+        # Get all parquet files (handles both single file and directory)
+        parquet_files = get_parquet_files(path, fs)
+
+        # Read all parquet files and concatenate them
+        tables = []
+        for file_path in parquet_files:
+            with fs.open(file_path, "rb") as f:
+                buffer = io.BytesIO(f.read())
+                buffer.seek(0)
+                table = pq.read_table(buffer)
+                tables.append(table)
+
+        # Concatenate all tables into one
+        if len(tables) == 1:
+            result = tables[0]
+        else:
+            result = pa.concat_tables(tables)
 
-        with fs.open(path, "rb") as f:
-            buffer = io.BytesIO(f.read())
-            buffer.seek(0)
-            result = pq.read_table(buffer)
         return True, "s3fs.open+BytesIO+pq.read_table", result.num_rows
     except Exception as e:
         error_msg = f"s3fs.open+BytesIO+pq.read_table: {type(e).__name__}"
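
For reference, the post-patch read path reduces to the standalone sketch below. This is a minimal approximation, not the test file itself: the endpoint URL, credentials, and bucket path are hypothetical placeholders, and read_all_parquet is an illustrative name rather than a function in the patch.

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs


def read_all_parquet(path: str, fs: s3fs.S3FileSystem) -> pa.Table:
    """Read a single parquet file, or every *.parquet file in a directory."""
    # Mirrors get_parquet_files(): treat directories and single files uniformly
    if fs.isdir(path):
        files = [f for f in fs.ls(path) if f.endswith(".parquet")]
        if not files:
            raise ValueError(f"No parquet files found in directory: {path}")
    else:
        files = [path]

    # Read each file and concatenate; pa.concat_tables expects matching schemas
    tables = []
    for file_path in files:
        with fs.open(file_path, "rb") as f:
            tables.append(pq.read_table(f))
    return tables[0] if len(tables) == 1 else pa.concat_tables(tables)


if __name__ == "__main__":
    # Hypothetical local S3 endpoint and bucket path, for illustration only
    fs = s3fs.S3FileSystem(
        key="some_access_key",
        secret="some_secret_key",
        client_kwargs={"endpoint_url": "http://localhost:8333"},
    )
    print(read_all_parquet("example-bucket/dataset/", fs).num_rows)

Reading every discovered file, rather than only files[0] as the removed code did, is what makes the returned row count meaningful for multi-file datasets written by pads.write_dataset, since each fragment holds only a slice of the rows.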