pdnew/cli.py

290 lines
8.2 KiB
Python

# builtins
import os
import datetime
import uuid
# third-party
import click
import flask
import OpenSSL as openssl
# internals
from application import app
import util
import database
import admin
# utility functions
def show_model(value):
    """Return the ``__name__`` of ``value`` for progress-bar display.

    Used as the ``item_show_func`` of the table-creation progress bar.
    ``None`` is passed through unchanged.
    """
    return value.__name__ if value is not None else None
# cli endpoints (i.e. commands and groups)
@app.cli.command()
def install():
    # Create the database table of every non-abstract model.
    #
    # Models are taken from database.Model.__class_descendants__, skipping
    # anything listed in app.models_abstract, and their tables are created
    # in ascending __class_created__ order.
    models = [
        cls
        for cls in database.Model.__class_descendants__.values()
        if cls not in app.models_abstract
    ]
    # Sort *before* handing the list to the progress bar: sorting the bar
    # itself would exhaust it (rendering it as finished) before a single
    # table had actually been created.
    models.sort(key=lambda cls: cls.__class_created__)
    with click.progressbar(models, label="Creating tables…", item_show_func=show_model) as progress:
        for cls in progress:
            cls.create_table()
@app.cli.command()
def cron():
    # Run every function in app.cron_functions once, behind a progress bar.
    # A function that raises is reported in red; the remaining ones still run.
    def describe(func):
        # falsy items are displayed as an empty label
        return func.__name__ if func else ''

    with click.progressbar(
        app.cron_functions,
        label="Running cron functions:",
        item_show_func=describe,
    ) as pending:
        for job in pending:
            try:
                job()
            except Exception as exc:
                click.secho(f"Cron function {job} failed to run: {str(exc)}", fg='red')
# command group 'user' (attached to app.cli further down)
cli_user = flask.cli.AppGroup('user')
@cli_user.command('list')
def user_list():
    # Print every user in bold, followed by a tree-style list of the client
    # certificates assigned to them (name and fingerprint).
    for user in admin.User.select():
        click.secho(user.name, bold=True)
        # materialise once so the length and the iteration share one result set
        certs = list(admin.ClientCert.select().where(admin.ClientCert.user == user))
        last = len(certs) - 1
        for i, cert in enumerate(certs):
            # The last entry closes the tree, all others branch off it.
            # NOTE(review): both branches used to assign '' (glyphs apparently
            # lost to an encoding problem) — confirm these are the intended ones.
            branch = '└' if i == last else '├'
            click.echo(f"{branch} {cert.name}: " + click.style(cert.fingerprint, fg='green'))
@cli_user.command('create')
@click.argument('name')
@click.argument('password')
def user_create(name, password):
    # Register a new user via admin.User.register(name, password).
    # Any exception raised by the registration is printed instead of
    # propagating; on success the new user's id is shown.
    try:
        user = admin.User.register(name, password)
    except Exception as exc:
        click.secho("Could not create user, exception was as follows:", fg='red')
        click.echo(str(exc))
        return
    click.secho(f"Created user '{name}' (#{user.id}).", fg='green')
@cli_user.command('edit')
@click.argument('name')
def user_edit(name):
    # Interactively set a new password for an existing user.
    # Aborts when the user cannot be loaded. The password is asked for twice
    # with hidden input and must never be echoed back to the terminal.
    try:
        user = admin.User.load(name)
    except admin.User.DoesNotExist:
        click.secho(f"User '{name}' does not exist!", fg='red')
        raise click.Abort()
    password = click.prompt("New password", hide_input=True, confirmation_prompt=True)
    if user.update_password(password):
        click.secho(f"Password for user '{name}' successfully updated.", fg='green')
    else:
        click.secho("Could not update password.", fg='red')
@cli_user.command('delete')
@click.argument('name')
@click.option('--yes', is_flag=True, prompt='Really delete user?')
def user_delete(name, yes):
    # Delete the named user once the operator has confirmed via --yes
    # (or the interactive prompt). Aborts without confirmation or when the
    # user cannot be loaded.
    if not yes:
        click.secho("No opt-in, doing nothing.", fg='yellow')
        raise click.Abort()
    try:
        user = admin.User.load(name)
    except admin.User.DoesNotExist:
        click.secho(f"User '{name}' does not exist!", fg='red')
        raise click.Abort()
    # delete_instance()'s result decides which outcome is reported
    if user.delete_instance():
        message, colour = f"Successfully deleted user '{name}'.", 'green'
    else:
        message, colour = f"Could not delete user '{name}'.", 'red'
    click.secho(message, fg=colour)
# command group 'clientcert' (nested under the 'user' group further down)
cli_clientcert = flask.cli.AppGroup('clientcert')
@cli_clientcert.command('create')
@click.argument('username')
@click.argument('certname')
# TODO: add a --not-after option (would probably need datetime parsing)
def cert_create(username, certname):
    # Generate a keypair plus client certificate for an existing user.
    #
    # The bundle is written to '<username>-<certname>.p12' (relative to the
    # current working directory), protected by a freshly generated passphrase
    # that is printed at the end. The certificate's SHA-512 digest (colons
    # stripped) is stored as a new admin.ClientCert row.
    # Aborts when the user does not exist or certificate creation fails
    # (in debug mode the original exception is re-raised instead).
    try:
        user = admin.User.load(username)
    except admin.User.DoesNotExist:
        click.secho(f"User '{username}' does not exist", fg='red')
        raise click.Abort()
    # certificates always get the maximum configured lifetime
    not_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']
    try:
        pkcs12 = user.gen_keypair_and_clientcert(certname, not_after)
    except Exception:
        if app.debug:
            raise
        click.secho("Client certificate creation failed.", fg='red')
        raise click.Abort()
    passphrase = util.random_string_light()
    filename = f'{username}-{certname}.p12'
    # context manager: the handle is closed even if export() or write() raises
    with open(filename, 'wb') as fd:
        fd.write(pkcs12.export(passphrase=passphrase))
    cert_info = admin.ClientCert()
    cert_info.user = user
    cert_info.name = certname
    cert_info.fingerprint = pkcs12.get_certificate().digest('sha512').replace(b':', b'')
    cert_info.save(force_insert=True)
    click.secho(f"Bundled keypair and cert saved as '{filename}'.", fg='green')
    click.echo("Passphrase: " + click.style(passphrase, bold=True, fg='yellow', bg='black'))
@cli_clientcert.command('assign')
@click.argument('username')
@click.argument('certname')
@click.argument('fingerprint')
def cert_assign(username, certname, fingerprint):
    # Record an already existing certificate (identified by its fingerprint)
    # under the given name for an existing user.
    # Aborts when the user does not exist.
    try:
        user = admin.User.load(username)
    except admin.User.DoesNotExist:
        click.secho(f"User '{username}' does not exist", fg='red')
        raise click.Abort()
    cert_info = admin.ClientCert()
    cert_info.user = user
    cert_info.name = certname
    cert_info.fingerprint = fingerprint
    # Force an INSERT, exactly like cert_create does for the same model: a
    # plain save() turns into an UPDATE (matching no row) whenever the
    # primary key is one of the fields assigned above.
    # NOTE(review): confirm against the ClientCert model's primary key.
    if cert_info.save(force_insert=True):
        click.secho(f"Assigned certificate fingerprint to user '{username}'.", fg='green')
    else:
        click.secho("Could not assign fingerprint.", fg='red')
@cli_clientcert.command('revoke')
@click.argument('username')
@click.argument('certname')
def cert_revoke(username, certname):
    # Remove the assignment of the named client certificate from a user.
    # Aborts when either the user or the certificate entry does not exist.
    # TODO: CRL - add entry and host the CRL itself somewhere
    try:
        user = admin.User.load(username)
    except admin.User.DoesNotExist:
        click.secho(f"User '{username}' does not exist", fg='red')
        raise click.Abort()
    try:
        # query on the model class itself; no throwaway instance needed
        cert_info = admin.ClientCert.get(
            (admin.ClientCert.user == user) & (admin.ClientCert.name == certname)
        )
    except admin.ClientCert.DoesNotExist:
        click.secho(f"Client certificate '{certname}' for user '{username}' does not exist.", fg='red')
        raise click.Abort()
    if cert_info.delete_instance():
        click.secho(f"Successfully deleted assignment of certificate '{certname}' to user '{username}'.", fg='green')
    else:
        click.secho("Could not delete client certificate assignment.", fg='red')
# command group 'ca' (attached to app.cli further down)
cli_ca = flask.cli.AppGroup('ca')
@cli_ca.command('create')
@click.option('--lifetime', prompt="How long should this CA live (in seconds, 0 means infinite)?", default=0)
def ca_create(lifetime):
    # Create a self-signed CA: an RSA keypair plus a certificate whose issuer
    # and subject are identical, stored as PEM files in a new 'ca' directory
    # ('ca/cert.pem' and 'ca/key.pem', relative to the working directory).
    #
    # lifetime: validity in seconds from now; 0 selects the "no expiry" date.
    # Aborts when 'ca' already exists so an existing CA is never overwritten.
    if os.path.exists('ca'):
        click.secho("Directory 'ca' already exists, move or delete it and re-run.", fg='red')
        raise click.Abort()
    click.echo("Generating keypair…")
    keypair = openssl.crypto.PKey()
    keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])
    click.echo("Generating certificate…")
    cert = openssl.crypto.X509()
    cert.set_serial_number(uuid.uuid4().int)  # random uuid, mathematically ~guaranteed to be unique
    issuer = cert.get_issuer()
    issuer.commonName = app.config['SITE_NAME']
    issuer.C = 'AQ'
    issuer.L = 'Free Nation Of Radiant Dissent'
    issuer.O = 'Erisian Liberation Front'
    issuer.OU = 'Cyber Confusion Center'
    # self-signed: the subject is the issuer
    cert.set_subject(issuer)
    cert.set_pubkey(keypair)
    cert.gmtime_adj_notBefore(0)
    if lifetime == 0:
        cert.set_notAfter(b'99991231235959Z')  # "indefinitely valid" as defined in RFC 5280 4.1.2.5.
    else:
        cert.gmtime_adj_notAfter(lifetime)
    extensions = [
        openssl.crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
        openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, dataEncipherment, keyAgreement, keyCertSign, cRLSign'),
    ]
    cert.add_extensions(extensions)
    # sign cert with private key
    cert.sign(keypair, "sha512")
    click.echo("Creating directory…")
    os.mkdir('ca')
    click.echo("Saving certificate…")
    cert_pem = openssl.crypto.dump_certificate(openssl.crypto.FILETYPE_PEM, cert)
    with open(os.path.join('ca', 'cert.pem'), 'w') as fd:
        fd.write(cert_pem.decode('ascii'))
    click.echo("Saving keypair…")
    key_pem = openssl.crypto.dump_privatekey(openssl.crypto.FILETYPE_PEM, keypair)
    # The private key is stored unencrypted, so create the file readable and
    # writable by its owner only instead of relying on the process umask.
    # NOTE(review): if another system user must read the key, adjust the mode.
    key_fd = os.open(os.path.join('ca', 'key.pem'), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(key_fd, 'w') as fd:
        fd.write(key_pem.decode('ascii'))
    click.secho("All done!", fg='green')
# wire up the groups: 'clientcert' becomes a sub-group of 'user',
# while 'user' and 'ca' are attached directly to the app's CLI
cli_user.add_command(cli_clientcert)
app.cli.add_command(cli_user)
app.cli.add_command(cli_ca)