Tooling for managing asset compression, storage, and retrieval
#!/usr/bin/env python

import asyncio
import hashlib
import io
import json
import os
import platform
import sys
import tempfile
from typing import Any, Callable, Dict, List

import click
# Written against the pre-7 minio-py API (ResponseError, NoSuchKey and
# list_objects_v2 were removed in later releases).
from minio import Minio
from minio.error import NoSuchKey, ResponseError

# Size of the buffer used when reading files for hashing
BUF_SIZE = 4096

# Prefix under which S3 exposes user-defined object metadata. Assumed
# definition: the constant is referenced below but its definition was missing
# from this listing; 'X-Amz-Meta-' is the standard S3 user-metadata prefix.
METADATA_PREFIX = 'X-Amz-Meta-'


###########
# AsyncIO #
###########

async def run_command_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        on_success: Callable = lambda: None):
    """Run command in subprocess (shell).

    Note:
        This can be used if you wish to execute e.g. "copy"
        on Windows, which can only be executed in the shell.
    """
    process = await asyncio.create_subprocess_shell(
        command, stdout=stdout, stderr=stderr
    )

    process_stdout, process_stderr = await process.communicate()

    if process.returncode == 0:
        on_success()

    if stdout != asyncio.subprocess.DEVNULL:
        return process_stdout.decode().strip()
    return None


def make_chunks(tasks, chunk_size):
    """Yield successive chunk_size-sized chunks from tasks.

    Note:
        Taken from https://stackoverflow.com/a/312464,
        modified for Python 3 only.
    """
    for i in range(0, len(tasks), chunk_size):
        yield tasks[i: i + chunk_size]


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results.

    If max_concurrent_tasks is set to 0, no limit is applied.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
    """
    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = list(make_chunks(tasks=tasks, chunk_size=max_concurrent_tasks))

    if asyncio.get_event_loop().is_closed():
        asyncio.set_event_loop(asyncio.new_event_loop())
    if platform.system() == "Windows":
        asyncio.set_event_loop(asyncio.ProactorEventLoop())
    loop = asyncio.get_event_loop()

    # Each chunk is awaited in full before the next one starts, which caps
    # the number of subprocesses running concurrently.
    for tasks_in_chunk in chunks:
        commands = asyncio.gather(*tasks_in_chunk)
        results = loop.run_until_complete(commands)
        all_results += results

    loop.close()
    return all_results
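
# Hypothetical usage sketch: run two shell commands with at most two running
# at any one time:
#
#   tasks = [run_command_shell('echo one'), run_command_shell('echo two')]
#   results = run_asyncio_commands(tasks, max_concurrent_tasks=2)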

###########
# Helpers #
###########

def get_metadata_name(key):
    """Build the S3 metadata key name for key, e.g. 'X-Amz-Meta-Sha256sum'."""
    return METADATA_PREFIX + key.capitalize()


def get_clean_stdin_iterator(stdin_stream):
    """Yield the non-empty, whitespace-stripped lines of a text stream."""
    return (line.strip() for line in stdin_stream if line.strip() != '')


def strip_prefix(prefix: str, file: str) -> str:
    """Remove prefix from the start of file, if present."""
    if file.startswith(prefix):
        return file[len(prefix):]
    return file


def get_file_identity(ctx_obj, file):
    """Map a local path to its object name: strip the prefix, use '/' separators."""
    if 'REMOVE_PREFIX' in ctx_obj and ctx_obj['REMOVE_PREFIX'] is not None:
        path = strip_prefix(ctx_obj['REMOVE_PREFIX'], file)
    else:
        path = file

    if os.sep != '/':
        path = '/'.join(path.split(os.sep))

    return path


def list_s3_dir(s3: Minio, bucket: str, prefix: str) -> List[str]:
    """Recursively collect the object names below prefix."""
    found_files = []
    for obj in s3.list_objects_v2(bucket, prefix=prefix):
        if obj.is_dir:
            found_files.extend(list_s3_dir(s3, bucket, obj.object_name))
        else:
            found_files.append(obj.object_name)
    return found_files


def get_s3_client(config: Any) -> Minio:
    host = config['host']
    secure = config['secure']
    access_key = config['access']
    secret_key = config['secret']
    return Minio(host, secure=secure, access_key=access_key, secret_key=secret_key)


def prep_s3(ctx):
    """Return the bucket name and an S3 client, creating the bucket if needed."""
    s3_config = ctx.obj['CONFIG']['s3']
    s3_bucket = ctx.obj['CONTEXT']

    s3 = get_s3_client(s3_config)

    if not s3.bucket_exists(s3_bucket):
        s3.make_bucket(s3_bucket)

    return s3_bucket, s3


def get_file_sha256sum(stored_data, file):
    """Return the stored profile hash, stored file hash, and the file's current hash."""
    stored_file_hash = stored_data['sha256sum']
    stored_profile_hash = stored_data['profilesHash']

    sha256sum = hashlib.sha256()
    with open(file, 'rb') as f:
        for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
            sha256sum.update(byte_block)
    calculated_file_hash = sha256sum.hexdigest()

    return stored_profile_hash, stored_file_hash, calculated_file_hash
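
# Note: stored_data is the metadata object written by the "update" command
# below, i.e. keys sourcePath, storedAssetIdentity, identity, sha256sum and
# profilesHash.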

def load_config(path: str) -> Any:
    with open(path, 'r') as config_file:
        config = json.load(config_file)

    # S3 credentials come from the environment, not from the config file
    config['s3']['access'] = os.getenv('ACM_S3_ACCESS')
    config['s3']['secret'] = os.getenv('ACM_S3_SECRET')

    # Concurrency: 0 means "no limit"
    if 'concurrency' in config:
        config['concurrency'] = abs(int(config['concurrency']))
    else:
        config['concurrency'] = 0

    # Hash the profiles section so changed compression settings invalidate
    # previously stored assets
    sha256sum = hashlib.sha256()
    with io.BytesIO(json.dumps(config['profiles']).encode('utf-8')) as c:
        for byte_block in iter(lambda: c.read(BUF_SIZE), b''):
            sha256sum.update(byte_block)
    config['profilesHash'] = sha256sum.hexdigest()

    return config
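
# Illustrative acm-config.json shape (a sketch: the keys follow the reads in
# load_config and compress_assets; the values are hypothetical):
#
#   {
#     "concurrency": 4,
#     "s3": {
#       "host": "minio.example.com:9000",
#       "secure": true
#     },
#     "profiles": {
#       "default": {
#         "images": {
#           "extensions": ["png", "jpg"],
#           "command": "convert {{input_file}} -quality 75 {{output_file}}",
#           "outputExtension": "jpg"
#         }
#       }
#     }
#   }
#
# Credentials are taken from the ACM_S3_ACCESS and ACM_S3_SECRET environment
# variables, never from the file itself.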

@click.group()
@click.option('-d', '--debug/--no-debug', default=False)
@click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
@click.option('-s', '--stdin/--no-stdin', default=False)
@click.option('--remove-prefix', default=None)
@click.option('--add-prefix', default=None)
@click.pass_context
def cli(ctx, debug, config, stdin, remove_prefix, add_prefix):
    ctx.ensure_object(dict)
    ctx.obj['DEBUG'] = debug
    ctx.obj['CONFIG'] = load_config(config)
    ctx.obj['READ_STDIN'] = stdin
    ctx.obj['REMOVE_PREFIX'] = remove_prefix
    ctx.obj['ADD_PREFIX'] = add_prefix
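
# A sketch of a typical pipeline (the context "my-assets" and all paths are
# hypothetical):
#
#   find assets/ -type f \
#     | python acm.py --stdin check -x my-assets \
#     | python acm.py --stdin compress --print-input-and-identity \
#     | python acm.py --stdin update -x my-assets --input-and-identity
#
# i.e. keep only the files whose stored hashes are stale, compress those, and
# record the new hashes; "store" and "retrieve" below move the compressed
# assets themselves.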

###############################
# S3 Storage Focused Commands #
###############################

@cli.command(name="list")
@click.option('--sha256sum/--no-sha256sum', default=False)
@click.option('--suffix', default=None)
@click.option('-x', '--context', required=True)
@click.pass_context
def list_files(ctx, context, sha256sum, suffix):
    """
    List all objects stored in the <context> bucket.
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)

    found_files: List[str] = []
    found_objects: List[str] = []

    for obj in s3.list_objects_v2(s3_bucket, recursive=False):
        if obj.is_dir:
            found_objects.extend(list_s3_dir(s3, s3_bucket, obj.object_name))
        else:
            found_objects.append(obj.object_name)

    for obj in found_objects:
        file = obj

        # Re-apply the removed prefix so listed paths match local paths
        if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None:
            file = os.path.join(ctx.obj['REMOVE_PREFIX'], file)

        if suffix is not None and suffix in file:
            file = file.replace(suffix, '')
        file = file.strip()

        if sha256sum:
            stat = s3.stat_object(s3_bucket, obj)
            sha256sum_value = stat.metadata[get_metadata_name('SHA256SUM')]
            file = f'{sha256sum_value} {file}'

        found_files.append(file)

    print(os.linesep.join(found_files))


@cli.command(name="match")
@click.option('-x', '--context', required=True)
@click.option('--print-identity/--no-print-identity', default=False)
@click.argument('files', nargs=-1)
@click.pass_context
def check_matched_files_hashes(ctx, context, print_identity, files):
    """
    List all files whose stored sha256sum and profilesHash match their current values.
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    matching_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = s3.get_object(s3_bucket, file_identity)
            stored_data = json.load(file_object)
            stored_profile_hash, stored_file_hash, calculated_file_hash = \
                get_file_sha256sum(stored_data, file)
            if calculated_file_hash == stored_file_hash \
                    and ctx.obj['CONFIG']['profilesHash'] == stored_profile_hash:
                if print_identity:
                    matching_files.append(stored_data['storedAssetIdentity'])
                else:
                    matching_files.append(file)
        except NoSuchKey:
            continue
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}', file=sys.stderr)

    print(os.linesep.join(matching_files))


@cli.command(name="check")
@click.option('-x', '--context', required=True)
@click.argument('files', nargs=-1)
@click.pass_context
def check_changed_files_hashes(ctx, context, files):
    """
    List all files that do not have a matching sha256sum or profilesHash.
    """
    ctx.obj['CONTEXT'] = context
    s3_bucket, s3 = prep_s3(ctx)
    changed_files: List[str] = []

    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = s3.get_object(s3_bucket, file_identity)
            stored_data = json.load(file_object)
            stored_profile_hash, stored_file_hash, calculated_file_hash = \
                get_file_sha256sum(stored_data, file)
            if calculated_file_hash != stored_file_hash \
                    or ctx.obj['CONFIG']['profilesHash'] != stored_profile_hash:
                changed_files.append(file)
        except NoSuchKey:
            # No stored hash yet, so the file counts as changed
            changed_files.append(file)
        except (ValueError, ResponseError) as e:
            print(f'ERROR: {file} {e}', file=sys.stderr)

    print(os.linesep.join(changed_files))
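
# With --input-and-identity, "update" expects each input line to be
# "<file>\t<identity>", i.e. the tab-separated pairs that
# "compress --print-input-and-identity" emits.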
  248. @cli.command(name="update")
  249. @click.option('-x', '--context', required=True)
  250. @click.option('--input-and-identity/--no-input-and-identity', default=False)
  251. @click.argument('files', nargs=-1)
  252. @click.pass_context
  253. def update_changed_files_hashes(ctx, context, input_and_identity, files):
  254. """
  255. Store new data objects for the provided files
  256. """
  257. ctx.obj['CONTEXT'] = context
  258. s3_bucket, s3 = prep_s3(ctx)
  259. updated_files: List[str] = []
  260. if ctx.obj['READ_STDIN']:
  261. files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
  262. for file in files:
  263. identity = None
  264. if input_and_identity:
  265. file, identity = file.split('\t')
  266. file_identity = f'{get_file_identity(ctx.obj, file)}.json'
  267. try:
  268. sha256sum = hashlib.sha256()
  269. with open(file, 'rb') as f:
  270. for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
  271. sha256sum.update(byte_block)
  272. calculated_file_hash = sha256sum.hexdigest()
  273. object_data = {
  274. "sourcePath": file,
  275. "storedAssetIdentity": identity,
  276. "identity": file_identity,
  277. "sha256sum": calculated_file_hash,
  278. "profilesHash": ctx.obj['CONFIG']['profilesHash']
  279. }
  280. with io.BytesIO(json.dumps(object_data, sort_keys=True, indent=None).encode('utf-8')) as data:
  281. data.seek(0, os.SEEK_END)
  282. data_length = data.tell()
  283. data.seek(0)
  284. s3.put_object(
  285. s3_bucket,
  286. file_identity,
  287. data,
  288. data_length,
  289. content_type="application/json",
  290. metadata={}
  291. )
  292. updated_files.append(file)
  293. except ValueError or ResponseError as e:
  294. print(f'ERROR: {file} {e}')
  295. print(os.linesep.join(updated_files))
  296. @cli.command(name="store")
  297. @click.option('-x', '--context', required=True)
  298. @click.argument('files', nargs=-1)
  299. @click.pass_context
  300. def store_files(ctx, context, files):
  301. """
  302. Store specified files in a <context> bucket for retrieval.
  303. """
  304. ctx.obj['CONTEXT'] = context
  305. s3_bucket, s3 = prep_s3(ctx)
  306. stored_files: List[str] = []
  307. if ctx.obj['READ_STDIN']:
  308. files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
  309. for file in files:
  310. file_identity = get_file_identity(ctx.obj, file)
  311. try:
  312. s3.fput_object(
  313. s3_bucket,
  314. file_identity,
  315. file,
  316. content_type="application/octet-stream"
  317. )
  318. if 'ADD_PREFIX' in ctx.obj and ctx.obj['ADD_PREFIX'] is not None:
  319. stored_files.append(os.path.join(ctx.obj['ADD_PREFIX'], file_identity))
  320. else:
  321. stored_files.append(file)
  322. except ResponseError as e:
  323. print(f'ERROR: {file} {e}', file=sys.stderr)
  324. print(os.linesep.join(stored_files))
  325. @cli.command(name="retrieve")
  326. @click.option('-x', '--context', required=True)
  327. @click.option('-d', '--destination', default=None)
  328. @click.argument('files', nargs=-1)
  329. @click.pass_context
  330. def retrieve_files(ctx, context, destination, files):
  331. """
  332. Retrieve specified files from a <context> bucket
  333. """
  334. ctx.obj['CONTEXT'] = context
  335. s3_bucket, s3 = prep_s3(ctx)
  336. retrieved_files: List[str] = []
  337. if ctx.obj['READ_STDIN']:
  338. files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
  339. for file in files:
  340. file_identity = get_file_identity(ctx.obj, file)
  341. file_destination = file
  342. if destination is not None:
  343. file_destination = os.path.join(destination, file_identity)
  344. try:
  345. s3.fget_object(
  346. s3_bucket,
  347. file_identity,
  348. file_destination
  349. )
  350. retrieved_files.append(file_destination)
  351. except NoSuchKey as e:
  352. print(f'ERROR: {file_identity} {file_destination} {e}', file=sys.stderr)
  353. except ResponseError as e:
  354. print(f'ERROR: {file_destination} {e}', file=sys.stderr)
  355. print(os.linesep.join(retrieved_files))
  356. ######################################
  357. # Asset Compression Focused Commands #
  358. ######################################
  359. @cli.command(name="compress")
  360. @click.option('-p', '--profile', default='default')
  361. @click.option('-c', '--content', default='all')
  362. @click.option('-d', '--destination', default=None)
  363. @click.option('--print-input-and-identity/--no-print-input-and-identity', default=False)
  364. @click.argument('files', nargs=-1)
  365. @click.pass_context
  366. def compress_assets(ctx, profile, content, destination, print_input_and_identity, files):
  367. profiles = ctx.obj['CONFIG']['profiles']
  368. if profile not in profiles:
  369. raise ValueError(f'Unrecognized profile: {profile}')
  370. default_profile: Dict[str, any] = profiles['default']
  371. profile: Dict[str, any] = profiles[profile]
  372. if content != 'all':
  373. if content not in profile and content not in default_profile:
  374. raise ValueError(f'Unrecognized content: {content}')
  375. content_configurations = []
  376. if content == 'all':
  377. content_names: set = set()
  378. for content_name in profile.keys():
  379. content_names.add(content_name)
  380. content_configurations.append(profile[content_name])
  381. for content_name in default_profile.keys():
  382. if content_name not in content_names:
  383. content_names.add(content_name)
  384. content_configurations.append(default_profile[content_name])
  385. else:
  386. if content in profile:
  387. content_configurations.append(profile[content])
  388. else:
  389. content_configurations.append(default_profile[content])
  390. if ctx.obj['READ_STDIN']:
  391. files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
  392. if destination is None:
  393. destination = tempfile.mkdtemp()
  394. compressed_files = []
  395. tasks = []
  396. def store_filename(storage_list: List[str], filename: str):
  397. """
  398. A simple lambda wrapper to asynchronously add processed files to the list
  399. :param storage_list:
  400. :param filename:
  401. :return:
  402. """
  403. return lambda: storage_list.append(filename)
  404. for input_file in files:
  405. for content_configuration in content_configurations:
  406. if any([input_file.endswith(extension) for extension in content_configuration['extensions']]):
  407. file = input_file
  408. if 'REMOVE_PREFIX' in ctx.obj and ctx.obj['REMOVE_PREFIX'] is not None:
  409. file = strip_prefix(ctx.obj['REMOVE_PREFIX'], input_file)
  410. if 'preserveInputExtension' in content_configuration \
  411. and content_configuration['preserveInputExtension']:
  412. output_file = os.path.join(destination, file)
  413. else:
  414. output_file_without_ext = os.path.splitext(os.path.join(destination, file))[0]
  415. output_file = f'{output_file_without_ext}.{content_configuration["outputExtension"]}'
  416. output_file_identity = get_file_identity({'REMOVE_PREFIX': destination}, output_file)
  417. output_file_dir = os.path.dirname(output_file)
  418. os.makedirs(output_file_dir, exist_ok=True)
  419. command: str = content_configuration['command'] \
  420. .replace('{{input_file}}', f'\'{input_file}\'') \
  421. .replace('{{output_file}}', f'\'{output_file}\'')
  422. tasks.append(
  423. run_command_shell(
  424. command,
  425. stdout=asyncio.subprocess.DEVNULL,
  426. stderr=asyncio.subprocess.DEVNULL,
  427. on_success=store_filename(
  428. compressed_files,
  429. f'{input_file}\t{output_file_identity}' if print_input_and_identity else output_file
  430. )
  431. )
  432. )
  433. results = run_asyncio_commands(
  434. tasks, max_concurrent_tasks=ctx.obj['CONFIG']['concurrency']
  435. )
  436. print(os.linesep.join(compressed_files))
  437. if __name__ == '__main__':
  438. cli(obj={})