#!/usr/bin/env python3

import asyncio
import collections.abc
import hashlib
import io
import json
import os
import platform
import sys
import tempfile
from typing import Any, Callable, Dict, List

import click

from minio import Minio, ResponseError
from minio.error import NoSuchKey

# Size of the buffer used when reading files for hashing
BUF_SIZE = 4096
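
# Prefix Minio applies to user metadata keys on stored objects. METADATA_PREFIX
# is referenced by get_metadata_name() below but was not defined in this file;
# 'X-Amz-Meta-' is assumed here as the conventional S3 value.
METADATA_PREFIX = 'X-Amz-Meta-'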


###########
# AsyncIO #
###########


async def run_command_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        on_success: Callable = lambda: None):
    """Run command in subprocess (shell).

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    process = await asyncio.create_subprocess_shell(
        command, stdout=stdout, stderr=stderr
    )

    process_stdout, process_stderr = await process.communicate()

    if process.returncode == 0:
        # Invoke the success callback (a no-op by default).
        on_success()

    if stdout != asyncio.subprocess.DEVNULL:
        result = process_stdout.decode().strip()
        return result
    else:
        return None


def make_chunks(tasks, chunk_size):
    """Yield successive chunk_size-sized chunks from tasks.

    Note:
        Taken from https://stackoverflow.com/a/312464
        modified for python 3 only
    """
    for i in range(0, len(tasks), chunk_size):
        yield tasks[i: i + chunk_size]


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results.

    If max_concurrent_tasks is set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """
    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
        num_chunks = len(chunks)
    else:
        chunks = list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks))
        num_chunks = len(chunks)

    if asyncio.get_event_loop().is_closed():
        asyncio.set_event_loop(asyncio.new_event_loop())
    if platform.system() == "Windows":
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
    loop = asyncio.get_event_loop()

    chunk = 1
    for tasks_in_chunk in chunks:
        commands = asyncio.gather(*tasks_in_chunk)
        results = loop.run_until_complete(commands)
        all_results += results
        chunk += 1

    loop.close()
    return all_results
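
# For example (illustrative only), the two helpers above combine like this:
#   tasks = [run_command_shell('echo hello'), run_command_shell('echo world')]
#   results = run_asyncio_commands(tasks, max_concurrent_tasks=1)
# compress_assets() below builds its task list in the same way.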


###########
# Helpers #
###########


def update(d, u):
    """Recursively merge mapping u into mapping d and return d."""
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            d[k] = update(d.get(k, {}), v)
        else:
            d[k] = v
    return d


def get_metadata_name(key):
    """Build the metadata key name used for a value stored on an S3 object."""
    return METADATA_PREFIX + key.capitalize()


def get_clean_stdin_iterator(stdin_stream):
    """Yield non-empty, stripped lines from the given stream."""
    return (line for line in [line.strip() for line in stdin_stream if line.strip() != ''])


def strip_prefix(prefix: str, file: str) -> str:
    """Remove prefix from the start of file, if present."""
    if file.startswith(prefix):
        return file[len(prefix):]
    return file


def get_file_identity(ctx_obj, file):
    """Compute the object name for a file, stripping the configured prefix and normalising path separators."""
    if 'REMOVE_PREFIX' in ctx_obj and ctx_obj['REMOVE_PREFIX'] is not None:
        path = strip_prefix(ctx_obj['REMOVE_PREFIX'], file)
    else:
        path = file

    if os.sep != '/':
        path = '/'.join(path.split(os.sep))

    return path


def list_s3_dir(s3: Minio, bucket: str, prefix: str) -> List[str]:
    """Recursively list object names under prefix in bucket."""
    found_files = []
    for obj in s3.list_objects_v2(bucket, prefix=prefix):
        if obj.is_dir:
            found_files.extend(list_s3_dir(s3, bucket, obj.object_name))
        else:
            found_files.append(obj.object_name)
    return found_files


def get_s3_client(config: Any) -> Minio:
    """Build a Minio client from the s3 section of the config."""
    host = config['host']
    secure = config['secure']
    access_key = config['access']
    secret_key = config['secret']
    return Minio(host, secure=secure, access_key=access_key, secret_key=secret_key)


def prep_s3(ctx):
    """Return the bucket name for the current context and a client, creating the bucket if needed."""
    s3_config = ctx.obj['CONFIG']['s3']
    s3_bucket = ctx.obj['CONTEXT']

    s3 = get_s3_client(s3_config)

    if not s3.bucket_exists(s3_bucket):
        s3.make_bucket(s3_bucket)

    return s3_bucket, s3


def get_file_sha256sum(stored_data, profile, file):
    """Return the stored profile hash, stored file hash, and freshly calculated hash for file."""
    stored_file_hash = stored_data['sha256sum']
    stored_profile_hash = stored_data['profileHash']
    sha256sum = hashlib.sha256()
    with open(file, 'rb') as f:
        for byte_block in iter(lambda: f.read(BUF_SIZE), b""):
            sha256sum.update(byte_block)
    calculated_file_hash = sha256sum.hexdigest()
    return stored_profile_hash, stored_file_hash, calculated_file_hash


def get_string_sha256sum(string: str, encoding='utf-8') -> str:
    """Return the sha256 hex digest of the JSON-encoded string."""
    sha256sum = hashlib.sha256()
    with io.BytesIO(json.dumps(string).encode(encoding)) as c:
        for byte_block in iter(lambda: c.read(BUF_SIZE), b''):
            sha256sum.update(byte_block)
    return sha256sum.hexdigest()


def add_nested_key(config: Dict[str, Any], path: List[str], value: str) -> bool:
    """Set value at the (lower-cased) nested path inside config, creating intermediate dicts as needed."""
    target = path[0].lower()
    if len(path) == 1:
        config[target] = value
        return True
    else:
        if target not in config:
            config[target] = {}
        add_nested_key(config[target], path[1:], value)
    return False


def read_env_config(prefix, separator='__') -> Any:
    """Collect environment variables starting with prefix into a nested config dict."""
    prefix = prefix + separator
    env_config = {}

    environment_variables = [env for env in os.environ.keys() if env.startswith(prefix)]

    for env in environment_variables:
        path = env[len(prefix):].split(separator)
        add_nested_key(env_config, path, os.environ[env])

    return env_config
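
# For example (illustrative values), with the default prefix and separator:
#   ACM__S3__HOST=minio.example.com  ->  {'s3': {'host': 'minio.example.com'}}
#   ACM__CONCURRENCY=4               ->  {'concurrency': '4'}
# Keys are lower-cased by add_nested_key(); values arrive as strings.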


def load_config(path: str) -> Any:
    """Load acm-config-default.json, overlay the user config at path and ACM_* environment variables, and pre-compute profile hashes."""
    combined_config = {}
    with open(
            os.path.join(
                os.path.dirname(os.path.realpath(__file__)),
                'acm-config-default.json'),
            'r') as combined_config_file:
        combined_config = json.load(combined_config_file)

    config = {}
    with open(path, 'r') as config_file:
        config = json.load(config_file)

    # Setup S3 Settings
    config['s3']['access'] = os.getenv('ACM_S3_ACCESS')
    config['s3']['secret'] = os.getenv('ACM_S3_SECRET')

    # Setup concurrency
    if 'concurrency' in config:
        config['concurrency'] = abs(int(config['concurrency']))
    else:
        config['concurrency'] = 0

    update(combined_config, config)
    update(combined_config, read_env_config('ACM'))

    # Calculate profiles hash
    profile_hashes = {}
    profile_hashes['all'] = get_string_sha256sum(json.dumps(combined_config['profiles']))
    for profile in combined_config['profiles'].keys():
        profile_hashes[profile] = get_string_sha256sum(json.dumps(combined_config['profiles'][profile]))

    combined_config['profileHashes'] = profile_hashes

    return combined_config
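
# A minimal sketch of what the user config (acm-config.json by default) might
# contain, based on the keys this script reads; the values below are purely
# illustrative, and defaults are layered in from acm-config-default.json:
#
# {
#   "concurrency": 4,
#   "s3": {"host": "localhost:9000", "secure": false},
#   "profiles": {
#     "default": {
#       "images": {
#         "extensions": [".png", ".jpg"],
#         "outputExtension": "webp",
#         "preserveInputExtension": false,
#         "command": "cwebp {{input_file}} -o {{output_file}}"
#       }
#     }
#   }
# }
#
# The S3 credentials are normally supplied via ACM_S3_ACCESS / ACM_S3_SECRET
# (or ACM__S3__ACCESS / ACM__S3__SECRET environment overrides).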


@click.group()
@click.option('-d', '--debug/--no-debug', default=False)
@click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
@click.option('-s', '--stdin/--no-stdin', default=False)
@click.option('--remove-prefix', default=None)
@click.option('--add-prefix', default=None)
@click.pass_context
def cli(ctx, debug, config, stdin, remove_prefix, add_prefix):
    ctx.ensure_object(dict)
    ctx.obj['DEBUG'] = debug
    ctx.obj['CONFIG'] = load_config(config)
    ctx.obj['READ_STDIN'] = stdin
    ctx.obj['REMOVE_PREFIX'] = remove_prefix
    ctx.obj['ADD_PREFIX'] = add_prefix


####################
# Generic Commands #
####################


@cli.command(name="config")
@click.pass_context
def print_config(ctx):
    """
    Print the merged configuration as JSON
    """
    print(json.dumps(ctx.obj['CONFIG'], indent=2, sort_keys=True))


###############################
# S3 Storage Focused Commands #
###############################


@cli.command(name="list")
@click.option('--sha256sum/--no-sha256sum', default=False)
@click.option('--suffix', default=None)
@click.option('-x', '--context', required=True)
@click.pass_context
def list_files(ctx, context, sha256sum, suffix):
    """
    List files stored in a <context> bucket
    """
    ctx.obj['CONTEXT'] = context
    s3_config = ctx.obj['CONFIG']['s3']
    s3_bucket = ctx.obj['CONTEXT']

    s3 = get_s3_client(s3_config)

    if not s3.bucket_exists(s3_bucket):
        s3.make_bucket(s3_bucket)

    found_files: List[str] = []
    found_objects: List[str] = []

    for obj in s3.list_objects_v2(s3_bucket, recursive=False):
        if obj.is_dir:
            found_objects.extend(list_s3_dir(s3, s3_bucket, obj.object_name))
        else:
            found_objects.append(obj.object_name)

    for obj in found_objects:
        file = obj
        if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None:
            file = os.path.join(ctx.obj['REMOVE_PREFIX'], file)

        if suffix is not None and suffix in file:
            file = file.replace(suffix, '')

        file = file.strip()

        if sha256sum:
            stat = s3.stat_object(s3_bucket, obj)
            sha256sum_value = stat.metadata[get_metadata_name("SHA256SUM")]
            file = f'{sha256sum_value} {file}'

        found_files.append(file)

    print(os.linesep.join(found_files))


@cli.command(name="match")
@click.option('-x', '--context', required=True)
@click.option('--print-identity/--no-print-identity', default=False)
@click.option('-p', '--profile', default='all')
@click.argument('files', nargs=-1)
@click.pass_context
def check_matched_files_hashes(ctx, context, print_identity, profile, files):
    """
    List all files that have matching stored sha256sum and profile hash
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    matching_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = s3.get_object(s3_bucket, file_identity)
            stored_data = json.load(file_object)
            stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum(stored_data, profile, file)
            if calculated_file_hash == stored_file_hash \
                    and ctx.obj['CONFIG']['profileHashes'][profile] == stored_profile_hash:
                if print_identity:
                    matching_files.append(stored_data['storedAssetIdentity'])
                else:
                    matching_files.append(file)
        except NoSuchKey:
            continue
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(matching_files))


@cli.command(name="check")
@click.option('-x', '--context', required=True)
@click.option('-p', '--profile', default='all')
@click.argument('files', nargs=-1)
@click.pass_context
def check_changed_files_hashes(ctx, context, profile, files):
    """
    List all files that do not have a matching sha256sum or profile hash
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    changed_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = s3.get_object(s3_bucket, file_identity)
            stored_data = json.load(file_object)
            stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum(stored_data, profile, file)
            if calculated_file_hash != stored_file_hash \
                    or ctx.obj['CONFIG']['profileHashes'][profile] != stored_profile_hash:
                changed_files.append(file)
        except NoSuchKey:
            changed_files.append(file)
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(changed_files))


@cli.command(name="update")
@click.option('-x', '--context', required=True)
@click.option('--input-and-identity/--no-input-and-identity', default=False)
@click.option('-p', '--profile', default='all')
@click.argument('files', nargs=-1)
@click.pass_context
def update_changed_files_hashes(ctx, context, input_and_identity, profile, files):
    """
    Store new data objects for the provided files
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    updated_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        identity = None
        if input_and_identity:
            file, identity = file.split('\t')
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            sha256sum = hashlib.sha256()
            with open(file, 'rb') as f:
                for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
                    sha256sum.update(byte_block)
            calculated_file_hash = sha256sum.hexdigest()

            object_data = {
                "sourcePath": file,
                "storedAssetIdentity": identity,
                "identity": file_identity,
                "sha256sum": calculated_file_hash,
                "profileHash": ctx.obj['CONFIG']['profileHashes'][profile]
            }

            with io.BytesIO(json.dumps(object_data, sort_keys=True, indent=None).encode('utf-8')) as data:
                data.seek(0, os.SEEK_END)
                data_length = data.tell()
                data.seek(0)
                s3.put_object(
                    s3_bucket,
                    file_identity,
                    data,
                    data_length,
                    content_type="application/json",
                    metadata={}
                )
            updated_files.append(file)
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(updated_files))


@cli.command(name="store")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def store_files(ctx, context, files):
    """
    Store specified files in a <context> bucket for retrieval.
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    stored_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = get_file_identity(ctx.obj, file)
        try:
            s3.fput_object(
                s3_bucket,
                file_identity,
                file,
                content_type="application/octet-stream"
            )
            if 'ADD_PREFIX' in ctx.obj and ctx.obj['ADD_PREFIX'] is not None:
                stored_files.append(os.path.join(ctx.obj['ADD_PREFIX'], file_identity))
            else:
                stored_files.append(file)
        except ResponseError as e:
            print(f'ERROR: {file} {e}', file=sys.stderr)

    print(os.linesep.join(stored_files))


@cli.command(name="retrieve")
@click.option('-x', '--context', required=True)
@click.option('-d', '--destination', default=None)
@click.argument('files', nargs=-1)
@click.pass_context
def retrieve_files(ctx, context, destination, files):
    """
    Retrieve specified files from a <context> bucket
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    retrieved_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = get_file_identity(ctx.obj, file)
        file_destination = file
        if destination is not None:
            file_destination = os.path.join(destination, file_identity)
        try:
            s3.fget_object(
                s3_bucket,
                file_identity,
                file_destination
            )
            retrieved_files.append(file_destination)
        except NoSuchKey as e:
            print(f'ERROR: {file_identity} {file_destination} {e}', file=sys.stderr)
        except ResponseError as e:
            print(f'ERROR: {file_destination} {e}', file=sys.stderr)

    print(os.linesep.join(retrieved_files))


######################################
# Asset Compression Focused Commands #
######################################


@cli.command(name="compress")
@click.option('-p', '--profile', default='default')
@click.option('-c', '--content', default='all')
@click.option('-d', '--destination', default=None)
@click.option('--print-input-and-identity/--no-print-input-and-identity', default=False)
@click.argument('files', nargs=-1)
@click.pass_context
def compress_assets(ctx, profile, content, destination, print_input_and_identity, files):
    """
    Compress the provided files using the commands configured for the chosen profile and content type
    """
    profiles = ctx.obj['CONFIG']['profiles']

    if profile not in profiles:
        raise ValueError(f'Unrecognized profile: {profile}')

    default_profile: Dict[str, Any] = profiles['default']
    profile: Dict[str, Any] = profiles[profile]

    if content != 'all':
        if content not in profile and content not in default_profile:
            raise ValueError(f'Unrecognized content: {content}')

    content_configurations = []

    if content == 'all':
        content_names: set = set()

        for content_name in profile.keys():
            content_names.add(content_name)
            content_configurations.append(profile[content_name])

        for content_name in default_profile.keys():
            if content_name not in content_names:
                content_names.add(content_name)
                content_configurations.append(default_profile[content_name])
    else:
        if content in profile:
            content_configurations.append(profile[content])
        else:
            content_configurations.append(default_profile[content])

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    if destination is None:
        destination = tempfile.mkdtemp()

    compressed_files = []
    tasks = []

    def store_filename(storage_list: List[str], filename: str):
        """
        Return a callback that appends filename to storage_list, used as an
        on_success hook for run_command_shell.

        :param storage_list:
        :param filename:
        :return:
        """
        return lambda: storage_list.append(filename)

    for input_file in files:
        for content_configuration in content_configurations:
            if any([input_file.endswith(extension) for extension in content_configuration['extensions']]):
                file = input_file
                if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None:
                    file = strip_prefix(ctx.obj['REMOVE_PREFIX'], input_file)

                if 'preserveInputExtension' in content_configuration \
                        and content_configuration['preserveInputExtension']:
                    output_file = os.path.join(destination, file)
                else:
                    output_file_without_ext = os.path.splitext(os.path.join(destination, file))[0]
                    output_file = f'{output_file_without_ext}.{content_configuration["outputExtension"]}'

                output_file_identity = get_file_identity({'REMOVE_PREFIX': destination}, output_file)

                output_file_dir = os.path.dirname(output_file)
                os.makedirs(output_file_dir, exist_ok=True)

                command: str = content_configuration['command'] \
                    .replace('{{input_file}}', f'\'{input_file}\'') \
                    .replace('{{output_file}}', f'\'{output_file}\'')

                tasks.append(
                    run_command_shell(
                        command,
                        stdout=asyncio.subprocess.DEVNULL,
                        stderr=asyncio.subprocess.DEVNULL,
                        on_success=store_filename(
                            compressed_files,
                            f'{input_file}\t{output_file_identity}' if print_input_and_identity else output_file
                        )
                    )
                )

    run_asyncio_commands(
        tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency']
    )

    print(os.linesep.join(compressed_files))


if __name__ == '__main__':
    cli(obj={})
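
# Illustrative invocations (assuming this script is saved as acm.py and an
# acm-config.json exists in the working directory; paths and the context name
# "assets" are hypothetical):
#   python acm.py store -x assets images/logo.png
#   python acm.py --remove-prefix build/ check -x assets -p default build/images/logo.png
#   python acm.py compress -p default -d out/ images/logo.png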