From 658e5535f3d1613e6fa44b4e95f25be8d389b9b4 Mon Sep 17 00:00:00 2001
From: Drew Short
Date: Wed, 15 Apr 2020 01:56:24 -0500
Subject: [PATCH] Added compression and profiles

- Compression with basic profiles for jpeg, png, audio, video
- Default profile with acceptable compression
- Placebo profile with no compression
- Aggressive profile with 720p downsample and high compression
- AsyncIO compression to make full use of system resources
- Configurable compression via profiles
---
 README.md               |  40 ++++---
 acm-config.json.example |  89 ++++++++++++++-
 acm.py                  | 245 +++++++++++++++++++++++++++++++++++++---
 3 files changed, 337 insertions(+), 37 deletions(-)

diff --git a/README.md b/README.md
index a217476..abff33c 100644
--- a/README.md
+++ b/README.md
@@ -44,25 +44,37 @@ Additionally the following environment variables need to be populated to interac
 ### Common Options
 
 - `-c, --config`: The config file to read. Default file is `acm-config.json` in the current directory.
-- `-x, --context`: The remote bucket to use. For `store` and `retrieve` operations it is `<context>-data`.
 - `-s, --stdin`: Read the file list to process from stdin.
 - `-p, --prefix`: The prefix to strip from the input. i.e. `acm.py -x test -p /tmp/data/storage/ check /tmp/data/storage/images/img1.jpg` => `images/img1.jpg`
 
+### S3 Common Options
+
+- `-x, --context`: The remote bucket to use. For `store` and `retrieve` operations it is `<context>-data`.
+
+### S3 Commands
+
+- list - list files in context
+- check - find changes in provided files
+- match - find matches in provided files
+- update - update stored sha256sum values for provided files
+- store - put files in data bucket
+- retrieve - retrieve files from data bucket
+
 ### Listing Files
 
 List all files in a bucket
 ```bash
-$ ./acm.py -x <context> list
+$ ./acm.py list -x <context>
 ```
 
 List all files while adding a prefix and stripping a suffix
 ```bash
-$ ./acm.py -x <context> -p <prefix> list --suffix <suffix>
+$ ./acm.py -p <prefix> list -x <context> --suffix <suffix>
 ```
 
 List all files with sha256sum compatible output
 ```bash
-$ ./acm.py --context testing --prefix "/tmp/" --stdin list --suffix .json --sha256sum
+$ ./acm.py --prefix "/tmp/" --stdin list -x testing --suffix .json --sha256sum
 ```
 
 Print out a sha256sum compatible check list
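The sha256sum compatible listing can be fed straight to `sha256sum -c` to verify local copies against the bucket. A minimal sketch of that pipeline, reusing the `testing` context and `/tmp/` prefix from the example above (both are placeholder values, not part of the patch):

```bash
$ ./acm.py --prefix "/tmp/" list -x testing --sha256sum | sha256sum -c -
```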
@@ -73,12 +85,12 @@ Do a comparison of the remote bucket for files with a matching sha256sum value.
 
 Process a list of files
 ```bash
-$ ./acm.py -x <context> -p <prefix> match FILES...
+$ ./acm.py -p <prefix> match -x <context> FILES...
 ```
 
 Process a list from stdin
 ```bash
-$ find /tmp -name '*.jpg' | ./acm.py -x <context> -p <prefix> match
+$ find /tmp -name '*.jpg' | ./acm.py -p <prefix> match -x <context>
 ```
 
 ### Checking For Changes
@@ -87,12 +99,12 @@ Do a comparison of the remote bucket for missing files or files with a mismatch
 
 Process a list of files
 ```bash
-$ ./acm.py -x <context> -p <prefix> check FILES...
+$ ./acm.py -p <prefix> check -x <context> FILES...
 ```
 
 Process a list from stdin
 ```bash
-$ find /tmp -name '*.jpg' | ./acm.py -x <context> -p <prefix> check
+$ find /tmp -name '*.jpg' | ./acm.py -p <prefix> check -x <context>
 ```
 
 ### Updating Metadata For Changed Files
@@ -101,12 +113,12 @@ Update the remote bucket with new metadata for the listed files. Calculates new
 
 Process a list of files
 ```bash
-$ ./acm.py -x <context> -p <prefix> update FILES...
+$ ./acm.py -p <prefix> update -x <context> FILES...
 ```
 
 Process a list from stdin
 ```bash
-$ find /tmp -name '*.jpg' | ./acm.py -x <context> -p <prefix> update
+$ find /tmp -name '*.jpg' | ./acm.py -p <prefix> update -x <context>
 ```
 
 ### Storing Files
@@ -115,12 +127,12 @@ Store the listed files in `<context>-data`.
 
 Process a list of files
 ```bash
-$ ./acm.py -x <context> -p <prefix> store FILES...
+$ ./acm.py -p <prefix> store -x <context> FILES...
 ```
 
 Process a list from stdin
 ```bash
-$ find /tmp -name '*.jpg' | ./acm.py -x <context> -p <prefix> store
+$ find /tmp -name '*.jpg' | ./acm.py -p <prefix> store -x <context>
 ```
 
 ### Retrieving Files
@@ -129,12 +141,12 @@ Retrieve remote files matching listed files. Optionally place the downloaded fil
 
 Process a list of files
 ```bash
-$ ./acm.py -x <context> -p <prefix> retrieve [-d <destination>] FILES...
+$ ./acm.py -p <prefix> retrieve -x <context> [-d <destination>] FILES...
 ```
 
 Process a list from stdin
 ```bash
-$ find /tmp -name '*.jpg' | ./acm.py -x <context> -p <prefix> retrieve [-d <destination>]
+$ find /tmp -name '*.jpg' | ./acm.py -p <prefix> retrieve -x <context> [-d <destination>]
 ```
 
 ### Configuring Profiles
diff --git a/acm-config.json.example b/acm-config.json.example
index 989a7bd..cb37ecf 100644
--- a/acm-config.json.example
+++ b/acm-config.json.example
@@ -1,4 +1,5 @@
 {
+  "concurrency": 0,
   "s3": {
     "secure": false,
     "host": "127.0.0.1:9000"
@@ -6,13 +7,13 @@
   "profiles": {
     "default": {
       "jpeg": {
-        "processors": ["mozjpeg"],
+        "processors": ["cjpeg"],
         "extensions": [
           "jpg",
           "jpeg"
         ],
         "outputExtension": "jpg",
-        "command": "cjpeg -optimize -quality 75 -progressive -out {output_file} {input_file}"
+        "command": "cjpeg -optimize -quality 90 -progressive -outfile {{output_file}} {{input_file}}"
       },
       "png": {
         "processors": ["optipng"],
@@ -20,7 +21,7 @@
           "png"
         ],
         "outputExtension": "png",
-        "command": "optipng -o2 -strip all -out {output_file} {input_file}}"
+        "command": "optipng -o2 -strip all -out {{output_file}} {{input_file}}"
       },
       "video": {
         "processors": ["ffmpeg"],
@@ -29,7 +30,7 @@
           "mp4",
           "webm"
         ],
         "outputExtension": "mp4",
-        "command": "ffmpeg -i {input_file} -vcodec libx264 -crf 24 {output_file}"
+        "command": "ffmpeg -hide_banner -loglevel panic -i {{input_file}} -vcodec libx264 -crf 20 {{output_file}}"
       },
       "audio": {
         "processors": ["ffmpeg", "opusenc"],
@@ -38,7 +39,85 @@
           "wav",
           "mp3"
         ],
         "outputExtension": "ogg",
-        "command": "ffmpeg -hide_banner -loglevel panic -i {input_file} -f wav -| opusenc --quiet --bitrate 64 --vbr --downmix-stereo --discard-comments --discard-pictures - {output_file} >/dev/null 2>&1"
+        "command": "ffmpeg -hide_banner -loglevel panic -i {{input_file}} -f wav -| opusenc --bitrate 64 --vbr --downmix-stereo --discard-comments --discard-pictures - {{output_file}}"
+      }
+    },
+    "placebo": {
+      "jpeg": {
+        "processors": ["cp"],
+        "extensions": [
+          "jpg",
+          "jpeg"
+        ],
+        "outputExtension": "jpg",
+        "preserveInputExtension": true,
+        "command": "cp {{input_file}} {{output_file}}"
+      },
+      "png": {
+        "processors": ["cp"],
+        "extensions": [
+          "png"
+        ],
+        "outputExtension": "png",
+        "preserveInputExtension": true,
+        "command": "cp {{input_file}} {{output_file}}"
+      },
+      "video": {
+        "processors": ["cp"],
+        "extensions": [
+          "mp4",
+          "webm"
+        ],
+        "outputExtension": "mp4",
+        "preserveInputExtension": true,
+        "command": "cp {{input_file}} {{output_file}}"
+      },
+      "audio": {
+        "processors": ["cp"],
+        "extensions": [
+          "wav",
+          "mp3"
+        ],
+        "outputExtension": "ogg",
+        "preserveInputExtension": true,
+        "command": "cp {{input_file}} {{output_file}}"
+      }
+    },
+    "aggressive": {
+      "jpeg": {
+        "processors": ["ffmpeg", "cjpeg"],
+        "extensions": [
+          "jpg",
+          "jpeg"
+        ],
+        "outputExtension": "jpg",
+        "command": "export FILE={{output_file}} && export TEMP_FILE=${FILE}_tmp.jpg && ffmpeg -i {{input_file}} -vf scale=-1:720 ${TEMP_FILE} && cjpeg -optimize -quality 75 -progressive -outfile {{output_file}} ${TEMP_FILE} && rm ${TEMP_FILE}"
+      },
+      "png": {
+        "processors": ["optipng"],
+        "extensions": [
+          "png"
+        ],
+        "outputExtension": "png",
+        "command": "optipng -o2 -strip all -out {{output_file}} {{input_file}}"
+      },
+      "video": {
+        "processors": ["ffmpeg"],
+        "extensions": [
+          "mp4",
+          "webm"
+        ],
+        "outputExtension": "mp4",
+        "command": "ffmpeg -hide_banner -loglevel panic -i {{input_file}} -vf scale=-1:720 -vcodec libx264 -crf 24 {{output_file}}"
+      },
+      "audio": {
+        "processors": ["ffmpeg", "opusenc"],
+        "extensions": [
+          "wav",
+          "mp3"
+        ],
+        "outputExtension": "ogg",
+        "command": "ffmpeg -hide_banner -loglevel panic -i {{input_file}} -f wav -| opusenc --bitrate 64 --vbr --downmix-stereo --discard-comments --discard-pictures - {{output_file}}"
+      }
+    }
   }
 }
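To make the profile templates concrete: the new `compress` command (added to `acm.py` below) matches a file against a content configuration by extension, substitutes `{{input_file}}` and `{{output_file}}` with single-quoted paths, and runs the result through the shell. For a hypothetical `assets/video/intro.webm` handled by the `aggressive` video profile, with a prefix of `assets/` and a destination of `/tmp/out`, the executed command is roughly:

```bash
# Illustrative expansion only; the input path, prefix and destination are made up.
ffmpeg -hide_banner -loglevel panic -i 'assets/video/intro.webm' -vf scale=-1:720 -vcodec libx264 -crf 24 '/tmp/out/video/intro.mp4'
```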
"extensions": [ + "mp4", + "webm" + ], + "outputExtension": "mp4", + "command": "ffmpeg -hide_banner -loglevel panic -i {{input_file}} -vf scale=-1:720 -vcodec libx264 -crf 24 {{output_file}}" + }, + "audio": { + "processors": ["ffmpeg", "opusenc"], + "extensions": [ + "wav", + "mp3" + ], + "outputExtension": "ogg", + "command": "ffmpeg -hide_banner -loglevel panic -i {{input_file}} -f wav -| opusenc --bitrate 64 --vbr --downmix-stereo --discard-comments --discard-pictures - {{output_file}}" } } } diff --git a/acm.py b/acm.py index 5c96a78..d21082c 100644 --- a/acm.py +++ b/acm.py @@ -1,10 +1,12 @@ #!/usr/bin/env python - +import asyncio import hashlib import io import json import os -from typing import List +import platform +import tempfile +from typing import List, Dict, Callable import click from minio import Minio, ResponseError @@ -20,17 +22,104 @@ METADATA_SHA256SUM = "Sha256sum" BUF_SIZE = 4096 +########### +# AsyncIO # +########### + + +async def run_command_shell( + command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, on_success: Callable = ()): + """Run command in subprocess (shell). + + Note: + This can be used if you wish to execute e.g. "copy" + on Windows, which can only be executed in the shell. + """ + process = await asyncio.create_subprocess_shell( + command, stdout=stdout, stderr=stderr + ) + + process_stdout, process_stderr = await process.communicate() + + if process.returncode == 0: + on_success() + + if stdout != asyncio.subprocess.DEVNULL: + result = process_stdout.decode().strip() + return result + else: + return None + + +def make_chunks(tasks, chunk_size): + """Yield successive chunk_size-sized chunks from tasks. + + Note: + Taken from https://stackoverflow.com/a/312464 + modified for python 3 only + """ + for i in range(0, len(tasks), chunk_size): + yield tasks[i: i + chunk_size] + + +def run_asyncio_commands(tasks, max_concurrent_tasks=0): + """Run tasks asynchronously using asyncio and return results. + + If max_concurrent_tasks are set to 0, no limit is applied. + + Note: + By default, Windows uses SelectorEventLoop, which does not support + subprocesses. Therefore ProactorEventLoop is used on Windows. 
+        https://docs.python.org/3/library/asyncio-eventloops.html#windows
+    """
+    all_results = []
+
+    if max_concurrent_tasks == 0:
+        chunks = [tasks]
+        num_chunks = len(chunks)
+    else:
+        chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)
+        num_chunks = len(list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)))
+
+    if asyncio.get_event_loop().is_closed():
+        asyncio.set_event_loop(asyncio.new_event_loop())
+    if platform.system() == "Windows":
+        asyncio.set_event_loop(asyncio.ProactorEventLoop())
+    loop = asyncio.get_event_loop()
+
+    chunk = 1
+    for tasks_in_chunk in chunks:
+        commands = asyncio.gather(*tasks_in_chunk)
+        results = loop.run_until_complete(commands)
+        all_results += results
+        chunk += 1
+
+    loop.close()
+    return all_results
+
+
+###########
+# Helpers #
+###########
+
 
 def get_metadata_name(key):
     return METADATA_PREFIX + 'SHA256SUM'.capitalize()
 
 
 def get_clean_stdin_iterator(stdin_stream):
-    return (line.strip() for line in stdin_stream if line.strip() != '')
+    return (line for line in [line.strip() for line in stdin_stream if line.strip() != ''])
+
+
+def strip_prefix(prefix: str, file: str) -> str:
+    if file.startswith(prefix):
+        return file[len(prefix):]
+    return file
 
 
 def get_file_identity(ctx_obj, file):
     if 'PREFIX' in ctx_obj and ctx_obj['PREFIX'] is not None:
-        path = file.replace(ctx_obj['PREFIX'], '')
+        path = strip_prefix(ctx_obj['PREFIX'], file)
     else:
         path = file
 
@@ -89,30 +178,41 @@ def load_config(path: str) -> any:
     config['s3']['access'] = os.getenv('ACM_S3_ACCESS')
     config['s3']['secret'] = os.getenv('ACM_S3_SECRET')
 
+    # Setup concurrency
+    if 'concurrency' in config:
+        config['concurrency'] = abs(int(config['concurrency']))
+    else:
+        config['concurrency'] = 0
+
     return config
 
 
 @click.group()
 @click.option('-d', '--debug/--no-debug', default=False)
 @click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
-@click.option('-x', '--context', required=True)
 @click.option('-s', '--stdin/--no-stdin', default=False)
 @click.option('-p', '--prefix', default=None)
 @click.pass_context
-def cli(ctx, debug, config, context, stdin, prefix):
+def cli(ctx, debug, config, stdin, prefix):
     ctx.ensure_object(dict)
     ctx.obj['DEBUG'] = debug
     ctx.obj['CONFIG'] = load_config(config)
-    ctx.obj['CONTEXT'] = context
     ctx.obj['READ_STDIN'] = stdin
     ctx.obj['PREFIX'] = prefix
 
 
+###############################
+# S3 Storage Focused Commands #
+###############################
+
+
 @cli.command(name="list")
 @click.option('--sha256sum/--no-sha256sum', default=False)
 @click.option('--suffix', default=None)
+@click.option('-x', '--context', required=True)
 @click.pass_context
-def list_files(ctx, sha256sum, suffix):
+def list_files(ctx, context, sha256sum, suffix):
+    ctx.obj['CONTEXT'] = context
     s3_config = ctx.obj['CONFIG']['s3']
     s3_bucket = ctx.obj['CONTEXT']
 
@@ -151,9 +251,11 @@ def list_files(ctx, sha256sum, suffix):
 
 
 @cli.command(name="match")
-@click.pass_context
+@click.option('-x', '--context', required=True)
 @click.argument('files', nargs=-1)
-def check_matched_files_hashes(ctx, files):
+@click.pass_context
+def check_matched_files_hashes(ctx, context, files):
+    ctx.obj['CONTEXT'] = context
     s3_bucket, s3 = prep_s3(ctx)
 
     matching_files: List[str] = []
@@ -175,9 +277,11 @@
 
 
 @cli.command(name="check")
-@click.pass_context
+@click.option('-x', '--context', required=True)
 @click.argument('files', nargs=-1)
-def check_changed_files_hashes(ctx, files):
+@click.pass_context
+def check_changed_files_hashes(ctx, context, files):
+    ctx.obj['CONTEXT'] = context
     s3_bucket, s3 = prep_s3(ctx)
 
     changed_files: List[str] = []
@@ -199,9 +303,11 @@
 
 
 @cli.command(name="update")
-@click.pass_context
+@click.option('-x', '--context', required=True)
 @click.argument('files', nargs=-1)
-def update_changed_files_hashes(ctx, files):
+@click.pass_context
+def update_changed_files_hashes(ctx, context, files):
+    ctx.obj['CONTEXT'] = context
     s3_bucket, s3 = prep_s3(ctx)
 
     updated_files: List[str] = []
@@ -241,9 +347,11 @@
 
 
 @cli.command(name="store")
-@click.pass_context
+@click.option('-x', '--context', required=True)
 @click.argument('files', nargs=-1)
-def store_files(ctx, files):
+@click.pass_context
+def store_files(ctx, context, files):
+    ctx.obj['CONTEXT'] = context
     s3_bucket, s3 = prep_s3(ctx)
 
     stored_files: List[str] = []
@@ -267,10 +375,12 @@
 
 
 @cli.command(name="retrieve")
-@click.pass_context
+@click.option('-x', '--context', required=True)
 @click.option('-d', '--destination', default=None)
 @click.argument('files', nargs=-1)
-def retrieve_files(ctx, destination, files):
+@click.pass_context
+def retrieve_files(ctx, context, destination, files):
+    ctx.obj['CONTEXT'] = context
     s3_bucket, s3 = prep_s3(ctx)
 
     retrieved_files: List[str] = []
@@ -295,5 +405,104 @@
     print(os.linesep.join(retrieved_files))
 
 
+######################################
+# Asset Compression Focused Commands #
+######################################
+
+
+@cli.command(name="compress")
+@click.option('-p', '--profile', default='default')
+@click.option('-c', '--content', default='all')
+@click.option('-d', '--destination', default=None)
+@click.argument('files', nargs=-1)
+@click.pass_context
+def compress_assets(ctx, profile, content, destination, files):
+    profiles = ctx.obj['CONFIG']['profiles']
+
+    if profile not in profiles:
+        raise ValueError(f'Unrecognized profile: {profile}')
+
+    default_profile: Dict[str, Any] = profiles['default']
+    profile: Dict[str, Any] = profiles[profile]
+
+    if content != 'all':
+        if content not in profile and content not in default_profile:
+            raise ValueError(f'Unrecognized content: {content}')
+
+    content_configurations = []
+
+    if content == 'all':
+        content_names: set = set()
+
+        for content_name in profile.keys():
+            content_names.add(content_name)
+            content_configurations.append(profile[content_name])
+
+        for content_name in default_profile.keys():
+            if content_name not in content_names:
+                content_names.add(content_name)
+                content_configurations.append(default_profile[content_name])
+    else:
+        if content in profile:
+            content_configurations.append(profile[content])
+        else:
+            content_configurations.append(default_profile[content])
+
+    if ctx.obj['READ_STDIN']:
+        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
+
+    if destination is None:
+        destination = tempfile.mkdtemp()
+
+    compressed_files = []
+    tasks = []
+
+    def store_filename(storage_list: List[str], filename: str):
+        """
+        Build a callback that records a processed file once its command succeeds
+
+        :param storage_list: list collecting the paths of successfully processed files
+        :param filename: output path to record
+        :return: a zero-argument callback for run_command_shell's on_success hook
+        """
+        return lambda: storage_list.append(filename)
+
+    for input_file in files:
+        for content_configuration in content_configurations:
+            if any([input_file.endswith(extension) for extension in content_configuration['extensions']]):
+                file = input_file
+                if 'PREFIX' in ctx.obj and ctx.obj['PREFIX'] is not None:
+                    file = strip_prefix(ctx.obj['PREFIX'], input_file)
+
+                if 'preserveInputExtension' in content_configuration \
+                        and content_configuration['preserveInputExtension']:
+                    output_file = os.path.join(destination, file)
+                else:
+                    output_file_without_ext = os.path.splitext(os.path.join(destination, file))[0]
+                    output_file = f'{output_file_without_ext}.{content_configuration["outputExtension"]}'
+
+                output_file_dir = os.path.dirname(output_file)
+                os.makedirs(output_file_dir, exist_ok=True)
+
+                command: str = content_configuration['command'] \
+                    .replace('{{input_file}}', f'\'{input_file}\'') \
+                    .replace('{{output_file}}', f'\'{output_file}\'')
+
+                tasks.append(
+                    run_command_shell(
+                        command,
+                        stdout=asyncio.subprocess.DEVNULL,
+                        stderr=asyncio.subprocess.DEVNULL,
+                        on_success=store_filename(compressed_files, output_file)
+                    )
+                )
+
+    results = run_asyncio_commands(
+        tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency']
+    )
+
+    print(os.linesep.join(compressed_files))
+
+
 if __name__ == '__main__':
     cli(obj={})
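Taken together, the patch supports a compress-then-store flow: `compress` prints the paths of the files it produced, one per line, which `store` can read back over stdin. A minimal sketch, where the `assets/` tree, the `/tmp/out` destination, and the `release` context are hypothetical values:

```bash
# Compress every matching asset with the aggressive profile, then push the
# results to the release-data bucket (paths and bucket names are illustrative only).
$ find assets -type f | \
    ./acm.py --stdin --prefix assets/ compress --profile aggressive --destination /tmp/out | \
    ./acm.py --stdin --prefix /tmp/out/ store -x release
```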