Tooling for managing asset compression, storage, and retrieval
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

286 lines
8.4 KiB

#!/usr/bin/env python
import hashlib
import io
import json
import os
from typing import List
import click
from minio import Minio, ResponseError
from minio.error import NoSuchKey
# MinIO Metadata Prefix
METADATA_PREFIX = 'X-Amz-Meta-'
# Metadata Constants
METADATA_SHA256SUM = "Sha256sum"
# Size of the buffer to read files with
BUF_SIZE = 4096
def get_metadata_name(key):
return METADATA_PREFIX + 'SHA256SUM'.capitalize()
def get_clean_stdin_iterator(stdin_stream):
return (line.strip() for line in stdin_stream if line.strip() != '')
def get_file_identity(ctx_obj, file):
if 'PREFIX' in ctx_obj and ctx_obj['PREFIX'] is not None:
path = file.replace(ctx_obj['PREFIX'], '')
else:
path = file
if os.pathsep != '/':
path = '/'.join(path.split(os.pathsep))
return path
def list_minio_dir(minio: Minio, bucket: str, prefix: str) -> List[str]:
found_files = []
for obj in minio.list_objects_v2(bucket, prefix=prefix):
if obj.is_dir:
found_files.extend(list_minio_dir(minio, bucket, obj.object_name))
else:
found_files.append(obj.object_name)
return found_files
def get_minio_client(config: any) -> Minio:
host = config['host']
secure = config['secure']
access_key = config['access']
secret_key = config['secret']
return Minio(host, secure=secure, access_key=access_key, secret_key=secret_key)
def load_config(path: str) -> any:
with open(path, 'r') as config_file:
config = json.load(config_file)
# Setup S3 Settings
config['s3']['access'] = os.getenv('ACM_S3_ACCESS')
config['s3']['secret'] = os.getenv('ACM_S3_SECRET')
return config
@click.group()
@click.option('-d', '--debug/--no-debug', default=False)
@click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
@click.option('-x', '--context', required=True)
@click.option('-s', '--stdin/--no-stdin', default=False)
@click.option('-p', '--prefix', default=None)
@click.pass_context
def cli(ctx, debug, config, context, stdin, prefix):
ctx.ensure_object(dict)
ctx.obj['DEBUG'] = debug
ctx.obj['CONFIG'] = load_config(config)
ctx.obj['CONTEXT'] = context
ctx.obj['READ_STDIN'] = stdin
ctx.obj['PREFIX'] = prefix
@cli.command(name="list")
@click.option('--sha256sum/--no-sha256sum', default=False)
@click.option('--suffix', default=None)
@click.pass_context
def list_files(ctx, sha256sum, suffix):
minio_config = ctx.obj['CONFIG']['minio']
minio_bucket = ctx.obj['CONTEXT']
minio = get_minio_client(minio_config)
if not minio.bucket_exists(minio_bucket):
minio.make_bucket(minio_bucket)
found_files: List[str] = []
found_objects: List[str] = []
for obj in minio.list_objects_v2(minio_bucket, recursive=False):
if obj.is_dir:
found_objects.extend(list_minio_dir(minio, minio_bucket, obj.object_name))
else:
found_objects.append(obj.object_name)
for obj in found_objects:
file = obj
if 'PREFIX' in ctx.obj and ctx.obj['PREFIX'] is not None:
file = os.path.join(ctx.obj['PREFIX'], file)
if suffix is not None and suffix in file:
file = file.replace(suffix, '')
file = file.strip()
if sha256sum:
stat = minio.stat_object(minio_bucket, obj)
sha256sum_value = stat.metadata[get_metadata_name("SHA256SUM")]
file = f'{sha256sum_value} {file}'
found_files.append(file)
print(os.linesep.join(found_files))
@cli.command(name="check")
@click.pass_context
@click.argument('files', nargs=-1)
def check_changed_files_hashes(ctx, files):
minio_config = ctx.obj['CONFIG']['minio']
minio_bucket = ctx.obj['CONTEXT']
minio = get_minio_client(minio_config)
if not minio.bucket_exists(minio_bucket):
minio.make_bucket(minio_bucket)
changed_files: List[str] = []
if ctx.obj['READ_STDIN']:
files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
for file in files:
file_identity = f'{get_file_identity(ctx.obj, file)}.json'
try:
file_object = minio.stat_object(minio_bucket, file_identity)
stored_file_hash = file_object.metadata[get_metadata_name("SHA256SUM")]
sha256sum = hashlib.sha256()
with open(file, 'rb') as f:
for byte_block in iter(lambda: f.read(BUF_SIZE), b""):
sha256sum.update(byte_block)
calculated_file_hash = sha256sum.hexdigest()
if calculated_file_hash != stored_file_hash:
changed_files.append(file)
except NoSuchKey as e:
changed_files.append(file)
except ValueError or ResponseError as e:
print(f'ERROR: {file} {e}')
print(os.linesep.join(changed_files))
@cli.command(name="update")
@click.pass_context
@click.argument('files', nargs=-1)
def update_changed_files_hashes(ctx, files):
minio_config = ctx.obj['CONFIG']['minio']
minio_bucket = ctx.obj['CONTEXT']
minio = get_minio_client(minio_config)
if not minio.bucket_exists(minio_bucket):
minio.make_bucket(minio_bucket)
updated_files: List[str] = []
if ctx.obj['READ_STDIN']:
files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
for file in files:
file_identity = f'{get_file_identity(ctx.obj, file)}.json'
try:
sha256sum = hashlib.sha256()
with open(file, 'rb') as f:
for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
sha256sum.update(byte_block)
calculated_file_hash = sha256sum.hexdigest()
object_data = {
"path": file
}
with io.BytesIO(json.dumps(object_data, sort_keys=True, indent=None).encode('utf-8')) as data:
data.seek(0, os.SEEK_END)
data_length = data.tell()
data.seek(0)
minio.put_object(
minio_bucket,
file_identity,
data,
data_length,
content_type="application/json",
metadata={
"SHA256SUM": calculated_file_hash
}
)
updated_files.append(file)
except ValueError or ResponseError as e:
print(f'ERROR: {file} {e}')
print(os.linesep.join(updated_files))
@cli.command(name="store")
@click.pass_context
@click.argument('files', nargs=-1)
def store_files(ctx, files):
minio_config = ctx.obj['CONFIG']['minio']
minio_bucket = f'{ctx.obj["CONTEXT"]}-data'
minio = get_minio_client(minio_config)
if not minio.bucket_exists(minio_bucket):
minio.make_bucket(minio_bucket)
stored_files: List[str] = []
if ctx.obj['READ_STDIN']:
files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
for file in files:
file_identity = get_file_identity(ctx.obj, file)
try:
minio.fput_object(
minio_bucket,
file_identity,
file,
content_type="application/octet-stream"
)
stored_files.append(file)
except ResponseError as e:
print(f'ERROR: {file} {e}')
print(os.linesep.join(stored_files))
@cli.command(name="retrieve")
@click.pass_context
@click.option('-d', '--destination', default=None)
@click.argument('files', nargs=-1)
def retrieve_files(ctx, destination, files):
minio_config = ctx.obj['CONFIG']['minio']
minio_bucket = f'{ctx.obj["CONTEXT"]}-data'
minio = get_minio_client(minio_config)
if not minio.bucket_exists(minio_bucket):
minio.make_bucket(minio_bucket)
retrieved_files: List[str] = []
if ctx.obj['READ_STDIN']:
files = get_clean_stdin_iterator(click.get_text_stream('stdin'))
for file in files:
file_identity = get_file_identity(ctx.obj, file)
file_destination = file
if destination is not None:
file_destination = os.path.join(destination, file_identity)
try:
minio.fget_object(
minio_bucket,
file_identity,
file_destination
)
retrieved_files.append(file_destination)
except ResponseError as e:
print(f'ERROR: {file_destination} {e}')
print(os.linesep.join(retrieved_files))
if __name__ == '__main__':
cli(obj={})