From ed1341a976c8c5bb0a74ccdf1c9692051897335a Mon Sep 17 00:00:00 2001 From: Drew Short Date: Mon, 27 Dec 2021 20:05:00 -0600 Subject: [PATCH] Moved async work into a module --- acm.py | 109 ++++--------------------------------------------- acm/asyncio.py | 81 ++++++++++++++++++++++++++++++++++++ acm/config.py | 18 ++++---- acm/utility.py | 8 ++++ acm/version.py | 31 ++++++++++++++ poetry.lock | 14 ++++++- pyproject.toml | 1 + 7 files changed, 154 insertions(+), 108 deletions(-) create mode 100644 acm/asyncio.py create mode 100644 acm/version.py diff --git a/acm.py b/acm.py index 0c833aa..dcb18f2 100755 --- a/acm.py +++ b/acm.py @@ -16,102 +16,19 @@ import click from minio import Minio, InvalidResponseError from minio.error import S3Error -from acm.config import VERSION, get_default_config +from acm.asyncio import make_chunks, run_asyncio_commands, run_command_shell +from acm.config import get_default_config from acm.logging import setup_basic_logging, update_logging_level -from acm.utility import get_string_sha256sum - -# Size of the buffer to read files with -BUF_SIZE = 4096 +from acm.utility import get_file_sha256sum, get_string_sha256sum +from acm.version import VERSION LOG = setup_basic_logging("acm") -########### -# AsyncIO # -########### - - -async def run_command_shell( - command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - on_success: List[Callable] = [()] -): - """Run command in subprocess (shell). - - Note: - This can be used if you wish to execute e.g. "copy" - on Windows, which can only be executed in the shell. - """ - process = await asyncio.create_subprocess_shell( - command, stdout=stdout, stderr=stderr - ) - - process_stdout, process_stderr = await process.communicate() - - if process.returncode == 0: - for success_callable in on_success: - success_callable() - - if stdout != asyncio.subprocess.DEVNULL: - result = process_stdout.decode().strip() - return result - else: - return None - - -def make_chunks(tasks, chunk_size): - """Yield successive chunk_size-sized chunks from tasks. - - Note: - Taken from https://stackoverflow.com/a/312464 - modified for python 3 only - """ - for i in range(0, len(tasks), chunk_size): - yield tasks[i: i + chunk_size] - - -def run_asyncio_commands(tasks, max_concurrent_tasks=0): - """Run tasks asynchronously using asyncio and return results. - - If max_concurrent_tasks are set to 0, no limit is applied. - - Note: - By default, Windows uses SelectorEventLoop, which does not support - subprocesses. Therefore ProactorEventLoop is used on Windows. - https://docs.python.org/3/library/asyncio-eventloops.html#windows - """ - all_results = [] - - if max_concurrent_tasks == 0: - chunks = [tasks] - num_chunks = len(chunks) - else: - chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks) - num_chunks = len( - list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks))) - - if asyncio.get_event_loop().is_closed(): - asyncio.set_event_loop(asyncio.new_event_loop()) - if platform.system() == "Windows": - asyncio.set_event_loop(asyncio.ProactorEventLoop()) - loop = asyncio.get_event_loop() - - chunk = 1 - for tasks_in_chunk in chunks: - commands = asyncio.gather(*tasks_in_chunk) - results = loop.run_until_complete(commands) - all_results += results - chunk += 1 - - loop.close() - return all_results - ########### # Helpers # ########### - def update(d, u): for k, v in u.items(): if isinstance(v, collections.abc.Mapping): @@ -177,14 +94,10 @@ def prep_s3(ctx): return s3_bucket, s3 -def get_file_sha256sum(stored_data, profile, file): +def get_stored_and_computed_sha256sums(stored_data, profile, file): stored_file_hash = stored_data['sha256sum'] stored_profile_hash = stored_data['profileHash'] - sha256sum = hashlib.sha256() - with open(file, 'rb') as f: - for byte_block in iter(lambda: f.read(BUF_SIZE), b""): - sha256sum.update(byte_block) - calculated_file_hash = sha256sum.hexdigest() + calculated_file_hash = get_file_sha256sum(file) return stored_profile_hash, stored_file_hash, calculated_file_hash @@ -385,7 +298,7 @@ def check_matched_files_hashes(ctx, context, print_identity, profile, files): try: file_object = s3.get_object(s3_bucket, file_identity) stored_data = json.load(file_object) - stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum( + stored_profile_hash, stored_file_hash, calculated_file_hash = get_stored_and_computed_sha256sums( stored_data, profile, file) if calculated_file_hash == stored_file_hash \ and ctx.obj['CONFIG']['profileHashes'][profile] == stored_profile_hash: @@ -425,7 +338,7 @@ def check_changed_files_hashes(ctx, context, profile, files): try: file_object = s3.get_object(s3_bucket, file_identity) stored_data = json.load(file_object) - stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum( + stored_profile_hash, stored_file_hash, calculated_file_hash = get_stored_and_computed_sha256sums( stored_data, profile, file) if calculated_file_hash != stored_file_hash \ or ctx.obj['CONFIG']['profileHashes'][profile] != stored_profile_hash: @@ -464,11 +377,7 @@ def update_changed_files_hashes(ctx, context, input_and_identity, profile, files file, identity = file.split('\t') file_identity = f'{get_file_identity(ctx.obj, file)}.json' try: - sha256sum = hashlib.sha256() - with open(file, 'rb') as f: - for byte_block in iter(lambda: f.read(BUF_SIZE), b''): - sha256sum.update(byte_block) - calculated_file_hash = sha256sum.hexdigest() + calculated_file_hash = get_file_sha256sum(file) object_data = { "sourcePath": file, diff --git a/acm/asyncio.py b/acm/asyncio.py new file mode 100644 index 0000000..523abcb --- /dev/null +++ b/acm/asyncio.py @@ -0,0 +1,81 @@ +import asyncio +import logging +import typing + +LOG = logging.getLogger("acm.async") + +async def run_command_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + on_success: typing.List[typing.Callable] = [()] +): + """Run command in subprocess (shell). + + Note: + This can be used if you wish to execute e.g. "copy" + on Windows, which can only be executed in the shell. + """ + process = await asyncio.create_subprocess_shell( + command, stdout=stdout, stderr=stderr + ) + + process_stdout, process_stderr = await process.communicate() + + if process.returncode == 0: + for success_callable in on_success: + success_callable() + + if stdout != asyncio.subprocess.DEVNULL: + result = process_stdout.decode().strip() + return result + else: + return None + + +def make_chunks(tasks, chunk_size): + """Yield successive chunk_size-sized chunks from tasks. + + Note: + Taken from https://stackoverflow.com/a/312464 + modified for python 3 only + """ + for i in range(0, len(tasks), chunk_size): + yield tasks[i: i + chunk_size] + + +def run_asyncio_commands(tasks, max_concurrent_tasks=0): + """Run tasks asynchronously using asyncio and return results. + + If max_concurrent_tasks are set to 0, no limit is applied. + + Note: + By default, Windows uses SelectorEventLoop, which does not support + subprocesses. Therefore ProactorEventLoop is used on Windows. + https://docs.python.org/3/library/asyncio-eventloops.html#windows + """ + all_results = [] + + if max_concurrent_tasks == 0: + chunks = [tasks] + num_chunks = len(chunks) + else: + chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks) + num_chunks = len( + list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks))) + + if asyncio.get_event_loop().is_closed(): + asyncio.set_event_loop(asyncio.new_event_loop()) + if platform.system() == "Windows": + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + loop = asyncio.get_event_loop() + + chunk = 1 + for tasks_in_chunk in chunks: + commands = asyncio.gather(*tasks_in_chunk) + results = loop.run_until_complete(commands) + all_results += results + chunk += 1 + + loop.close() + return all_results \ No newline at end of file diff --git a/acm/config.py b/acm/config.py index 813a862..805d14c 100644 --- a/acm/config.py +++ b/acm/config.py @@ -1,3 +1,4 @@ +import importlib.metadata import json import logging import typing @@ -5,12 +6,10 @@ import typing from pydantic import BaseModel, BaseSettings, validator from acm.utility import get_string_sha256sum, get_string_xor +from acm.version import VERSION LOG = logging.getLogger("acm.config") -# Application Version -VERSION = "2.0.0" - class ACMProfileProcessorOptions(BaseModel): force_preserve_smaller_input: bool = False @@ -50,10 +49,6 @@ class ACMProfile(BaseModel): if v is None: return VERSION - # @validator('processors', always=True) - # def processors_validator(cls, v, values) -> str: - # # Collapse the same named processors into a single processor at the correct index - @validator('signature', always=True) def hash_signature_validator(cls, v, values) -> str: signature_keys = ["name", "version"] @@ -68,6 +63,15 @@ class ACMProfile(BaseModel): return get_string_sha256sum(signature + combined_processor_signature) + def get_processor_names(self) -> typing.List[str]: + return [processor.name for processor in self.processors] + + def get_processor(self, name: str) -> typing.Optional[ACMProfileProcessor]: + for processor in self.processors: + if name == processor.name: + return processor + return None + class ACMS3(BaseModel): secure: bool = False, diff --git a/acm/utility.py b/acm/utility.py index 5fe2d24..923e294 100644 --- a/acm/utility.py +++ b/acm/utility.py @@ -8,6 +8,14 @@ BUF_SIZE = 4096 LOG = logging.getLogger("acm.utility") +def get_file_sha256sum(input_file): + sha256sum = hashlib.sha256() + with open(input_file, 'rb') as f: + for byte_block in iter(lambda: f.read(BUF_SIZE), b""): + sha256sum.update(byte_block) + return sha256sum.hexdigest() + + def get_string_sha256sum(content: str, encoding='utf-8') -> str: sha256sum = hashlib.sha256() with io.BytesIO(content.encode(encoding)) as c: diff --git a/acm/version.py b/acm/version.py new file mode 100644 index 0000000..8e11ed5 --- /dev/null +++ b/acm/version.py @@ -0,0 +1,31 @@ +import importlib.metadata +import logging +import os +import pathlib + +import toml + +LOG = logging.getLogger("acm.version") + + +def __get_version(): + """ + Automatically determine the version of the application being run + """ + # Attempt to read the installed package information + try: + return importlib.metadata.version('asset-compression-manager') + except importlib.metadata.PackageNotFoundError: + LOG.debug("The package is not installed, reading the version from another source") + + # Fallback on parsing the pyproject.toml file + root_dir = pathlib.Path(__file__).parent.parent.resolve() + with open(os.path.join(root_dir, "pyproject.toml"), "r") as project_file: + project = toml.load(project_file) + return project["tool"]["poetry"]["version"] + + LOG.debug("Falling back on UNKNOWN version identifier") + return "UNKNOWN" + +# Application Version +VERSION = __get_version() diff --git a/poetry.lock b/poetry.lock index c20e2f8..94c4d24 100644 --- a/poetry.lock +++ b/poetry.lock @@ -52,6 +52,14 @@ typing-extensions = ">=3.7.4.3" dotenv = ["python-dotenv (>=0.10.4)"] email = ["email-validator (>=1.0.3)"] +[[package]] +name = "toml" +version = "0.10.2" +description = "Python Library for Tom's Obvious, Minimal Language" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "typing-extensions" version = "4.0.1" @@ -76,7 +84,7 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "4b3fe05a6a04674efefc45cfb7ea28502745b735546c46ee3f64ad0e11d42722" +content-hash = "751276ba1ea83218a27169c8d996edf4ae2f3c7a648d012674dd8ac2431508e4" [metadata.files] certifi = [ @@ -119,6 +127,10 @@ pydantic = [ {file = "pydantic-1.8.2-py3-none-any.whl", hash = "sha256:fec866a0b59f372b7e776f2d7308511784dace622e0992a0b59ea3ccee0ae833"}, {file = "pydantic-1.8.2.tar.gz", hash = "sha256:26464e57ccaafe72b7ad156fdaa4e9b9ef051f69e175dbbb463283000c05ab7b"}, ] +toml = [ + {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, + {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, +] typing-extensions = [ {file = "typing_extensions-4.0.1-py3-none-any.whl", hash = "sha256:7f001e5ac290a0c0401508864c7ec868be4e701886d5b573a9528ed3973d9d3b"}, {file = "typing_extensions-4.0.1.tar.gz", hash = "sha256:4ca091dea149f945ec56afb48dae714f21e8692ef22a395223bcd328961b6a0e"}, diff --git a/pyproject.toml b/pyproject.toml index 93bf094..e5ba5ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ python = "^3.8" click = "8.0.3" minio = "7.1.2" pydantic = "1.8.2" +toml = "0.10.2" [tool.poetry.dev-dependencies]