Browse Source

Moved async work into a module

acm-debugging-and-enhancements
Drew Short 3 years ago
parent
commit
ed1341a976
  1. 109
      acm.py
  2. 81
      acm/asyncio.py
  3. 18
      acm/config.py
  4. 8
      acm/utility.py
  5. 31
      acm/version.py
  6. 14
      poetry.lock
  7. 1
      pyproject.toml

109
acm.py

@ -16,102 +16,19 @@ import click
from minio import Minio, InvalidResponseError
from minio.error import S3Error
from acm.config import VERSION, get_default_config
from acm.asyncio import make_chunks, run_asyncio_commands, run_command_shell
from acm.config import get_default_config
from acm.logging import setup_basic_logging, update_logging_level
from acm.utility import get_string_sha256sum
# Size of the buffer to read files with
BUF_SIZE = 4096
from acm.utility import get_file_sha256sum, get_string_sha256sum
from acm.version import VERSION
LOG = setup_basic_logging("acm")
###########
# AsyncIO #
###########
async def run_command_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
on_success: List[Callable] = [()]
):
"""Run command in subprocess (shell).
Note:
This can be used if you wish to execute e.g. "copy"
on Windows, which can only be executed in the shell.
"""
process = await asyncio.create_subprocess_shell(
command, stdout=stdout, stderr=stderr
)
process_stdout, process_stderr = await process.communicate()
if process.returncode == 0:
for success_callable in on_success:
success_callable()
if stdout != asyncio.subprocess.DEVNULL:
result = process_stdout.decode().strip()
return result
else:
return None
def make_chunks(tasks, chunk_size):
"""Yield successive chunk_size-sized chunks from tasks.
Note:
Taken from https://stackoverflow.com/a/312464
modified for python 3 only
"""
for i in range(0, len(tasks), chunk_size):
yield tasks[i: i + chunk_size]
def run_asyncio_commands(tasks, max_concurrent_tasks=0):
"""Run tasks asynchronously using asyncio and return results.
If max_concurrent_tasks are set to 0, no limit is applied.
Note:
By default, Windows uses SelectorEventLoop, which does not support
subprocesses. Therefore ProactorEventLoop is used on Windows.
https://docs.python.org/3/library/asyncio-eventloops.html#windows
"""
all_results = []
if max_concurrent_tasks == 0:
chunks = [tasks]
num_chunks = len(chunks)
else:
chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)
num_chunks = len(
list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)))
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
if platform.system() == "Windows":
asyncio.set_event_loop(asyncio.ProactorEventLoop())
loop = asyncio.get_event_loop()
chunk = 1
for tasks_in_chunk in chunks:
commands = asyncio.gather(*tasks_in_chunk)
results = loop.run_until_complete(commands)
all_results += results
chunk += 1
loop.close()
return all_results
###########
# Helpers #
###########
def update(d, u):
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
@ -177,14 +94,10 @@ def prep_s3(ctx):
return s3_bucket, s3
def get_file_sha256sum(stored_data, profile, file):
def get_stored_and_computed_sha256sums(stored_data, profile, file):
stored_file_hash = stored_data['sha256sum']
stored_profile_hash = stored_data['profileHash']
sha256sum = hashlib.sha256()
with open(file, 'rb') as f:
for byte_block in iter(lambda: f.read(BUF_SIZE), b""):
sha256sum.update(byte_block)
calculated_file_hash = sha256sum.hexdigest()
calculated_file_hash = get_file_sha256sum(file)
return stored_profile_hash, stored_file_hash, calculated_file_hash
@ -385,7 +298,7 @@ def check_matched_files_hashes(ctx, context, print_identity, profile, files):
try:
file_object = s3.get_object(s3_bucket, file_identity)
stored_data = json.load(file_object)
stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum(
stored_profile_hash, stored_file_hash, calculated_file_hash = get_stored_and_computed_sha256sums(
stored_data, profile, file)
if calculated_file_hash == stored_file_hash \
and ctx.obj['CONFIG']['profileHashes'][profile] == stored_profile_hash:
@ -425,7 +338,7 @@ def check_changed_files_hashes(ctx, context, profile, files):
try:
file_object = s3.get_object(s3_bucket, file_identity)
stored_data = json.load(file_object)
stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum(
stored_profile_hash, stored_file_hash, calculated_file_hash = get_stored_and_computed_sha256sums(
stored_data, profile, file)
if calculated_file_hash != stored_file_hash \
or ctx.obj['CONFIG']['profileHashes'][profile] != stored_profile_hash:
@ -464,11 +377,7 @@ def update_changed_files_hashes(ctx, context, input_and_identity, profile, files
file, identity = file.split('\t')
file_identity = f'{get_file_identity(ctx.obj, file)}.json'
try:
sha256sum = hashlib.sha256()
with open(file, 'rb') as f:
for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
sha256sum.update(byte_block)
calculated_file_hash = sha256sum.hexdigest()
calculated_file_hash = get_file_sha256sum(file)
object_data = {
"sourcePath": file,

81
acm/asyncio.py

@ -0,0 +1,81 @@
import asyncio
import logging
import typing
LOG = logging.getLogger("acm.async")
async def run_command_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
on_success: typing.List[typing.Callable] = [()]
):
"""Run command in subprocess (shell).
Note:
This can be used if you wish to execute e.g. "copy"
on Windows, which can only be executed in the shell.
"""
process = await asyncio.create_subprocess_shell(
command, stdout=stdout, stderr=stderr
)
process_stdout, process_stderr = await process.communicate()
if process.returncode == 0:
for success_callable in on_success:
success_callable()
if stdout != asyncio.subprocess.DEVNULL:
result = process_stdout.decode().strip()
return result
else:
return None
def make_chunks(tasks, chunk_size):
"""Yield successive chunk_size-sized chunks from tasks.
Note:
Taken from https://stackoverflow.com/a/312464
modified for python 3 only
"""
for i in range(0, len(tasks), chunk_size):
yield tasks[i: i + chunk_size]
def run_asyncio_commands(tasks, max_concurrent_tasks=0):
"""Run tasks asynchronously using asyncio and return results.
If max_concurrent_tasks are set to 0, no limit is applied.
Note:
By default, Windows uses SelectorEventLoop, which does not support
subprocesses. Therefore ProactorEventLoop is used on Windows.
https://docs.python.org/3/library/asyncio-eventloops.html#windows
"""
all_results = []
if max_concurrent_tasks == 0:
chunks = [tasks]
num_chunks = len(chunks)
else:
chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)
num_chunks = len(
list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)))
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
if platform.system() == "Windows":
asyncio.set_event_loop(asyncio.ProactorEventLoop())
loop = asyncio.get_event_loop()
chunk = 1
for tasks_in_chunk in chunks:
commands = asyncio.gather(*tasks_in_chunk)
results = loop.run_until_complete(commands)
all_results += results
chunk += 1
loop.close()
return all_results

18
acm/config.py

@ -1,3 +1,4 @@
import importlib.metadata
import json
import logging
import typing
@ -5,12 +6,10 @@ import typing
from pydantic import BaseModel, BaseSettings, validator
from acm.utility import get_string_sha256sum, get_string_xor
from acm.version import VERSION
LOG = logging.getLogger("acm.config")
# Application Version
VERSION = "2.0.0"
class ACMProfileProcessorOptions(BaseModel):
force_preserve_smaller_input: bool = False
@ -50,10 +49,6 @@ class ACMProfile(BaseModel):
if v is None:
return VERSION
# @validator('processors', always=True)
# def processors_validator(cls, v, values) -> str:
# # Collapse the same named processors into a single processor at the correct index
@validator('signature', always=True)
def hash_signature_validator(cls, v, values) -> str:
signature_keys = ["name", "version"]
@ -68,6 +63,15 @@ class ACMProfile(BaseModel):
return get_string_sha256sum(signature + combined_processor_signature)
def get_processor_names(self) -> typing.List[str]:
return [processor.name for processor in self.processors]
def get_processor(self, name: str) -> typing.Optional[ACMProfileProcessor]:
for processor in self.processors:
if name == processor.name:
return processor
return None
class ACMS3(BaseModel):
secure: bool = False,

8
acm/utility.py

@ -8,6 +8,14 @@ BUF_SIZE = 4096
LOG = logging.getLogger("acm.utility")
def get_file_sha256sum(input_file):
sha256sum = hashlib.sha256()
with open(input_file, 'rb') as f:
for byte_block in iter(lambda: f.read(BUF_SIZE), b""):
sha256sum.update(byte_block)
return sha256sum.hexdigest()
def get_string_sha256sum(content: str, encoding='utf-8') -> str:
sha256sum = hashlib.sha256()
with io.BytesIO(content.encode(encoding)) as c:

31
acm/version.py

@ -0,0 +1,31 @@
import importlib.metadata
import logging
import os
import pathlib
import toml
LOG = logging.getLogger("acm.version")
def __get_version():
"""
Automatically determine the version of the application being run
"""
# Attempt to read the installed package information
try:
return importlib.metadata.version('asset-compression-manager')
except importlib.metadata.PackageNotFoundError:
LOG.debug("The package is not installed, reading the version from another source")
# Fallback on parsing the pyproject.toml file
root_dir = pathlib.Path(__file__).parent.parent.resolve()
with open(os.path.join(root_dir, "pyproject.toml"), "r") as project_file:
project = toml.load(project_file)
return project["tool"]["poetry"]["version"]
LOG.debug("Falling back on UNKNOWN version identifier")
return "UNKNOWN"
# Application Version
VERSION = __get_version()

14
poetry.lock

@ -52,6 +52,14 @@ typing-extensions = ">=3.7.4.3"
dotenv = ["python-dotenv (>=0.10.4)"]
email = ["email-validator (>=1.0.3)"]
[[package]]
name = "toml"
version = "0.10.2"
description = "Python Library for Tom's Obvious, Minimal Language"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "typing-extensions"
version = "4.0.1"
@ -76,7 +84,7 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "4b3fe05a6a04674efefc45cfb7ea28502745b735546c46ee3f64ad0e11d42722"
content-hash = "751276ba1ea83218a27169c8d996edf4ae2f3c7a648d012674dd8ac2431508e4"
[metadata.files]
certifi = [
@ -119,6 +127,10 @@ pydantic = [
{file = "pydantic-1.8.2-py3-none-any.whl", hash = "sha256:fec866a0b59f372b7e776f2d7308511784dace622e0992a0b59ea3ccee0ae833"},
{file = "pydantic-1.8.2.tar.gz", hash = "sha256:26464e57ccaafe72b7ad156fdaa4e9b9ef051f69e175dbbb463283000c05ab7b"},
]
toml = [
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
{file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
]
typing-extensions = [
{file = "typing_extensions-4.0.1-py3-none-any.whl", hash = "sha256:7f001e5ac290a0c0401508864c7ec868be4e701886d5b573a9528ed3973d9d3b"},
{file = "typing_extensions-4.0.1.tar.gz", hash = "sha256:4ca091dea149f945ec56afb48dae714f21e8692ef22a395223bcd328961b6a0e"},

1
pyproject.toml

@ -10,6 +10,7 @@ python = "^3.8"
click = "8.0.3"
minio = "7.1.2"
pydantic = "1.8.2"
toml = "0.10.2"
[tool.poetry.dev-dependencies]

Loading…
Cancel
Save