"""Command-line endpoints (commands and groups) registered on the app's CLI."""

# builtins
import os
import datetime
import uuid
# third-party
import click
import flask
import OpenSSL as openssl
# internals
from application import app
import util
import database
import admin


# utility functions
def show_model(value):
    """Return ``value.__name__`` for progress-bar display, or None if value is None."""
    if value is None:
        return None
    return value.__name__


# cli endpoints (i.e. commands and groups)
@app.cli.command()
def install():
    """Create the database tables for all non-abstract models."""
    models = [
        cls
        for cls in database.Model.__class_descendants__.values()
        if cls not in app.models_abstract
    ]
    # Sort BEFORE handing the list to the progress bar: wrapping the bar in
    # sorted() would exhaust it (rendering 100%) before any table is created.
    ordered = sorted(models, key=lambda x: x.__class_created__)
    with click.progressbar(ordered, label="Creating tables…",
                           item_show_func=show_model) as progress:
        for cls in progress:
            cls.create_table()


@app.cli.command()
def cron():
    """Run every registered cron function, reporting failures without aborting."""
    with click.progressbar(app.cron_functions,
                           label="Running cron functions:",
                           item_show_func=lambda f: f.__name__ if f else '') as cron_functions:
        for func in cron_functions:
            try:
                func()
            except Exception as e:
                # Deliberately best-effort: one failing job must not prevent
                # the remaining jobs from running.
                click.secho(f"Cron function {func} failed to run: {str(e)}", fg='red')


cli_user = flask.cli.AppGroup('user')


@cli_user.command('list')
def user_list():
    """List all users together with their client certificates."""
    for user in admin.User.select():
        click.secho(user.name, bold=True)

        certs = list(admin.ClientCert.select().where(admin.ClientCert.user == user))
        last_index = len(certs) - 1
        for i, cert in enumerate(certs):
            # Draw a small tree: the last entry gets the closing branch glyph.
            char_list = '└' if i == last_index else '├'
            click.echo(f"{char_list} {cert.name}: "
                       + click.style(cert.fingerprint, fg='green'))


@cli_user.command('create')
@click.argument('name')
@click.argument('password')
def user_create(name, password):
    """Create a new user NAME with the given PASSWORD."""
    try:
        user = admin.User.register(name, password)
    except Exception as e:
        click.secho("Could not create user, exception was as follows:", fg='red')
        click.echo(str(e))
    else:
        click.secho(f"Created user '{name}' (#{user.id}).", fg='green')


@cli_user.command('edit')
@click.argument('name')
def user_edit(name):
    """Interactively set a new password for user NAME."""
    try:
        user = admin.User.load(name)
    except admin.User.DoesNotExist:
        click.secho(f"User '{name}' does not exist!", fg='red')
        raise click.Abort()

    # The password is prompted with hidden input; it must never be echoed
    # back to the terminal afterwards.
    password = click.prompt("New password", hide_input=True, confirmation_prompt=True)

    if user.update_password(password):
        click.secho(f"Password for user '{name}' successfully updated.", fg='green')
    else:
        click.secho("Could not update password.", fg='red')


@cli_user.command('delete')
@click.argument('name')
@click.option('--yes', is_flag=True, prompt='Really delete user?')
def user_delete(name, yes):
    """Delete user NAME after explicit confirmation."""
    if not yes:
        click.secho("No opt-in, doing nothing.", fg='yellow')
        raise click.Abort()

    try:
        user = admin.User.load(name)
    except admin.User.DoesNotExist:
        click.secho(f"User '{name}' does not exist!", fg='red')
        raise click.Abort()

    if user.delete_instance():
        click.secho(f"Successfully deleted user '{name}'.", fg='green')
    else:
        click.secho(f"Could not delete user '{name}'.", fg='red')


cli_clientcert = flask.cli.AppGroup('clientcert')


# TODO: add a '--not-after' option (probably requires datetime parsing)
@cli_clientcert.command('create')
@click.argument('username')
@click.argument('certname')
def cert_create(username, certname):
    """Generate a keypair and client certificate CERTNAME for USERNAME.

    The bundle is written to '<username>-<certname>.p12' in the current
    directory and protected with a freshly generated passphrase, which is
    printed once at the end.
    """
    try:
        user = admin.User.load(username)
    except admin.User.DoesNotExist:
        click.secho(f"User '{username}' does not exist", fg='red')
        raise click.Abort()

    not_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']

    try:
        pkcs12 = user.gen_keypair_and_clientcert(certname, not_after)
    except Exception:
        # In debug mode surface the real traceback instead of a generic abort.
        if app.debug:
            raise
        click.secho("Client certificate creation failed.", fg='red')
        raise click.Abort()

    passphrase = util.random_string_light()

    filename = f'{username}-{certname}.p12'
    with open(filename, 'wb') as fd:
        fd.write(pkcs12.export(passphrase=passphrase))

    cert_info = admin.ClientCert()
    cert_info.user = user
    cert_info.name = certname
    # digest() yields colon-separated hex; store it without the separators.
    cert_info.fingerprint = pkcs12.get_certificate().digest('sha512').replace(b':', b'')
    cert_info.save(force_insert=True)

    click.secho(f"Bundled keypair and cert saved as '{filename}'.", fg='green')
    click.echo("Passphrase: " + click.style(passphrase, bold=True, fg='yellow', bg='black'))


@cli_clientcert.command('assign')
@click.argument('username')
@click.argument('certname')
@click.argument('fingerprint')
def cert_assign(username, certname, fingerprint):
    """Assign an existing certificate FINGERPRINT to USERNAME as CERTNAME."""
    try:
        user = admin.User.load(username)
    except admin.User.DoesNotExist:
        click.secho(f"User '{username}' does not exist", fg='red')
        raise click.Abort()

    cert_info = admin.ClientCert()
    cert_info.user = user
    cert_info.name = certname
    cert_info.fingerprint = fingerprint

    if cert_info.save():
        click.secho(f"Assigned certificate fingerprint to user '{username}'.", fg='green')
    else:
        click.secho("Could not assign fingerprint.", fg='red')


@cli_clientcert.command('revoke')
@click.argument('username')
@click.argument('certname')
def cert_revoke(username, certname):
    """Remove the assignment of certificate CERTNAME from USERNAME."""
    # TODO: CRL - add entry and host the CRL itself somewhere
    try:
        user = admin.User.load(username)
    except admin.User.DoesNotExist:
        click.secho(f"User '{username}' does not exist", fg='red')
        raise click.Abort()

    try:
        # Query via the model class; no throwaway instance is needed.
        cert_info = admin.ClientCert.get(
            (admin.ClientCert.user == user) & (admin.ClientCert.name == certname)
        )
    except admin.ClientCert.DoesNotExist:
        click.secho(f"Client certificate '{certname}' for user '{username}' does not exist.", fg='red')
        raise click.Abort()

    if cert_info.delete_instance():
        click.secho(f"Successfully deleted assignment of certificate '{certname}' to user '{username}'.", fg='green')
    else:
        click.secho("Could not delete client certificate assignment.", fg='red')


cli_ca = flask.cli.AppGroup('ca')


@cli_ca.command('create')
@click.option('--lifetime', prompt="How long should this CA live (in seconds, 0 means infinite)?", default=0)
def ca_create(lifetime):
    """Create a self-signed CA certificate and keypair in the directory 'ca'.

    Aborts if 'ca' already exists. Writes 'ca/cert.pem' and 'ca/key.pem'.
    """
    if os.path.exists('ca'):
        click.secho("Directory 'ca' already exists, move or delete it and re-run.", fg='red')
        raise click.Abort()

    click.echo("Generating keypair…")
    keypair = openssl.crypto.PKey()
    keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])

    click.echo("Generating certificate…")
    cert = openssl.crypto.X509()
    cert.set_serial_number(uuid.uuid4().int)  # random uuid, mathematically ~guaranteed to be unique

    issuer = cert.get_issuer()
    issuer.commonName = app.config['SITE_NAME']
    issuer.C = 'AQ'
    issuer.L = 'Free Nation Of Radiant Dissent'
    issuer.O = 'Erisian Liberation Front'
    issuer.OU = 'Cyber Confusion Center'

    # Self-signed: the subject is the same name as the issuer.
    cert.set_subject(issuer)
    cert.set_pubkey(keypair)

    cert.gmtime_adj_notBefore(0)
    if lifetime == 0:
        cert.set_notAfter(b'99991231235959Z')  # "indefinitely valid" as defined in RFC 5280 4.1.2.5.
    else:
        cert.gmtime_adj_notAfter(lifetime)

    extensions = [
        openssl.crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
        openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, dataEncipherment, keyAgreement, keyCertSign, cRLSign'),
    ]
    cert.add_extensions(extensions)

    # sign cert with private key
    cert.sign(keypair, "sha512")

    click.echo("Creating directory…")
    os.mkdir('ca')

    click.echo("Saving certificate…")
    cert_pem = openssl.crypto.dump_certificate(openssl.crypto.FILETYPE_PEM, cert)
    with open(os.path.join('ca', 'cert.pem'), 'w') as fd:
        fd.write(cert_pem.decode('ascii'))

    click.echo("Saving keypair…")
    key_pem = openssl.crypto.dump_privatekey(openssl.crypto.FILETYPE_PEM, keypair)
    # The private key is stored unencrypted, so create the file readable and
    # writable by the owner only instead of relying on the process umask.
    key_fd = os.open(os.path.join('ca', 'key.pem'),
                     os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(key_fd, 'w') as fd:
        fd.write(key_pem.decode('ascii'))

    click.secho("All done!", fg='green')


# 'clientcert' is nested under 'user'; 'user' and 'ca' hang off the app CLI.
cli_user.add_command(cli_clientcert)
app.cli.add_command(cli_user)
app.cli.add_command(cli_ca)