|
|
#!/usr/bin/env python
import hashlib
import io
import json
import os
from typing import Any, List

import click
from minio import Minio, ResponseError
from minio.error import NoSuchKey
# MinIO metadata prefix: get_metadata_name() prepends it to a metadata key to
# build the name used to look the value up in stat_object() results.
METADATA_PREFIX = 'X-Amz-Meta-'

# Metadata constants.
METADATA_SHA256SUM = "Sha256sum"

# Size (in bytes) of the buffer used to read files chunk by chunk when hashing.
BUF_SIZE = 4096
def get_metadata_name(key):
    """Return the prefixed metadata name for *key*.

    The key is capitalized (first character upper-case, the rest lower-case)
    and appended to ``METADATA_PREFIX``. Previously the argument was ignored
    and the literal ``'SHA256SUM'`` was always used; callers passing
    ``"SHA256SUM"`` get exactly the same result as before.

    Args:
        key: Metadata key without prefix, in any letter case.

    Returns:
        ``METADATA_PREFIX`` followed by the capitalized key.
    """
    return METADATA_PREFIX + key.capitalize()
def get_clean_stdin_iterator(stdin_stream):
    """Lazily yield the stripped, non-empty lines of *stdin_stream*.

    Args:
        stdin_stream: Any iterable of text lines.

    Returns:
        A generator producing each line with surrounding whitespace removed,
        skipping lines that are empty after stripping.
    """
    stripped_lines = (raw_line.strip() for raw_line in stdin_stream)
    return (entry for entry in stripped_lines if entry != '')
def get_file_identity(ctx_obj, file):
    """Derive the object-store identity for a local file path.

    The configured ``PREFIX`` (if any) is removed from the path and the
    platform's directory separator is normalised to ``/``.

    Args:
        ctx_obj: The click context dictionary; its optional ``'PREFIX'`` entry
            is the text to remove from *file*.
        file: Local file path as given on the command line or stdin.

    Returns:
        The path with the prefix removed and ``/`` as separator.
    """
    prefix = ctx_obj.get('PREFIX')
    # NOTE(review): replace() drops every occurrence of the prefix, not only a
    # leading one; kept as-is because callers may rely on it.
    path = file if prefix is None else file.replace(prefix, '')

    # os.sep is the directory separator. The previous code compared against
    # os.pathsep (the PATH list separator, ':' or ';'), which rewrote ':' in
    # POSIX file names and never converted backslashes on Windows.
    if os.sep != '/':
        path = path.replace(os.sep, '/')

    return path
def list_minio_dir(minio: Minio, bucket: str, prefix: str) -> List[str]:
    """Collect the names of all non-directory objects found under *prefix*.

    Entries flagged ``is_dir`` are not reported themselves; instead the
    function recurses into them using their object name as the new prefix.

    Args:
        minio: Client used to list objects.
        bucket: Name of the bucket to list.
        prefix: Object-name prefix to list under.

    Returns:
        Object names in listing order, with the contents of each directory
        inserted at the position where the directory was encountered.
    """
    object_names: List[str] = []
    for entry in minio.list_objects_v2(bucket, prefix=prefix):
        if not entry.is_dir:
            object_names.append(entry.object_name)
            continue
        # Directory entry: descend and splice its contents in place.
        object_names += list_minio_dir(minio, bucket, entry.object_name)
    return object_names
def get_minio_client(config: Any) -> Minio:
    """Build a MinIO client from a configuration mapping.

    Args:
        config: Mapping providing the keys ``'host'``, ``'secure'``,
            ``'access'`` and ``'secret'``. (Annotated with ``typing.Any``;
            the previous annotation ``any`` was the builtin function, not a
            type.)

    Returns:
        A ``Minio`` client constructed from those four values.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    return Minio(
        config['host'],
        secure=config['secure'],
        access_key=config['access'],
        secret_key=config['secret'],
    )
def load_config(path: str) -> Any:
    """Load the JSON configuration file and inject the S3 credentials.

    Args:
        path: Location of the JSON configuration file.

    Returns:
        The parsed configuration with ``config['s3']['access']`` and
        ``config['s3']['secret']`` set from the ``ACM_S3_ACCESS`` and
        ``ACM_S3_SECRET`` environment variables (``None`` when unset).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(path, 'r', encoding='utf-8') as config_file:
        config = json.load(config_file)

    # Setup S3 settings. Create the section when it is absent so that a
    # configuration without an 's3' block no longer fails with KeyError.
    # NOTE(review): the commands in this file read config['minio'], not
    # config['s3'] - confirm which section is meant to carry the credentials.
    s3_settings = config.setdefault('s3', {})
    s3_settings['access'] = os.getenv('ACM_S3_ACCESS')
    s3_settings['secret'] = os.getenv('ACM_S3_SECRET')

    return config
# Root command group: records the global options on the click context object
# so that the subcommands can read them. Documented with comments rather than
# a docstring so that the generated --help text stays unchanged.
@click.group()
@click.option('-d', '--debug/--no-debug', default=False)
@click.option('-c', '--config', default=lambda: os.path.join(os.getcwd(), 'acm-config.json'), show_default=True)
@click.option('-x', '--context', required=True)
@click.option('-s', '--stdin/--no-stdin', default=False)
@click.option('-p', '--prefix', default=None)
@click.pass_context
def cli(ctx, debug, config, context, stdin, prefix):
    ctx.ensure_object(dict)
    # The configuration file is parsed once here; subcommands reuse the result.
    ctx.obj.update({
        'DEBUG': debug,
        'CONFIG': load_config(config),
        'CONTEXT': context,
        'READ_STDIN': stdin,
        'PREFIX': prefix,
    })
# "list" command: prints every object name found in the context bucket, one
# per line, optionally preceded by the stored SHA-256 metadata value.
# Documented with comments rather than a docstring so that the generated
# --help text stays unchanged.
@cli.command(name="list")
@click.option('--sha256sum/--no-sha256sum', default=False)
@click.option('--suffix', default=None)
@click.pass_context
def list_files(ctx, sha256sum, suffix):
    minio_config = ctx.obj['CONFIG']['minio']
    minio_bucket = ctx.obj['CONTEXT']

    minio = get_minio_client(minio_config)

    if not minio.bucket_exists(minio_bucket):
        minio.make_bucket(minio_bucket)

    found_files: List[str] = []
    found_objects: List[str] = []

    # Top-level listing; directory entries are expanded via list_minio_dir.
    for obj in minio.list_objects_v2(minio_bucket, recursive=False):
        if obj.is_dir:
            found_objects.extend(list_minio_dir(minio, minio_bucket, obj.object_name))
        else:
            found_objects.append(obj.object_name)

    prefix = ctx.obj.get('PREFIX')

    for obj in found_objects:
        file = obj
        if prefix is not None:
            file = os.path.join(prefix, file)

        # Strip the suffix only from the end of the name. The previous
        # replace() removed every occurrence, turning e.g. "config.json.json"
        # into "config" for --suffix .json.
        if suffix and file.endswith(suffix):
            file = file[:-len(suffix)]

        file = file.strip()

        if sha256sum:
            # The metadata lookup uses the original object name, not the
            # prefixed/suffix-stripped display name.
            stat = minio.stat_object(minio_bucket, obj)
            sha256sum_value = stat.metadata[get_metadata_name("SHA256SUM")]
            file = f'{sha256sum_value} {file}'

        found_files.append(file)

    print(os.linesep.join(found_files))
# "check" command: prints the files whose local SHA-256 differs from the hash
# stored in the metadata of "<identity>.json", or that have no such object.
# Documented with comments rather than a docstring so that the generated
# --help text stays unchanged.
@cli.command(name="check")
@click.pass_context
@click.argument('files', nargs=-1)
def check_changed_files_hashes(ctx, files):
    minio_config = ctx.obj['CONFIG']['minio']
    minio_bucket = ctx.obj['CONTEXT']

    minio = get_minio_client(minio_config)

    if not minio.bucket_exists(minio_bucket):
        minio.make_bucket(minio_bucket)

    changed_files: List[str] = []

    # With --stdin the file list comes from standard input instead of argv.
    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            file_object = minio.stat_object(minio_bucket, file_identity)
            stored_file_hash = file_object.metadata[get_metadata_name("SHA256SUM")]

            # Hash the local file in BUF_SIZE chunks.
            sha256sum = hashlib.sha256()
            with open(file, 'rb') as f:
                for byte_block in iter(lambda: f.read(BUF_SIZE), b""):
                    sha256sum.update(byte_block)
            calculated_file_hash = sha256sum.hexdigest()

            if calculated_file_hash != stored_file_hash:
                changed_files.append(file)
        except NoSuchKey:
            # No stored record for this file: report it as changed.
            changed_files.append(file)
        except (ValueError, ResponseError) as e:
            # A tuple is required here: "ValueError or ResponseError"
            # evaluates to ValueError alone, so ResponseError escaped.
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(changed_files))
# "update" command: for each file, uploads a small JSON record named
# "<identity>.json" whose metadata carries the file's SHA-256, then prints
# the files that were updated. Documented with comments rather than a
# docstring so that the generated --help text stays unchanged.
@cli.command(name="update")
@click.pass_context
@click.argument('files', nargs=-1)
def update_changed_files_hashes(ctx, files):
    minio_config = ctx.obj['CONFIG']['minio']
    minio_bucket = ctx.obj['CONTEXT']

    minio = get_minio_client(minio_config)

    if not minio.bucket_exists(minio_bucket):
        minio.make_bucket(minio_bucket)

    updated_files: List[str] = []

    # With --stdin the file list comes from standard input instead of argv.
    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    for file in files:
        file_identity = f'{get_file_identity(ctx.obj, file)}.json'
        try:
            # Hash the local file in BUF_SIZE chunks.
            sha256sum = hashlib.sha256()
            with open(file, 'rb') as f:
                for byte_block in iter(lambda: f.read(BUF_SIZE), b''):
                    sha256sum.update(byte_block)
            calculated_file_hash = sha256sum.hexdigest()

            object_data = {
                "path": file
            }
            payload = json.dumps(object_data, sort_keys=True, indent=None).encode('utf-8')

            # The byte length is known up front, so no seek/tell is needed.
            with io.BytesIO(payload) as data:
                minio.put_object(
                    minio_bucket,
                    file_identity,
                    data,
                    len(payload),
                    content_type="application/json",
                    metadata={
                        "SHA256SUM": calculated_file_hash
                    }
                )
            updated_files.append(file)
        except (ValueError, ResponseError) as e:
            # A tuple is required here: "ValueError or ResponseError"
            # evaluates to ValueError alone, so ResponseError escaped.
            print(f'ERROR: {file} {e}')

    print(os.linesep.join(updated_files))
# "store" command: uploads each file to the "<context>-data" bucket under its
# file identity and prints the files that were stored. Documented with
# comments rather than a docstring so that the generated --help text stays
# unchanged.
@cli.command(name="store")
@click.pass_context
@click.argument('files', nargs=-1)
def store_files(ctx, files):
    data_bucket = f'{ctx.obj["CONTEXT"]}-data'
    client = get_minio_client(ctx.obj['CONFIG']['minio'])

    bucket_present = client.bucket_exists(data_bucket)
    if not bucket_present:
        client.make_bucket(data_bucket)

    # With --stdin the file list comes from standard input instead of argv.
    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    stored_files: List[str] = []
    for local_path in files:
        object_name = get_file_identity(ctx.obj, local_path)
        try:
            client.fput_object(
                data_bucket,
                object_name,
                local_path,
                content_type="application/octet-stream"
            )
        except ResponseError as e:
            print(f'ERROR: {local_path} {e}')
        else:
            stored_files.append(local_path)

    print(os.linesep.join(stored_files))
# "retrieve" command: downloads each file's object from the "<context>-data"
# bucket, either to the given path or below --destination, and prints the
# paths written. Documented with comments rather than a docstring so that
# the generated --help text stays unchanged.
@cli.command(name="retrieve")
@click.pass_context
@click.option('-d', '--destination', default=None)
@click.argument('files', nargs=-1)
def retrieve_files(ctx, destination, files):
    data_bucket = f'{ctx.obj["CONTEXT"]}-data'
    client = get_minio_client(ctx.obj['CONFIG']['minio'])

    bucket_present = client.bucket_exists(data_bucket)
    if not bucket_present:
        client.make_bucket(data_bucket)

    # With --stdin the file list comes from standard input instead of argv.
    if ctx.obj['READ_STDIN']:
        files = get_clean_stdin_iterator(click.get_text_stream('stdin'))

    retrieved_files: List[str] = []
    for requested in files:
        object_name = get_file_identity(ctx.obj, requested)
        # Without --destination the object is written back to the given path.
        if destination is None:
            target_path = requested
        else:
            target_path = os.path.join(destination, object_name)

        try:
            client.fget_object(
                data_bucket,
                object_name,
                target_path
            )
        except ResponseError as e:
            print(f'ERROR: {target_path} {e}')
        else:
            retrieved_files.append(target_path)

    print(os.linesep.join(retrieved_files))
# Script entry point: run the click group with an empty context object.
if __name__ == '__main__':
    cli(obj={})
|