Tooling for managing asset compression, storage, and retrieval

#!/usr/bin/env python
import asyncio
import collections.abc
import hashlib
import io
import json
import os
import platform
import sys
import tempfile
from typing import Any, Callable, Dict, List

import click
from minio import Minio
from minio.error import NoSuchKey, ResponseError

# Size of the buffer used when reading files for hashing
BUF_SIZE = 4096

# Application version
VERSION = "1.3.1"

# Prefix for object metadata keys. This constant is referenced below but was
# missing from the listing; 'X-Amz-Meta-' is assumed here, as that is the
# standard S3 user-metadata prefix.
METADATA_PREFIX = 'X-Amz-Meta-'


###########
# AsyncIO #
###########
async def run_command_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        on_success: Callable = lambda: None):
    """Run command in subprocess (shell).

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    process = await asyncio.create_subprocess_shell(
        command, stdout=stdout, stderr=stderr
    )

    process_stdout, process_stderr = await process.communicate()

    if process.returncode == 0:
        on_success()

    if stdout != asyncio.subprocess.DEVNULL:
        result = process_stdout.decode().strip()
        return result
    else:
        return None

def make_chunks(tasks, chunk_size):
    """Yield successive chunk_size-sized chunks from tasks.

    Note:
        Taken from https://stackoverflow.com/a/312464
        modified for python 3 only
    """
    for i in range(0, len(tasks), chunk_size):
        yield tasks[i:i + chunk_size]

def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results.

    If max_concurrent_tasks is set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """
    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks))

    if asyncio.get_event_loop().is_closed():
        asyncio.set_event_loop(asyncio.new_event_loop())
    if platform.system() == "Windows":
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
    loop = asyncio.get_event_loop()

    for tasks_in_chunk in chunks:
        commands = asyncio.gather(*tasks_in_chunk)
        results = loop.run_until_complete(commands)
        all_results += results

    loop.close()
    return all_results
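
# Usage sketch: run several shell commands with at most four in flight.
#
#   tasks = [run_command_shell(f'echo {n}') for n in range(10)]
#   outputs = run_asyncio_commands(tasks, max_concurrent_tasks=4)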

###########
# Helpers #
###########
def update(d, u):
    """Recursively merge mapping u into mapping d and return d."""
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            d[k] = update(d.get(k, {}), v)
        else:
            d[k] = v
    return d
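
# Usage sketch: nested keys are merged rather than replaced wholesale.
#
#   update({'s3': {'host': 'a', 'secure': True}}, {'s3': {'host': 'b'}})
#   # -> {'s3': {'host': 'b', 'secure': True}}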

def get_metadata_name(key):
    """Build the metadata key under which a file's SHA256 sum is stored."""
    return METADATA_PREFIX + 'SHA256SUM'.capitalize()

def get_clean_stdin_iterator(stdin_stream):
    return (line for line in [line.strip() for line in stdin_stream] if line != '')

def strip_prefix(prefix: str, file: str) -> str:
    # Remove only the leading prefix; str.replace would also strip any later
    # occurrences of the prefix inside the path.
    if file.startswith(prefix):
        return file[len(prefix):]
    return file

def get_file_identity(ctx_obj, file):
    if 'REMOVE_PREFIX' in ctx_obj and ctx_obj['REMOVE_PREFIX'] is not None:
        path = strip_prefix(ctx_obj['REMOVE_PREFIX'], file)
    else:
        path = file

    # Normalize the platform directory separator (os.sep) to '/' so the
    # identity is a stable object key on every OS.
    if os.sep != '/':
        path = '/'.join(path.split(os.sep))

    return path

def list_s3_dir(s3: Minio, bucket: str, prefix: str) -> List[str]:
    found_files = []
    for obj in s3.list_objects_v2(bucket, prefix=prefix):
        if obj.is_dir:
            found_files.extend(list_s3_dir(s3, bucket, obj.object_name))
        else:
            found_files.append(obj.object_name)
    return found_files

def get_s3_client(config: Dict[str, Any]) -> Minio:
    host = config['host']
    secure = config['secure']
    access_key = config['access']
    secret_key = config['secret']
    return Minio(host, secure=secure, access_key=access_key, secret_key=secret_key)

def prep_s3(ctx):
    s3_config = ctx.obj['CONFIG']['s3']
    s3_bucket = ctx.obj['CONTEXT']

    s3 = get_s3_client(s3_config)

    if not s3.bucket_exists(s3_bucket):
        s3.make_bucket(s3_bucket)

    return s3_bucket, s3

def get_file_sha256sum(stored_data, profile, file):
    stored_file_hash = stored_data['sha256sum']
    stored_profile_hash = stored_data['profileHash']

    sha256sum = hashlib.sha256()
    with open(file, 'rb') as f:
        for byte_block in iter(lambda: f.read(BUF_SIZE), b""):
            sha256sum.update(byte_block)
    calculated_file_hash = sha256sum.hexdigest()

    return stored_profile_hash, stored_file_hash, calculated_file_hash

def get_string_sha256sum(string: str, encoding='utf-8') -> str:
    # Hash the string as-is; callers already serialize with json.dumps, so
    # serializing again here would hash a double-encoded value.
    sha256sum = hashlib.sha256()
    with io.BytesIO(string.encode(encoding)) as c:
        for byte_block in iter(lambda: c.read(BUF_SIZE), b''):
            sha256sum.update(byte_block)
    return sha256sum.hexdigest()

def add_nested_key(config: Dict[str, Any], path: List[str], value: str) -> bool:
    target = path[0].lower()
    if len(path) == 1:
        config[target] = value
        return True
    if target not in config:
        config[target] = {}
    add_nested_key(config[target], path[1:], value)
    return False

def read_env_config(prefix, separator='__') -> Dict[str, Any]:
    prefix = prefix + separator
    env_config = {}

    environment_variables = [env for env in os.environ.keys() if env.startswith(prefix)]

    for env in environment_variables:
        # Split on the configured separator rather than a hard-coded '__'
        path = env[len(prefix):].split(separator)
        add_nested_key(env_config, path, os.environ[env])

    return env_config
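
# Usage sketch: with ACM__S3__HOST=minio.example.com exported,
#
#   read_env_config('ACM')  # -> {'s3': {'host': 'minio.example.com'}}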

def load_config(path: str) -> Dict[str, Any]:
    combined_config = {}
    with open(
            os.path.join(
                os.path.dirname(os.path.realpath(__file__)),
                'acm-config-default.json'),
            'r') as combined_config_file:
        combined_config = json.load(combined_config_file)

    config = {}
    with open(path, 'r') as config_file:
        config = json.load(config_file)

    # Set up concurrency
    if 'concurrency' in config:
        config['concurrency'] = abs(int(config['concurrency']))
    else:
        config['concurrency'] = 0

    update(combined_config, config)
    update(combined_config, read_env_config('ACM'))

    # Calculate profile hashes
    profile_hashes = {}
    profile_hashes['all'] = get_string_sha256sum(json.dumps(combined_config['profiles']))
    for profile in combined_config['profiles'].keys():
        profile_hashes[profile] = get_string_sha256sum(json.dumps(combined_config['profiles'][profile]))

    combined_config['profileHashes'] = profile_hashes

    return combined_config
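
# Configuration sketch: the keys read above imply an acm-config.json shaped
# roughly like the following (values are illustrative assumptions):
#
#   {
#     "concurrency": 4,
#     "s3": {"host": "localhost:9000", "secure": false,
#            "access": "ACCESS_KEY", "secret": "SECRET_KEY"},
#     "profiles": {"default": { ... content configurations ... }}
#   }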

@click.group()
@click.option('-d', '--debug/--no-debug', default=False)
@click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
@click.option('-s', '--stdin/--no-stdin', default=False)
@click.option('--remove-prefix', default=None)
@click.option('--add-prefix', default=None)
@click.pass_context
def cli(ctx, debug, config, stdin, remove_prefix, add_prefix):
    ctx.ensure_object(dict)
    ctx.obj['DEBUG'] = debug
    ctx.obj['CONFIG'] = load_config(config)
    ctx.obj['READ_STDIN'] = stdin
    ctx.obj['REMOVE_PREFIX'] = remove_prefix
    ctx.obj['ADD_PREFIX'] = add_prefix

####################
# Generic Commands #
####################
@cli.command(name="config")
@click.pass_context
def print_config(ctx):
    """
    Print the configuration
    """
    print(json.dumps(ctx.obj['CONFIG'], indent=2, sort_keys=True))

###############################
# S3 Storage Focused Commands #
###############################
@cli.command(name="list")
@click.option('--sha256sum/--no-sha256sum', default=False)
@click.option('--suffix', default=None)
@click.option('-x', '--context', required=True)
@click.option('--print-identity/--no-print-identity', default=False)
@click.pass_context
def list_files(ctx, context, sha256sum, suffix, print_identity):
    """
    List all file objects in a bucket
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)

    found_files: List[str] = []
    found_objects: List[str] = []

    for obj in s3.list_objects_v2(s3_bucket, recursive=False):
        if obj.is_dir:
            found_objects.extend(list_s3_dir(s3, s3_bucket, obj.object_name))
        else:
            found_objects.append(obj.object_name)

    for obj in found_objects:
        file = obj
        if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None:
            file = os.path.join(ctx.obj['REMOVE_PREFIX'], file)

        # Strip the suffix only from the end of the name
        if suffix is not None and file.endswith(suffix):
            file = file[:-len(suffix)]
        file = file.strip()

        if sha256sum:
            file_object = s3.get_object(s3_bucket, obj)
            stored_data = json.load(file_object)
            sha256sum_value = stored_data['sha256sum']
            found_files.append(f'{sha256sum_value} {file}')
        elif print_identity:
            file_object = s3.get_object(s3_bucket, obj)
            stored_data = json.load(file_object)
            found_files.append(stored_data['storedAssetIdentity'])
        else:
            found_files.append(file)

    print(os.linesep.join(found_files))
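
# Usage sketch: list the stored entries for a "web-assets" context with
# their recorded hashes, stripping the '.json' metadata suffix:
#
#   ./acm.py list -x web-assets --suffix .json --sha256sum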

@cli.command(name="match")
@click.option('-x', '--context', required=True)
@click.option('--print-identity/--no-print-identity', default=False)
@click.option('-p', '--profile', default='all')
@click.argument('files', nargs=-1)
@click.pass_context
def check_matched_files_hashes(ctx, context, print_identity, profile, files):
    """
    List all files that have matching stored sha256sum and profile hash
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    matching_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = s3.get_object(s3_bucket, file_identity)
            stored_data = json.load(file_object)
            stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum(stored_data, profile, file)
            if calculated_file_hash == stored_file_hash \
                    and ctx.obj['CONFIG']['profileHashes'][profile] == stored_profile_hash:
                if print_identity:
                    matching_files.append(stored_data['storedAssetIdentity'])
                else:
                    matching_files.append(file)
        except NoSuchKey:
            continue
        except (ValueError, ResponseError) as e:
            # A tuple is required to catch both exception types;
            # "except ValueError or ResponseError" only caught ValueError.
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(matching_files))
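
# Usage sketch: of the files named on stdin, print those whose stored file
# hash and profile hash are both current:
#
#   find assets/ -type f | ./acm.py --stdin match -x web-assets -p default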

@cli.command(name="check")
@click.option('-x', '--context', required=True)
@click.option('-p', '--profile', default='all')
@click.argument('files', nargs=-1)
@click.pass_context
def check_changed_files_hashes(ctx, context, profile, files):
    """
    List all files that do not have a matching sha256sum or profile hash
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    changed_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = s3.get_object(s3_bucket, file_identity)
            stored_data = json.load(file_object)
            stored_profile_hash, stored_file_hash, calculated_file_hash = get_file_sha256sum(stored_data, profile, file)
            if calculated_file_hash != stored_file_hash \
                    or ctx.obj['CONFIG']['profileHashes'][profile] != stored_profile_hash:
                changed_files.append(file)
        except NoSuchKey:
            # Never stored before, so it counts as changed
            changed_files.append(file)
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(changed_files))
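
# Usage sketch: print only the files whose contents or profile configuration
# changed since the last update:
#
#   find assets/ -type f | ./acm.py --stdin check -x web-assets -p default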

@cli.command(name="update")
@click.option('-x', '--context', required=True)
@click.option('--input-and-identity/--no-input-and-identity', default=False)
@click.option('-p', '--profile', default='all')
@click.argument('files', nargs=-1)
@click.pass_context
def update_changed_files_hashes(ctx, context, input_and_identity, profile, files):
    """
    Store new data objects for the provided files
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    updated_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        identity = None
        if input_and_identity:
            file, identity = file.split('\t')
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            sha256sum = hashlib.sha256()
            with open(file, 'rb') as f:
                for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
                    sha256sum.update(byte_block)
            calculated_file_hash = sha256sum.hexdigest()

            object_data = {
                "sourcePath": file,
                "storedAssetIdentity": identity,
                "identity": file_identity,
                "sha256sum": calculated_file_hash,
                "profileHash": ctx.obj['CONFIG']['profileHashes'][profile]
            }

            with io.BytesIO(json.dumps(object_data, sort_keys=True, indent=None).encode('utf-8')) as data:
                data.seek(0, os.SEEK_END)
                data_length = data.tell()
                data.seek(0)
                s3.put_object(
                    s3_bucket,
                    file_identity,
                    data,
                    data_length,
                    content_type="application/json",
                    metadata={}
                )

            updated_files.append(file)
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(updated_files))
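
# Usage sketch: record fresh hashes for the files check reported as changed;
# with --input-and-identity each stdin line is "<input>\t<identity>":
#
#   find assets/ -type f | ./acm.py --stdin update -x web-assets -p default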

@cli.command(name="store")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def store_files(ctx, context, files):
    """
    Store specified files in a <context> bucket for retrieval.
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    stored_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = get_file_identity(ctx.obj, file)
        try:
            s3.fput_object(
                s3_bucket,
                file_identity,
                file,
                content_type="application/octet-stream"
            )
            if 'ADD_PREFIX' in ctx.obj and ctx.obj['ADD_PREFIX'] is not None:
                stored_files.append(os.path.join(ctx.obj['ADD_PREFIX'], file_identity))
            else:
                stored_files.append(file)
        except ResponseError as e:
            print(f'ERROR: {file} {e}', file=sys.stderr)

    print(os.linesep.join(stored_files))
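
# Usage sketch: upload build outputs, stripping the local build prefix from
# the object keys (paths and bucket name are illustrative):
#
#   ./acm.py --remove-prefix build/ store -x web-assets-data build/css/site.css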

@cli.command(name="retrieve")
@click.option('-x', '--context', required=True)
@click.option('-d', '--destination', default=None)
@click.argument('files', nargs=-1)
@click.pass_context
def retrieve_files(ctx, context, destination, files):
    """
    Retrieve specified files from a <context> bucket
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    retrieved_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = get_file_identity(ctx.obj, file)
        file_destination = file
        if destination is not None:
            file_destination = os.path.join(destination, file_identity)
        try:
            s3.fget_object(
                s3_bucket,
                file_identity,
                file_destination
            )
            retrieved_files.append(file_destination)
        except NoSuchKey as e:
            print(f'ERROR: {file_identity} {file_destination} {e}', file=sys.stderr)
        except ResponseError as e:
            print(f'ERROR: {file_destination} {e}', file=sys.stderr)

    print(os.linesep.join(retrieved_files))
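
# Usage sketch: fetch stored assets into a local directory:
#
#   ./acm.py retrieve -x web-assets-data -d ./dist css/site.css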

@cli.command(name="clean")
@click.option('-x', '--context', required=True)
@click.option('-d', '--context-data', default=None)
@click.option('-n', '--dry-run/--no-dry-run', default=False)
@click.argument('files', nargs=-1)
@click.pass_context
def clean_files(ctx, context, context_data, dry_run, files):
    """
    Remove stored objects in a <context> bucket that do not match the specified files.
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    s3_data_bucket = context_data
    found_files: List[str] = []
    found_data_files: List[str] = []
    removed_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    # Go through and find all matching files
    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            if s3_data_bucket is not None:
                file_object = s3.get_object(s3_bucket, file_identity)
                stored_data = json.load(file_object)
                stored_data_file_identity = stored_data['storedAssetIdentity']
                found_files.append(file_identity)
                found_data_files.append(stored_data_file_identity)
            else:
                # Existence check only; the object body is not needed
                s3.get_object(s3_bucket, file_identity)
                found_files.append(file_identity)
        except NoSuchKey:
            # Caught before ResponseError, consistent with the other commands
            print(f'ERROR: NoSuchKey {file_identity}', file=sys.stderr)
        except ResponseError as e:
            print(f'ERROR: ResponseError {file_identity} {e}', file=sys.stderr)

    found_files = set(found_files)
    found_data_files = set(found_data_files)

    # Find all objects in the s3 bucket
    found_objects: List[str] = []
    for obj in s3.list_objects_v2(s3_bucket, recursive=False):
        if obj.is_dir:
            found_objects.extend(list_s3_dir(s3, s3_bucket, obj.object_name))
        else:
            found_objects.append(obj.object_name)

    # Only scan the data bucket when one was given
    found_data_objects: List[str] = []
    if s3_data_bucket is not None:
        for obj in s3.list_objects_v2(s3_data_bucket, recursive=False):
            if obj.is_dir:
                found_data_objects.extend(list_s3_dir(s3, s3_data_bucket, obj.object_name))
            else:
                found_data_objects.append(obj.object_name)

    for file_identity in found_objects:
        if file_identity not in found_files:
            if dry_run:
                removed_files.append(f'{s3_bucket}:{file_identity}')
            else:
                try:
                    s3.remove_object(s3_bucket, file_identity)
                    removed_files.append(f'{s3_bucket}:{file_identity}')
                except ResponseError as e:
                    print(f'ERROR: {s3_bucket}:{file_identity} {e}', file=sys.stderr)

    for file_data_identity in found_data_objects:
        if file_data_identity not in found_data_files:
            if dry_run:
                removed_files.append(f'{s3_data_bucket}:{file_data_identity}')
            else:
                try:
                    s3.remove_object(s3_data_bucket, file_data_identity)
                    removed_files.append(f'{s3_data_bucket}:{file_data_identity}')
                except ResponseError as e:
                    print(f'ERROR: {s3_data_bucket}:{file_data_identity} {e}', file=sys.stderr)

    print(os.linesep.join(removed_files))
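
# Usage sketch: preview which stored entries (and, with -d, which stored
# assets) would be deleted because no surviving input file references them:
#
#   find assets/ -type f | ./acm.py --stdin clean -x web-assets -d web-assets-data --dry-run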

######################################
# Asset Compression Focused Commands #
######################################
@cli.command(name="compress")
@click.option('-p', '--profile', default='default')
@click.option('-c', '--content', default='all')
@click.option('-d', '--destination', default=None)
@click.option('--print-input-and-identity/--no-print-input-and-identity', default=False)
@click.argument('files', nargs=-1)
@click.pass_context
def compress_assets(ctx, profile, content, destination, print_input_and_identity, files):
    """
    Compress the requested files and store them in a storage bucket.
    """
    profiles = ctx.obj['CONFIG']['profiles']

    if profile not in profiles:
        raise ValueError(f'Unrecognized profile: {profile}')

    default_profile: Dict[str, Any] = profiles['default']
    profile: Dict[str, Any] = profiles[profile]

    if content != 'all':
        if content not in profile and content not in default_profile:
            raise ValueError(f'Unrecognized content: {content}')

    content_configurations = []

    if content == 'all':
        content_names: set = set()

        # Profile-specific configurations take precedence over the defaults
        for content_name in profile.keys():
            content_names.add(content_name)
            content_configurations.append(profile[content_name])

        for content_name in default_profile.keys():
            if content_name not in content_names:
                content_names.add(content_name)
                content_configurations.append(default_profile[content_name])
    else:
        if content in profile:
            content_configurations.append(profile[content])
        else:
            content_configurations.append(default_profile[content])

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    if destination is None:
        destination = tempfile.mkdtemp()

    compressed_files = []
    tasks = []

    def store_filename(storage_list: List[str], filename: str):
        """
        A simple closure factory used as an on_success callback to record
        processed files
        :param storage_list:
        :param filename:
        :return:
        """
        return lambda: storage_list.append(filename)

    for input_file in files:
        for content_configuration in content_configurations:
            if any(input_file.endswith(extension) for extension in content_configuration['extensions']):
                file = input_file
                if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None:
                    file = strip_prefix(ctx.obj['REMOVE_PREFIX'], input_file)

                if 'preserveInputExtension' in content_configuration \
                        and content_configuration['preserveInputExtension']:
                    output_file = os.path.join(destination, file)
                else:
                    output_file_without_ext = os.path.splitext(os.path.join(destination, file))[0]
                    output_file = f'{output_file_without_ext}.{content_configuration["outputExtension"]}'

                output_file_identity = get_file_identity({'REMOVE_PREFIX': destination}, output_file)

                output_file_dir = os.path.dirname(output_file)
                os.makedirs(output_file_dir, exist_ok=True)

                command: str = content_configuration['command'] \
                    .replace('{input_file}', f'\'{input_file}\'') \
                    .replace('{output_file}', f'\'{output_file}\'')

                tasks.append(
                    run_command_shell(
                        command,
                        stdout=asyncio.subprocess.DEVNULL,
                        stderr=asyncio.subprocess.DEVNULL,
                        on_success=store_filename(
                            compressed_files,
                            f'{input_file}\t{output_file_identity}' if print_input_and_identity else output_file
                        )
                    )
                )

    run_asyncio_commands(
        tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency']
    )

    print(os.linesep.join(compressed_files))
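
# Pipeline sketch: compress everything that changed, then record the new
# hashes; compress emits "<input>\t<identity>" lines that update consumes
# (bucket and profile names are illustrative):
#
#   find assets/ -type f \
#     | ./acm.py --stdin check -x web-assets \
#     | ./acm.py --stdin compress -p default --print-input-and-identity \
#     | ./acm.py --stdin update -x web-assets --input-and-identity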

if __name__ == '__main__':
    cli(obj={})