Compare commits

...

2 Commits

11 changed files with 422 additions and 25 deletions

4
.gitignore vendored
View File

@ -1,5 +1,9 @@
**/__pycache__
**/*.sw*
.cache
.config
tags
config.py
upload/*/*
ca
*.p12

163
admin.py
View File

@ -4,10 +4,12 @@ import datetime
import functools
import hashlib
import secrets
import uuid
# third-party
import werkzeug, flask
import peewee, playhouse.postgres_ext
import OpenSSL as openssl
# internals
from application import app
@ -26,23 +28,80 @@ __fake_salt__ = secrets.token_hex(64)
@app.before_request
def request_setup():
if 'uid' in flask.session:
try:
flask.g.user = User.load(flask.session['uid'])
except User.DoesNotExist:
flask.flash("You did done got deleted.", 'error')
del flask.session['uid']
flask.g.user = None
except Exception:
# silently fail all other exception types. this means that only
# consequent database failures will trigger an error page, but
# routes that don't use the database at all, will still work.
# this is needed so things like theme asset delivery work when
# there's a database problem.
del flask.session['uid']
flask.g.user = None
else:
flask.g.user = None
client_cert_verified = flask.request.environ.get('SSL_CLIENT_VERIFY')
client_cert_raw = flask.request.environ.get('SSL_CLIENT_CERT')
client_cert = None
client_cert_info = None
flask.g.user = None
flask.g.tls_cipher = flask.request.environ.get('SSL_CIPHER')
flask.g.client_cert_verified = False
flask.g.client_cert_fingerprint = None
flask.g.client_cert_fingerprint_matched = False
if client_cert_verified == 'SUCCESS':
flask.g.client_cert_verified = True
if not client_cert_raw:
app.logger.error("Successful upstream client cert authentication, but not certificate given (fix nginx.conf!)")
else:
try:
client_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, client_cert_raw)
except Exception as e:
if app.debug:
raise
app.logger.error(f"Error while loading client certificate: {str(e)}")
else:
fingerprint = client_cert.digest('sha512').replace(b':', b'')
flask.g.client_cert_fingerprint = fingerprint.decode('ascii')
try:
client_cert_info = ClientCert.get(ClientCert.fingerprint == fingerprint)
flask.g.client_cert_fingerprint_matched = True
except ClientCert.DoesNotExist as e:
if app.debug:
raise
app.logger.error(f"Can't find certificate info for fingerprint '{fingerprint}' in database. Dropped cert or user? Error: {str(e)}")
except Exception as e:
if app.debug:
raise
app.logger.error(f"Generic exception when trying to load client certificate info from database: {str(e)}")
else:
if 'uid' in flask.session:
try:
if client_cert_info.user.id == flask.session['uid']:
flask.g.user = client_cert_info.user
except User.DoesNotExist:
flask.flash("You did done got deleted.", 'error')
del flask.session['uid']
except Exception:
# silently fail all other exception types. this means that only
# consequent database failures will trigger an error page, but
# routes that don't use the database at all, will still work.
# this is needed so things like theme asset delivery work when
# there's a database problem.
del flask.session['uid']
def auth(f):
@ -63,7 +122,11 @@ class User(database.RenderableModel):
salt = peewee.CharField(null=False)
@classmethod
def load(cls, id):
def load(cls, name):
return cls.select().where(cls.name == name).get()
@classmethod
def load_by_id(cls, id):
return cls.select().where(cls.id == id).get()
@classmethod
@ -115,15 +178,73 @@ class User(database.RenderableModel):
return user
@property
def notifications_unread(self):
return self.notifications.where(Notification.read == False)
def notify(self, message):
n = Notification(to=self, message=message)
return n.save(force_insert=True)
@property
def notifications_unread(self):
return self.notifications.where(Notification.read == False)
def gen_keypair_and_clientcert(self, certname, not_after):
invalid_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']
if not_after > invalid_after:
raise exceptions.ExposedException(f"not_after too far into the future maximum allowed is {str(invalid_after)} but got {str(not_after)}.")
common_name = f'{self.name}:{certname}@{app.config["SITE_NAME"]}'
fd = open('ca/key.pem', 'rb')
ca_key = openssl.crypto.load_privatekey(openssl.crypto.FILETYPE_PEM, fd.read())
fd.close()
del fd
fd = open('ca/cert.pem', 'rb')
ca_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, fd.read())
fd.close()
del fd
keypair = openssl.crypto.PKey()
keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])
extensions = [
openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, keyAgreement'),
openssl.crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'),
]
cert = openssl.crypto.X509()
cert.set_version(2) # actually means 3, openssl bad
cert.add_extensions(extensions)
cert.set_issuer(ca_cert.get_subject())
cert.set_pubkey(keypair)
cert.gmtime_adj_notBefore(0) # now
cert.set_notAfter(not_after.strftime('%Y%m%d%H%M%SZ').encode('utf-8'))
cert.set_serial_number(uuid.uuid4().int) # random uuid as int
cert.get_subject().CN = common_name.encode('utf-8')
cert.sign(ca_key, 'sha512')
pkcs12 = openssl.crypto.PKCS12()
pkcs12.set_ca_certificates([ca_cert])
pkcs12.set_privatekey(keypair)
pkcs12.set_certificate(cert)
pkcs12.set_friendlyname(certname.encode('utf-8'))
return pkcs12
class ClientCert(database.Model):
class Meta:
indexes = (
(('user_id', 'name'), True), # unique constraint for (user, name)
)
user = peewee.ForeignKeyField(User, backref='clientcerts')
name = peewee.CharField(null=False)
fingerprint = peewee.CharField(null=False)
class AutoForm(form.Form):

128
cli.py
View File

@ -1,14 +1,20 @@
# builtins
import os
import datetime
import uuid
# third-party
import click
import flask
import OpenSSL as openssl
# internals
from application import app
import util
import database
import admin
# utility functions
def show_model(value):
@ -45,9 +51,10 @@ def cron():
click.secho(f"Cron function {func} failed to run: {str(e)}", fg='red')
cli_user = flask.cli.AppGroup('user')
@cli_user.command('create')
@click.argument("name")
@click.argument("password")
@click.argument('name')
@click.argument('password')
def adduser(name, password):
try:
@ -59,4 +66,119 @@ def adduser(name, password):
else:
click.secho(f"Created user '{name}' (#{user.id}).", fg='green')
cli_ca = flask.cli.AppGroup('ca')
@cli_ca.command('create')
@click.option('--lifetime', prompt="How long should this CA live (in seconds, 0 means infinite)?", default=0)
def ca_create(lifetime):
if os.path.exists('ca'):
click.secho("Directory 'ca' already exists, move or delete it and re-run.", fg='red')
raise click.Abort()
click.echo("Generating keypair…")
keypair = openssl.crypto.PKey()
keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])
click.echo("Generating certificate…")
cert = openssl.crypto.X509()
cert.set_serial_number(uuid.uuid4().int) # random uuid, mathmatically ~guaranteed to be unique
issuer = cert.get_issuer()
issuer.commonName = app.config['SITE_NAME']
issuer.C = 'AQ'
issuer.L = 'Free Nation Of Radiant Dissent'
issuer.O = 'Erisian Liberation Front'
issuer.OU = 'Cyber Confusion Center'
cert.set_subject(issuer)
cert.set_pubkey(keypair)
cert.gmtime_adj_notBefore(0)
if lifetime == 0:
cert.set_notAfter(b'99991231235959Z') # "indefinitely valid" as defined in RFC 5280 4.1.2.5.
else:
cert.gmtime_adj_notAfter(lifetime)
extensions = [
openssl.crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, dataEncipherment, keyAgreement, keyCertSign, cRLSign'),
]
cert.add_extensions(extensions)
# sign cert with private key
cert.sign(keypair, "sha512")
click.echo("Creating directory…")
os.mkdir('ca')
click.echo("Saving certificate…")
cert_pem = openssl.crypto.dump_certificate(openssl.crypto.FILETYPE_PEM, cert)
fd = open(os.path.join('ca', 'cert.pem'), 'w')
fd.write(cert_pem.decode('ascii'))
fd.close()
del fd
click.echo("Saving keypair…")
key_pem = openssl.crypto.dump_privatekey(openssl.crypto.FILETYPE_PEM, keypair)
fd = open(os.path.join('ca', 'key.pem'), 'w')
fd.write(key_pem.decode('ascii'))
fd.close()
del fd
click.secho("All done!", fg='green')
cli_clientcert = flask.cli.AppGroup('clientcert')
@cli_clientcert.command('create')
@click.argument('username')
@click.argument('certname')
#@click.option('--not-after', default=None) # probably requires datetime?
def cert_create(username, certname):
import pudb; pudb.set_trace()
try:
user = admin.User.load(username)
except User.DoesNotExist:
click.secho(f"User '{username}' does not exist", fg='red')
raise click.Abort()
not_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']
try:
pkcs12 = user.gen_keypair_and_clientcert(certname, not_after)
except Exception:
if app.debug:
raise
click.secho("Client certificate creation failed.", fg='red')
raise click.Abort()
passphrase = util.random_string_light()
filename = f'{username}-{certname}.p12'
fd = open(filename, 'wb')
fd.write(pkcs12.export(passphrase=passphrase))
fd.close()
cert_info = admin.ClientCert()
cert_info.user = user
cert_info.name = certname
cert_info.fingerprint = pkcs12.get_certificate().digest('sha512').replace(b':', b'')
cert_info.save(force_insert=True)
click.secho(f"Bundled keypair and cert saved as '{filename}'.", fg='green')
click.echo("Passphrase: " + click.style(passphrase, bold=True, fg='yellow', bg='black'))
app.cli.add_command(cli_user)
cli_ca.add_command(cli_clientcert)
app.cli.add_command(cli_ca)

View File

@ -37,8 +37,12 @@ def page(**template_params):
def wrapper(*args, **kwargs):
params = {
'site_name': app.config['SITE_NAME'],
'user': flask.g.user,
'site_name': app.config['SITE_NAME']
'tls_cipher': flask.g.tls_cipher,
'client_cert_verified': flask.g.client_cert_verified,
'client_cert_fingerprint': flask.g.client_cert_fingerprint,
'client_cert_fingerprint_matched': flask.g.client_cert_fingerprint_matched,
}
params.update(template_params)

View File

@ -1,9 +1,12 @@
# third-party
import werkzeug
import flask
import peewee
import playhouse.postgres_ext
# internals
from application import app
import database
import markdown
import form

0
tests/__init__.py Normal file
View File

23
tests/conftest.py Normal file
View File

@ -0,0 +1,23 @@
# third-party
import pytest
import flask
import peewee
# local imports
import main
@pytest.fixture
def app():
main.app.config['TESTING'] = True
with main.app.app_context():
yield main.app
@pytest.fixture
def echo(capsys):
def echo(msg):
with capsys.disabled():
print(f"Echo: {msg}")
return echo

View File

@ -0,0 +1,85 @@
# built-ins
import secrets
import timeit
import statistics
# third-party
import pytest
import timeit
# internals
import main
# TODO: @pytest.mark.slow or something to exclude this test unless explicitly requested
def test_sha3_timing(echo):
User = main.admin.User
User.delete().where(User.name == 'test').execute()
password = secrets.token_hex(32)
u = User.register('test', password)
times = {}
#repeats = 10000
repeats = 100000
stmt = "User.login('test', password)"
t = timeit.Timer(stmt, globals=locals())
times['pw_correct'] = t.repeat(repeat=repeats, number=1)
#for length in range(1, 32 + 1):
#for length in [4, 8, 16, 32]:
for length in [4, 32]:
password_wrong = secrets.token_hex(length)
stmt = "User.login('test', password_wrong)"
t = timeit.Timer(stmt, globals=locals())
times[f'pw_incorrect_{length}'] = t.repeat(repeat=repeats, number=1)
#for length in range(1, 32 + 1):
#for length in [4, 8, 16, 32]:
for length in [4, 32]:
password_wrong = secrets.token_hex(length)
stmt = "User.login('doesnotexist', password_wrong)"
t = timeit.Timer(stmt, globals=locals())
times[f'user_incorrect_{length}'] = t.repeat(repeat=repeats, number=1)
u.delete_instance()
mins = []
maxs = []
spreads = []
means = []
medians = []
echo("Timing sidechannel test:")
for key, timings in times.items():
mins.append(min(timings))
maxs.append(max(timings))
spreads.append(maxs[-1] - mins[-1])
means.append(statistics.mean(timings))
medians.append(statistics.median(timings))
echo(f"{key}: min {min(timings)}, max {max(timings)}, spread {max(timings) - min(timings)}, mean {statistics.mean(timings)}, median {statistics.median(timings)}")
spread_min = max(mins) - min(mins)
spread_max = max(maxs) - min(maxs)
spread_spread = max(spreads) - min(spreads)
spread_mean = max(means) - min(means)
spread_median = max(medians) - min(medians)
echo(f"spread min: {spread_min}")
echo(f"spread max: {spread_max}")
echo(f"spread spread: {spread_spread}")
echo(f"spread mean: {spread_mean}")
echo(f"spread median: {spread_median}")
assert spread_min < 0.001, "spread for minimum execution time too large"
assert spread_max < 0.001, "spread for maximum execution time too large"
assert spread_spread < 0.001, "spread for execution time spread too large"
assert spread_mean < 0.01, "spread for mean execution time too large"
assert spread_median < 0.01, "spread for median execution time too large"

View File

@ -112,6 +112,24 @@ body > .menus {
body > header {
}
.security-info {
position: absolute;
max-width: 36ex;
}
.security-info span {
font-size: 70%;
color: var(--color-highlight-inactive);
}
.security-info .client-cert-fingerprint {
word-wrap: break-word;
}
.security-info .client-cert-fingerprint.unmatched {
color: var(--color-error);
}
.logo {
margin: 0 var(--distance-main);
}

View File

@ -11,8 +11,24 @@
class="content-type-{{ content._lowerclass }}"
{% endif %}
>
<header>
<div class="security-info">
{% if tls_cipher %}
<span class="tls-cipher">{{ tls_cipher }}</span>
{% endif %}
{% if client_cert_verified %}
{% if client_cert_fingerprint %}
<span
class="client-cert-fingerprint {% if client_cert_fingerprint_matched %}matched{% else %}unmatched{% endif %}"
>{{ client_cert_fingerprint }}</span>
{% endif %}
{% endif %}
</div>
<a class="logo" href="/"><img src="/theme/svg/logo.svg" /></a>
</header>
<div class="menus">

View File

@ -1,3 +1,4 @@
# builtins
import warnings
import random
import datetime