#!/usr/bin/env python3 import asyncio import collections.abc import hashlib import io import json import logging import os import pathlib import platform import sys import tempfile from typing import List, Dict, Callable import click from minio import Minio, InvalidResponseError from minio.error import S3Error from acm.logging import setup_basic_logging, update_logging_level # Size of the buffer to read files with BUF_SIZE = 4096 # Application Version VERSION = "2.0.0" LOG = setup_basic_logging("acm") ########### # AsyncIO # ########### async def run_command_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, on_success: List[Callable] = [()] ): """Run command in subprocess (shell). Note: This can be used if you wish to execute e.g. "copy" on Windows, which can only be executed in the shell. """ process = await asyncio.create_subprocess_shell( command, stdout=stdout, stderr=stderr ) process_stdout, process_stderr = await process.communicate() if process.returncode == 0: for success_callable in on_success: success_callable() if stdout != asyncio.subprocess.DEVNULL: result = process_stdout.decode().strip() return result else: return None def make_chunks(tasks, chunk_size): """Yield successive chunk_size-sized chunks from tasks. Note: Taken from https://stackoverflow.com/a/312464 modified for python 3 only """ for i in range(0, len(tasks), chunk_size): yield tasks[i: i + chunk_size] def run_asyncio_commands(tasks, max_concurrent_tasks=0): """Run tasks asynchronously using asyncio and return results. If max_concurrent_tasks are set to 0, no limit is applied. Note: By default, Windows uses SelectorEventLoop, which does not support subprocesses. Therefore ProactorEventLoop is used on Windows. https://docs.python.org/3/library/asyncio-eventloops.html#windows """ all_results = [] if max_concurrent_tasks == 0: chunks = [tasks] num_chunks = len(chunks) else: chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks) num_chunks = len( list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks))) if asyncio.get_event_loop().is_closed(): asyncio.set_event_loop(asyncio.new_event_loop()) if platform.system() == "Windows": asyncio.set_event_loop(asyncio.ProactorEventLoop()) loop = asyncio.get_event_loop() chunk = 1 for tasks_in_chunk in chunks: commands = asyncio.gather(*tasks_in_chunk) results = loop.run_until_complete(commands) all_results += results chunk += 1 loop.close() return all_results ########### # Helpers # ########### def update(d, u): for k, v in u.items(): if isinstance(v, collections.abc.Mapping): d[k] = update(d.get(k, {}), v) else: d[k] = v return d def get_metadata_name(key): return METADATA_PREFIX + 'SHA256SUM'.capitalize() def get_clean_stdin_iterator(stdin_stream): return (line for line in [line.strip() for line in stdin_stream if line.strip() != '']) def strip_prefix(prefix: str, file: str) -> str: if file.startswith(prefix): return file.replace(prefix, '') return file def get_file_identity(ctx_obj, file): if 'REMOVE_PREFIX' in ctx_obj and ctx_obj['REMOVE_PREFIX'] is not None: path = strip_prefix(ctx_obj['REMOVE_PREFIX'], file) else: path = file if os.pathsep != '/': path = '/'.join(path.split(os.pathsep)) return path def list_s3_dir(s3: Minio, bucket: str, prefix: str) -> List[str]: found_files = [] for obj in s3.list_objects_v2(bucket, prefix=prefix): if obj.is_dir: found_files.extend(list_s3_dir(s3, bucket, obj.object_name)) else: found_files.append(obj.object_name) return found_files def get_s3_client(config: any) -> Minio: host = config['host'] secure = config['secure'] access_key = config['access'] secret_key = config['secret'] return Minio(host, secure=secure, access_key=access_key, secret_key=secret_key) def prep_s3(ctx): s3_config = ctx.obj['CONFIG']['s3'] s3_bucket = ctx.obj['CONTEXT'] s3 = get_s3_client(s3_config) if not s3.bucket_exists(s3_bucket): s3.make_bucket(s3_bucket) return s3_bucket, s3 def get_file_sha256sum(stored_data, profile, file): stored_file_hash = stored_data['sha256sum'] stored_profile_hash = stored_data['profileHash'] sha256sum = hashlib.sha256() with open(file, 'rb') as f: for byte_block in iter(lambda: f.read(BUF_SIZE), b""): sha256sum.update(byte_block) calculated_file_hash = sha256sum.hexdigest() return stored_profile_hash, stored_file_hash, calculated_file_hash def get_string_sha256sum(string: str, encoding='utf-8') -> str: sha256sum = hashlib.sha256() with io.BytesIO(json.dumps(string).encode(encoding)) as c: for byte_block in iter(lambda: c.read(BUF_SIZE), b''): sha256sum.update(byte_block) return sha256sum.hexdigest() def add_nested_key(config: Dict[str, any], path: List[str], value: str) -> bool: target = path[0].lower() if len(path) == 1: config[target] = value return True else: if target not in config: config[target] = {} add_nested_key(config[target], path[1:], value) return False def read_env_config(prefix, separator='__') -> any: prefix = prefix+separator env_config = {} environment_variables = [ env for env in os.environ.keys() if env.startswith(prefix)] for env in environment_variables: path = env[len(prefix):].split('__') add_nested_key(env_config, path, os.environ[env]) return env_config def load_config(path: str) -> any: combined_config = {} with open( os.path.join( os.path.dirname(os.path.realpath(__file__)), 'acm-config-default.json'), 'r') as combined_config_file: combined_config = json.load(combined_config_file) config = {} with open(path, 'r') as config_file: config = json.load(config_file) # Setup concurrency if 'concurrency' in config: config['concurrency'] = abs(int(config['concurrency'])) else: config['concurrency'] = 0 update(combined_config, config) update(combined_config, read_env_config('ACM')) # Calculate profiles hash profile_hashes = {} profile_hashes['all'] = get_string_sha256sum( json.dumps(combined_config['profiles'])) for profile in combined_config['profiles'].keys(): profile_hashes[profile] = get_string_sha256sum( json.dumps(combined_config['profiles'][profile])) combined_config['profileHashes'] = profile_hashes return combined_config @click.group() @click.option('-d', '--debug/--no-debug', default=False) @click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True) @click.option('-s', '--stdin/--no-stdin', default=False) @click.option('--remove-prefix', default=None) @click.option('--add-prefix', default=None) @click.pass_context def cli(ctx, debug, config, stdin, remove_prefix, add_prefix): ctx.ensure_object(dict) # Propagate the global configs ctx.obj['DEBUG'] = debug ctx.obj['CONFIG'] = load_config(config) ctx.obj['READ_STDIN'] = stdin ctx.obj['REMOVE_PREFIX'] = remove_prefix ctx.obj['ADD_PREFIX'] = add_prefix if debug: update_logging_level(3, LOG) # Reduce the logging noise for library loggers update_logging_level(0, "asyncio") #################### # Generic Commands # #################### @cli.command(name="config") @click.pass_context def print_config(ctx): """ Print the configuration """ print(json.dumps(ctx.obj['CONFIG'], indent=2, sort_keys=True)) ############################### # S3 Storage Focused Commands # ############################### @cli.command(name="list") @click.option('--sha256sum/--no-sha256sum', default=False) @click.option('--suffix', default=None) @click.option('-x', '--context', required=True) @click.option('--print-identity/--no-print-identity', default=False) @click.pass_context def list_files(ctx, context, sha256sum, suffix, print_identity): """ List all file object in a bucket """ ctx.obj['CONTEXT'] = context s3_config = ctx.obj['CONFIG']['s3'] s3_bucket = ctx.obj['CONTEXT'] LOG.debug(f"connecting to s3 {s3_config['']}") s3 = get_s3_client(s3_config) if not s3.bucket_exists(s3_bucket): s3.make_bucket(s3_bucket) found_files: List[str] = [] found_objects: List[str] = [] for obj in s3.list_objects_v2(s3_bucket, recursive=False): if obj.is_dir: found_objects.extend(list_s3_dir(s3, s3_bucket, obj.object_name)) else: found_objects.append(obj.object_name) for obj in found_objects: file = obj if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None: file = os.path.join(ctx.obj['REMOVE_PREFIX'], file) if suffix is not None and suffix in file: file = file.replace(suffix, '') file = file.strip() if sha256sum: file_object = s3.get_object(s3_bucket, obj) stored_data = json.load(file_object) sha256sum_value = stored_data['sha256sum'] file = f'{sha256sum_value} {file}' elif print_identity: file_object = s3.get_object(s3_bucket, obj) stored_data = json.load(file_object) found_files.append(stored_data['storedAssetIdentity']) else: found_files.append(file) print(os.linesep.join(found_files)) @cli.command(name="match") @click.option('-x', '--context', required=True) @click.option('--print-identity/--no-print-identity', default=False) @click.option('-p', '--profile', default='all') @click.argument('files', nargs=-1) @click.pass_context def check_matched_files_hashes(ctx, context, print_identity, profile, files): """ List all files that have matching stored sha256sum and profile hash """ ctx.obj['CONTEXT'] = context s3_bucket, s3 = prep_s3(ctx) matching_files: List[str] = [] if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) for file in files: file_identity = f'{get_file_identity(ctx.obj, file)}.json' try: file_object = s3.get_object(s3_bucket, file_identity) stored_data = json.load(file_object) stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum( stored_data, profile, file) if calculated_file_hash == stored_file_hash \ and ctx.obj['CONFIG']['profileHashes'][profile] == stored_profile_hash: if print_identity: matching_files.append(stored_data['storedAssetIdentity']) else: matching_files.append(file) except S3Error as e: if e.code == "NoSuchKey": continue else: LOG.error(e) except ValueError or InvalidResponseError as e: LOG.error(f'ERROR: {file} {e}') print(os.linesep.join(matching_files)) @cli.command(name="check") @click.option('-x', '--context', required=True) @click.option('-p', '--profile', default='all') @click.argument('files', nargs=-1) @click.pass_context def check_changed_files_hashes(ctx, context, profile, files): """ List all files that do not have a matching sha256sum or profile hash """ ctx.obj['CONTEXT'] = context s3_bucket, s3 = prep_s3(ctx) changed_files: List[str] = [] if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) for file in files: file_identity = f'{get_file_identity(ctx.obj, file)}.json' try: file_object = s3.get_object(s3_bucket, file_identity) stored_data = json.load(file_object) stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum( stored_data, profile, file) if calculated_file_hash != stored_file_hash \ or ctx.obj['CONFIG']['profileHashes'][profile] != stored_profile_hash: changed_files.append(file) except S3Error as e: if e.code == "NoSuchKey": changed_files.append(file) else: LOG.error(e) except ValueError or InvalidResponseError as e: LOG.error(f'ERROR: {file} {e}') print(os.linesep.join(changed_files)) @cli.command(name="update") @click.option('-x', '--context', required=True) @click.option('--input-and-identity/--no-input-and-identity', default=False) @click.option('-p', '--profile', default='all') @click.argument('files', nargs=-1) @click.pass_context def update_changed_files_hashes(ctx, context, input_and_identity, profile, files): """ Store new data objects for the provided files """ ctx.obj['CONTEXT'] = context s3_bucket, s3 = prep_s3(ctx) updated_files: List[str] = [] if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) for file in files: identity = None if input_and_identity: file, identity = file.split('\t') file_identity = f'{get_file_identity(ctx.obj, file)}.json' try: sha256sum = hashlib.sha256() with open(file, 'rb') as f: for byte_block in iter(lambda: f.read(BUF_SIZE), b''): sha256sum.update(byte_block) calculated_file_hash = sha256sum.hexdigest() object_data = { "sourcePath": file, "storedAssetIdentity": identity, "identity": file_identity, "sha256sum": calculated_file_hash, "profileHash": ctx.obj['CONFIG']['profileHashes'][profile] } with io.BytesIO(json.dumps(object_data, sort_keys=True, indent=None).encode('utf-8')) as data: data.seek(0, os.SEEK_END) data_length = data.tell() data.seek(0) s3.put_object( s3_bucket, file_identity, data, data_length, content_type="application/json", metadata={} ) updated_files.append(file) except ValueError or InvalidResponseError as e: LOG.error(f'ERROR: {file} {e}') print(os.linesep.join(updated_files)) @cli.command(name="store") @click.option('-x', '--context', required=True) @click.argument('files', nargs=-1) @click.pass_context def store_files(ctx, context, files): """ Store specified files in a bucket for retrieval. """ ctx.obj['CONTEXT'] = context s3_bucket, s3 = prep_s3(ctx) stored_files: List[str] = [] if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) for file in files: file_identity = get_file_identity(ctx.obj, file) try: s3.fput_object( s3_bucket, file_identity, file, content_type="application/octet-stream" ) if 'ADD_PREFIX' in ctx.obj and ctx.obj['ADD_PREFIX'] is not None: stored_files.append(os.path.join( ctx.obj['ADD_PREFIX'], file_identity)) else: stored_files.append(file) except InvalidResponseError as e: LOG.error(f'ERROR: {file} {e}', file=sys.stderr) print(os.linesep.join(stored_files)) @cli.command(name="retrieve") @click.option('-x', '--context', required=True) @click.option('-d', '--destination', default=None) @click.argument('files', nargs=-1) @click.pass_context def retrieve_files(ctx, context, destination, files): """ Retrieve specified files from a bucket """ ctx.obj['CONTEXT'] = context s3_bucket, s3 = prep_s3(ctx) retrieved_files: List[str] = [] if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) for file in files: file_identity = get_file_identity(ctx.obj, file) file_destination = file if destination is not None: file_destination = os.path.join(destination, file_identity) try: s3.fget_object( s3_bucket, file_identity, file_destination ) retrieved_files.append(file_destination) except S3Error as e: if e.code == "NoSuchKey": LOG.error(f'ERROR: {file_identity} {file_destination} {e}', file=sys.stderr) else: LOG.error(e) except InvalidResponseError as e: LOG.error(f'ERROR: {file_destination} {e}', file=sys.stderr) print(os.linesep.join(retrieved_files)) @cli.command(name="clean") @click.option('-x', '--context', required=True) @click.option('-d', '--context-data', default=None) @click.option('-n', '--dry-run/--no-dry-run', default=False) @click.argument('files', nargs=-1) @click.pass_context def clean_files(ctx, context, context_data, dry_run, files): """ Remove non matching specified files in a bucket for retrieval. """ ctx.obj['CONTEXT'] = context s3_bucket, s3 = prep_s3(ctx) s3_data_bucket = context_data found_files: List[str] = [] found_data_files: List[str] = [] removed_files: List[str] = [] if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) # Go through and find all matching files for file in files: file_identity = f'{get_file_identity(ctx.obj, file)}.json' try: if s3_data_bucket is not None: file_object = s3.get_object(s3_bucket, file_identity) stored_data = json.load(file_object) stored_data_file_identity = stored_data['storedAssetIdentity'] found_files.append(file_identity) found_data_files.append(stored_data_file_identity) else: file_object = s3.get_object(s3_bucket, file_identity) found_files.append(file_identity) except InvalidResponseError as e: LOG.error(f'ERROR: InvalidResponseError {file_identity} {e}', file=sys.stderr) except S3Error as e: if e.code == "NoSuchKey": LOG.error(f'ERROR: NoSuchKey {file_identity}', file=sys.stderr) else: LOG.error(e) # print(os.linesep.join(found_objects)) # print(os.linesep.join(found_objects)) found_files = set(found_files) found_data_files = set(found_data_files) # Find all objects in s3 bucket found_objects: List[str] = [] for obj in s3.list_objects_v2(s3_bucket, recursive=False): if obj.is_dir: found_objects.extend(list_s3_dir(s3, s3_bucket, obj.object_name)) else: found_objects.append(obj.object_name) # print(os.linesep.join(found_objects)) found_data_objects: List[str] = [] for obj in s3.list_objects_v2(s3_data_bucket, recursive=False): if obj.is_dir: found_data_objects.extend(list_s3_dir( s3, s3_data_bucket, obj.object_name)) else: found_data_objects.append(obj.object_name) # print(os.linesep.join(found_data_objects)) for file_identity in found_objects: if file_identity not in found_files: if dry_run: removed_files.append(f'{s3_bucket}:{file_identity}') else: try: s3.remove_object(s3_bucket, file_identity) removed_files.append(f'{s3_bucket}:{file_identity}') except InvalidResponseError as e: LOG.error( f'ERROR: {s3_bucket}:{file_identity} {e}', file=sys.stderr) for file_data_identity in found_data_objects: if file_data_identity not in found_data_files: if dry_run: removed_files.append(f'{s3_data_bucket}:{file_data_identity}') else: try: s3.remove_object(s3_data_bucket, file_data_identity) removed_files.append( f'{s3_data_bucket}:{file_data_identity}') except InvalidResponseError as e: LOG.error( f'ERROR: {s3_data_bucket}:{file_data_identity} {e}', file=sys.stderr) print(os.linesep.join(removed_files)) ###################################### # Asset Compression Focused Commands # ###################################### @cli.command(name="compress") @click.option('-p', '--profile', default='default') @click.option('-c', '--content', default='all') @click.option('-d', '--destination', default=None) @click.option('--print-input-and-identity/--no-print-input-and-identity', default=False) @click.argument('files', nargs=-1) @click.pass_context def compress_assets(ctx, profile, content, destination, print_input_and_identity, files): """ Compress the request files and store them in a storage bucket. """ profiles = ctx.obj['CONFIG']['profiles'] if profile not in profiles: raise ValueError(f'Unrecognized profile: {profile}') default_profile: Dict[str, any] = profiles['default'] profile: Dict[str, any] = profiles[profile] if content != 'all': if content not in profile and content not in default_profile: raise ValueError(f'Unrecognized content: {content}') content_configurations = [] if content == 'all': content_names: set = set() for content_name in profile.keys(): content_names.add(content_name) content_configurations.append(profile[content_name]) for content_name in default_profile.keys(): if content_name not in content_names: content_names.add(content_name) content_configurations.append(default_profile[content_name]) else: if content in profile: content_configurations.append(profile[content]) else: content_configurations.append(default_profile[content]) if ctx.obj['READ_STDIN']: files = get_clean_stdin_iterator(click.get_text_stream('stdin')) if destination is None: destination = tempfile.mkdtemp() task_output = [] tasks = [] follow_up_tasks = [] def store_filename(storage_list: List[str], filename: str): """ A simple lambda wrapper to asynchronously add processed files to the list :param storage_list: :param filename: :return: """ return lambda: storage_list.append(filename) def queue_follow_up_task_if_keep_smaller_input(follow_up_tasks, input_file: str, output_file: str, keep_smaller_input: bool = True): """ A lambda wrapper that handles keeping the smallest of the two files. """ if keep_smaller_input: command = f"cp {input_file} {output_file}" def task(): input_size = os.path.getsize(input_file) output_size = os.path.getsize(output_file) if output_size > input_size: follow_up_tasks.append( run_command_shell( command, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, on_success=[store_filename( task_output, f'Preserved smaller "{input_file}" {output_size} > {input_size}' )] ) ) return task return lambda: True for input_file in files: for content_configuration in content_configurations: if any([input_file.endswith(extension) for extension in content_configuration['extensions']]): file = input_file file_extension = pathlib.Path(input_file).suffix if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None: file = strip_prefix(ctx.obj['REMOVE_PREFIX'], input_file) if 'preserveInputExtension' in content_configuration \ and content_configuration['preserveInputExtension']: output_file = os.path.join(destination, file) else: output_file_without_ext = os.path.splitext( os.path.join(destination, file))[0] output_file = f'{output_file_without_ext}.{content_configuration["outputExtension"]}' output_file_identity = get_file_identity( {'REMOVE_PREFIX': destination}, output_file) output_file_dir = os.path.dirname(output_file) os.makedirs(output_file_dir, exist_ok=True) if 'preserveSmallerInput' in content_configuration: preserve_smaller_input = bool( content_configuration['preserveSmallerInput']) else: preserve_smaller_input = True if 'forcePreserveSmallerInput' in content_configuration: force_preserve_smaller_input = bool( content_configuration['forcePreserveSmallerInput']) else: force_preserve_smaller_input = False # Only preserve the input if requested AND the extensions of the input and the output match preserve_smaller_input = preserve_smaller_input and ( force_preserve_smaller_input or file_extension == content_configuration["outputExtension"]) command: str = content_configuration['command'] \ .replace('{input_file}', f'\'{input_file}\'') \ .replace('{output_file}', f'\'{output_file}\'') tasks.append( run_command_shell( command, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, on_success=[store_filename( task_output, f'{input_file}\t{output_file_identity}' if print_input_and_identity else output_file ), queue_follow_up_task_if_keep_smaller_input( follow_up_tasks, input_file, output_file, preserve_smaller_input )] ) ) results = run_asyncio_commands( tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency'] ) follow_up_results = run_asyncio_commands( follow_up_tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency'] ) print(os.linesep.join(task_output)) if __name__ == '__main__': cli(obj={})