@@ -1,10 +1,12 @@
#!/usr/bin/env python
import asyncio
import hashlib
import io
import json
import os
import platform
import tempfile
from typing import List, Dict, Callable
import click
from minio import Minio, ResponseError
@@ -20,17 +22,104 @@ METADATA_SHA256SUM = "Sha256sum"
BUF_SIZE = 4096
###########
# AsyncIO #
###########
async def run_command_shell(
        command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        on_success: Callable = lambda: None):  # no-op callback by default
    """Run command in subprocess (shell).

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    process = await asyncio.create_subprocess_shell(
        command, stdout=stdout, stderr=stderr
    )

    process_stdout, process_stderr = await process.communicate()

    if process.returncode == 0:
        on_success()

    if stdout != asyncio.subprocess.DEVNULL:
        result = process_stdout.decode().strip()
        return result
    else:
        return None
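
# Illustrative use (hypothetical command): the coroutine is not awaited here;
# it is gathered and run by run_asyncio_commands below.
#   task = run_command_shell("echo 'hello'", on_success=lambda: print('done'))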
def make_chunks(tasks, chunk_size):
    """Yield successive chunk_size-sized chunks from tasks.

    Note:
        Taken from https://stackoverflow.com/a/312464
        modified for python 3 only
    """
    for i in range(0, len(tasks), chunk_size):
        yield tasks[i:i + chunk_size]
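
# For example: list(make_chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]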
def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results.

    If max_concurrent_tasks is set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """
    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks)

    if asyncio.get_event_loop().is_closed():
        asyncio.set_event_loop(asyncio.new_event_loop())
    if platform.system() == "Windows":
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
    loop = asyncio.get_event_loop()

    for tasks_in_chunk in chunks:
        commands = asyncio.gather(*tasks_in_chunk)
        results = loop.run_until_complete(commands)
        all_results += results

    loop.close()
    return all_results
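
# A minimal sketch of the intended flow (commands are illustrative):
#   tasks = [run_command_shell('echo one'), run_command_shell('echo two')]
#   outputs = run_asyncio_commands(tasks, max_concurrent_tasks=1)  # one at a time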
###########
# Helpers #
###########
def get_metadata_name(key):
    return METADATA_PREFIX + 'SHA256SUM'.capitalize()
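
# Note: key is currently unused; the result is always METADATA_PREFIX + 'Sha256sum'.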
def get_clean_stdin_iterator(stdin_stream):
    return (line for line in [line.strip() for line in stdin_stream if line.strip() != ''])
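
# For example: list(get_clean_stdin_iterator(io.StringIO('a\n\n  b  \n'))) == ['a', 'b']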
def strip_prefix(prefix: str, file: str) -> str:
    if file.startswith(prefix):
        # Replace only the first occurrence, so the prefix string appearing
        # again later in the path is left intact.
        return file.replace(prefix, '', 1)
    return file
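
# e.g. strip_prefix('assets/', 'assets/img/assets/logo.png') -> 'img/assets/logo.png'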
def get_file_identity(ctx_obj, file):
    if 'PREFIX' in ctx_obj and ctx_obj['PREFIX'] is not None:
        path = strip_prefix(ctx_obj['PREFIX'], file)
    else:
        path = file
@@ -89,30 +178,41 @@ def load_config(path: str) -> any:
    config['s3']['access'] = os.getenv('ACM_S3_ACCESS')
    config['s3']['secret'] = os.getenv('ACM_S3_SECRET')

    # Setup concurrency
    if 'concurrency' in config:
        config['concurrency'] = abs(int(config['concurrency']))
    else:
        config['concurrency'] = 0

    return config
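
# A minimal acm-config.json sketch, inferred from the fields read in this file;
# the profile/content names and the cwebp command are illustrative assumptions:
# {
#   "concurrency": 2,
#   "s3": {"access": "...", "secret": "..."},
#   "profiles": {
#     "default": {
#       "images": {
#         "extensions": [".png", ".jpg"],
#         "outputExtension": "webp",
#         "command": "cwebp -q 80 {{input_file}} -o {{output_file}}"
#       }
#     }
#   }
# }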
@click.group()
@click.option('-d', '--debug/--no-debug', default=False)
@click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
@click.option('-s', '--stdin/--no-stdin', default=False)
@click.option('-p', '--prefix', default=None)
@click.pass_context
def cli(ctx, debug, config, stdin, prefix):
    ctx.ensure_object(dict)
    ctx.obj['DEBUG'] = debug
    ctx.obj['CONFIG'] = load_config(config)
    ctx.obj['READ_STDIN'] = stdin
    ctx.obj['PREFIX'] = prefix
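
# Each subcommand takes its own -x/--context, so an invocation looks like
# (bucket name illustrative):
#   acm --stdin -p ./assets list -x my-bucket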
###############################
# S3 Storage Focused Commands #
###############################
@cli.command(name="list")
@click.option('--sha256sum/--no-sha256sum', default=False)
@click.option('--suffix', default=None)
@click.option('-x', '--context', required=True)
@click.pass_context
def list_files(ctx, context, sha256sum, suffix):
    ctx.obj['CONTEXT'] = context
    s3_config = ctx.obj['CONFIG']['s3']
    s3_bucket = ctx.obj['CONTEXT']
@@ -151,9 +251,11 @@ def list_files(ctx, sha256sum, suffix):
@cli.command(name="match")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def check_matched_files_hashes(ctx, context, files):
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    matching_files: List[str] = []
@@ -175,9 +277,11 @@ def check_matched_files_hashes(ctx, files):
@cli.command(name="check")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def check_changed_files_hashes(ctx, context, files):
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    changed_files: List[str] = []
@@ -199,9 +303,11 @@ def check_changed_files_hashes(ctx, files):
@cli.command(name="update")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def update_changed_files_hashes(ctx, context, files):
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    updated_files: List[str] = []
@@ -241,9 +347,11 @@ def update_changed_files_hashes(ctx, files):
@cli.command(name="store")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def store_files(ctx, context, files):
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    stored_files: List[str] = []
@@ -267,10 +375,12 @@ def store_files(ctx, files):
@cli.command(name="retrieve")
@click.option('-x', '--context', required=True)
@click.option('-d', '--destination', default=None)
@click.argument('files', nargs=-1)
@click.pass_context
def retrieve_files(ctx, context, destination, files):
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    retrieved_files: List[str] = []
@@ -295,5 +405,104 @@ def retrieve_files(ctx, destination, files):
    print(os.linesep.join(retrieved_files))
######################################
# Asset Compression Focused Commands #
######################################
@cli.command(name="compress")
@click.option('-p', '--profile', default='default')
@click.option('-c', '--content', default='all')
@click.option('-d', '--destination', default=None)
@click.argument('files', nargs=-1)
@click.pass_context
def compress_assets(ctx, profile, content, destination, files):
    profiles = ctx.obj['CONFIG']['profiles']

    if profile not in profiles:
        raise ValueError(f'Unrecognized profile: {profile}')

    default_profile: Dict[str, any] = profiles['default']
    profile: Dict[str, any] = profiles[profile]

    if content != 'all':
        if content not in profile and content not in default_profile:
            raise ValueError(f'Unrecognized content: {content}')

    content_configurations = []

    if content == 'all':
        content_names: set = set()

        # Profile-specific configurations win; fall back to the default
        # profile for any content type the profile does not override.
        for content_name in profile.keys():
            content_names.add(content_name)
            content_configurations.append(profile[content_name])

        for content_name in default_profile.keys():
            if content_name not in content_names:
                content_names.add(content_name)
                content_configurations.append(default_profile[content_name])
    else:
        if content in profile:
            content_configurations.append(profile[content])
        else:
            content_configurations.append(default_profile[content])

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    if destination is None:
        destination = tempfile.mkdtemp()

    compressed_files = []
    tasks = []

    def store_filename(storage_list: List[str], filename: str):
        """
        A simple lambda wrapper to asynchronously add processed files to the list
        :param storage_list: list that collects the paths of completed files
        :param filename: output path to record when the task succeeds
        :return: a zero-argument callback suitable for run_command_shell's on_success
        """
        return lambda: storage_list.append(filename)
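
    # e.g. cb = store_filename(compressed_files, 'out/a.webp'); calling cb()
    # later appends 'out/a.webp' to compressed_files.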
    for input_file in files:
        for content_configuration in content_configurations:
            if any([input_file.endswith(extension) for extension in content_configuration['extensions']]):
                file = input_file
                if 'PREFIX' in ctx.obj and ctx.obj['PREFIX'] is not None:
                    file = strip_prefix(ctx.obj['PREFIX'], input_file)

                if 'preserveInputExtension' in content_configuration \
                        and content_configuration['preserveInputExtension']:
                    output_file = os.path.join(destination, file)
                else:
                    output_file_without_ext = os.path.splitext(os.path.join(destination, file))[0]
                    output_file = f'{output_file_without_ext}.{content_configuration["outputExtension"]}'

                output_file_dir = os.path.dirname(output_file)
                os.makedirs(output_file_dir, exist_ok=True)
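
                # The configured 'command' is a shell template; a hypothetical
                # example: "cwebp -q 80 {{input_file}} -o {{output_file}}".
                # Paths are single-quoted below so the shell sees them whole.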
                command: str = content_configuration['command'] \
                    .replace('{{input_file}}', f'\'{input_file}\'') \
                    .replace('{{output_file}}', f'\'{output_file}\'')

                tasks.append(
                    run_command_shell(
                        command,
                        stdout=asyncio.subprocess.DEVNULL,
                        stderr=asyncio.subprocess.DEVNULL,
                        on_success=store_filename(compressed_files, output_file)
                    )
                )
    results = run_asyncio_commands(
        tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency']
    )

    print(os.linesep.join(compressed_files))
if __name__ == '__main__':
    cli(obj={})
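
# Example pipeline (paths and script name illustrative): compress everything
# under ./static with the default profile and print the output paths:
#   find ./static -type f | python acm.py --stdin -p ./static/ compress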