# builtins
import time
import datetime
import functools
import hashlib
import secrets
import uuid

# third-party
import werkzeug
import werkzeug.datastructures
import flask
import peewee
import playhouse.postgres_ext
import OpenSSL as openssl

# internals
from application import app
import util
import rendering
import form
import database
import markdown

# static salt for fake hash comparison; this is part of timing sidechannel
# mitigation. generated once at boot so it doesn't take extra time to generate
# when faking a login check for a non-existing user.
__fake_salt__ = secrets.token_hex(64)


@app.before_request
def request_setup():

    """
    Populate `flask.g` with TLS client certificate and user information.

    Reads `SSL_CLIENT_VERIFY`, `SSL_CLIENT_CERT` and `SSL_CIPHER` from the
    request environ. `flask.g.user` is only set when the certificate was
    verified upstream, its fingerprint matches a `ClientCert` row and the
    session's 'uid' equals the id of the user owning that certificate.

    Always returns None so request handling continues.
    """

    client_cert_verified = flask.request.environ.get('SSL_CLIENT_VERIFY')
    client_cert_raw = flask.request.environ.get('SSL_CLIENT_CERT')

    flask.g.user = None
    flask.g.tls_cipher = flask.request.environ.get('SSL_CIPHER')
    flask.g.client_cert_verified = False
    flask.g.client_cert_fingerprint = None  # SHA2-512 (without ':' separators)
    flask.g.client_cert_fingerprint_matched = False
    flask.g.client_cert_info = None  # ClientCert object

    if client_cert_verified != 'SUCCESS':
        return

    flask.g.client_cert_verified = True

    if not client_cert_raw:
        app.logger.error("Successful upstream client cert authentication, but not certificate given (fix nginx.conf!)")
        return

    try:
        client_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, client_cert_raw)

    except Exception as e:

        if app.debug:
            raise

        app.logger.error(f"Error while loading client certificate: {str(e)}")
        return

    # digest() yields bytes; work with the decoded text form from here on so
    # log messages show the plain fingerprint instead of a b'...' repr.
    fingerprint = client_cert.digest('sha512').replace(b':', b'').decode('ascii')
    flask.g.client_cert_fingerprint = fingerprint

    try:
        client_cert_info = ClientCert.get(ClientCert.fingerprint == fingerprint)

    except ClientCert.DoesNotExist as e:
        app.logger.error(f"Can't find certificate info for fingerprint '{fingerprint}' in database. Dropped cert or user? Error: {str(e)}")
        return

    except Exception as e:

        if app.debug:
            raise

        app.logger.error(f"Generic exception when trying to load client certificate info from database: {str(e)}")
        return

    flask.g.client_cert_info = client_cert_info
    flask.g.client_cert_fingerprint_matched = True

    if 'uid' not in flask.session:
        return

    try:

        if client_cert_info.user.id == flask.session['uid']:
            flask.g.user = client_cert_info.user

    except User.DoesNotExist:
        flask.flash("You did done got deleted.", 'error')
        del flask.session['uid']

    except Exception:
        # silently fail all other exception types. this means that only
        # consequent database failures will trigger an error page, but
        # routes that don't use the database at all, will still work.
        # this is needed so things like theme asset delivery work when
        # there's a database problem.
        del flask.session['uid']


def auth(f):

    """
    Decorator restricting a view to logged-in users.

    Calls the wrapped function only if `flask.g.user` is a `User`;
    otherwise aborts the request with HTTP 403.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):

        if isinstance(flask.g.user, User):
            return f(*args, **kwargs)

        flask.abort(403)  # raises HTTPException

    return wrapper


class User(database.RenderableModel):

    """ A user account with a salted, peppered and iterated password hash. """

    name = peewee.CharField(null=False, unique=True, verbose_name='Name')
    password = peewee.CharField(null=False, verbose_name='Password')
    salt = peewee.CharField(null=False)

    @classmethod
    def load(cls, name):
        """ Return the user called `name`; raises `cls.DoesNotExist` if absent. """
        return cls.select().where(cls.name == name).get()

    @classmethod
    def load_by_id(cls, id):
        """ Return the user with primary key `id`; raises `cls.DoesNotExist` if absent. """
        return cls.select().where(cls.id == id).get()

    @classmethod
    def __gen_salt__(cls):
        """ Return a fresh random salt as hex string. """
        return secrets.token_hex(64)  # 64 *byte* (512 bit), so string length of 128 as hex

    @classmethod
    def __gen_hash__(cls, salt, password):

        """
        Return the hex digest of the iterated SHA3-512 hash for `password`.

        The first round hashes `app.config['PEPPER'] + salt + password`
        (UTF-8 encoded), every further round hashes the previous digest.
        """

        iterations = 100000  # ~50ms at ~5GHz

        digest = hashlib.sha3_512(bytes(app.config['PEPPER'] + salt + password, 'UTF-8'))
        for _ in range(1, iterations):
            digest = hashlib.sha3_512(digest.digest())

        return digest.hexdigest()

    @classmethod
    def login(cls, name, password):

        """
        Check credentials; return the matching `User` or False.

        For unknown names a hash with a static fake salt is computed anyway
        and a random delay (< 100ms) is always added, both to mitigate
        timing sidechannels.
        """

        r = False
        time_rand = secrets.randbelow(100)  # random delay to mitigate timing sidechannel

        try:

            user = cls.select().where(cls.name == name).get()

            # constant-time comparison, so the position of the first
            # differing character doesn't influence the response time
            if secrets.compare_digest(user.password, cls.__gen_hash__(user.salt, password)):
                r = user

        except cls.DoesNotExist:
            # TODO: benchmark this to see whether we need an extra delay
            cls.__gen_hash__(__fake_salt__, password)

        time.sleep(time_rand / 1000)

        return r

    @classmethod
    def register(cls, name, password):

        """ Create, save (as INSERT) and return a new user with a fresh salt. """

        user = cls()
        user.name = name
        user.salt = cls.__gen_salt__()
        user.password = cls.__gen_hash__(user.salt, password)
        user.save(force_insert=True)

        return user

    @property
    def notifications_unread(self):
        """ Query for this user's notifications that aren't marked as read. """
        # `== False` is required here: it builds a peewee SQL expression
        return self.notifications.where(Notification.read == False)

    def notify(self, message):
        """ Store a new `Notification` with `message` for this user; returns the result of `save`. """
        n = Notification(to=self, message=message)
        return n.save(force_insert=True)

    def update_password(self, password):
        """ Set a new password (with a new random salt) and save; returns the result of `save`. """
        self.salt = self.__gen_salt__()  # new random salt
        self.password = self.__gen_hash__(self.salt, password)
        return self.save()

    def gen_keypair_and_clientcert(self, certname, not_after):

        """
        Generate an RSA keypair plus a client certificate signed by the CA.

        The CA key and certificate are read from 'ca/key.pem' and
        'ca/cert.pem'. Returns a PKCS12 object containing the CA
        certificate, the new private key and the new certificate.

        Raises an exception if `not_after` lies further in the future than
        now + `app.config['CLIENTCERT_MAX_LIFETIME']`.
        """

        invalid_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']
        if not_after > invalid_after:
            # NOTE(review): no `exceptions` module is imported in the visible
            # part of this file; as it stands this line raises NameError
            # instead of ExposedException. confirm and add the import.
            raise exceptions.ExposedException(f"not_after too far into the future maximum allowed is {str(invalid_after)} but got {str(not_after)}.")

        common_name = f'{self.name}:{certname}@{app.config["SITE_NAME"]}'

        # context managers make sure the files are closed even if loading fails
        with open('ca/key.pem', 'rb') as fd:
            ca_key = openssl.crypto.load_privatekey(openssl.crypto.FILETYPE_PEM, fd.read())

        with open('ca/cert.pem', 'rb') as fd:
            ca_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, fd.read())

        keypair = openssl.crypto.PKey()
        keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])

        extensions = [
            openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, keyAgreement'),
            openssl.crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'),
        ]

        cert = openssl.crypto.X509()
        cert.set_version(2)  # actually means 3, openssl bad
        cert.add_extensions(extensions)
        cert.set_issuer(ca_cert.get_subject())
        cert.set_pubkey(keypair)
        cert.gmtime_adj_notBefore(0)  # now
        cert.set_notAfter(not_after.strftime('%Y%m%d%H%M%SZ').encode('utf-8'))
        cert.set_serial_number(uuid.uuid4().int)  # random uuid as int
        cert.get_subject().CN = common_name.encode('utf-8')

        cert.sign(ca_key, 'sha512')

        pkcs12 = openssl.crypto.PKCS12()
        pkcs12.set_ca_certificates([ca_cert])
        pkcs12.set_privatekey(keypair)
        pkcs12.set_certificate(cert)
        pkcs12.set_friendlyname(certname.encode('utf-8'))

        return pkcs12


class ClientCert(database.Model):

    """ Database record tying a client certificate fingerprint to a user. """

    class Meta:

        indexes = (
            (('user_id', 'name'), True),  # unique constraint for (user, name)
        )

    user = peewee.ForeignKeyField(User, backref='clientcerts', on_delete='CASCADE')
    name = peewee.CharField(null=False)
    fingerprint = peewee.CharField(null=False)


class AutoForm(form.Form):

    """
    Form generated from the columns of an administerable model instance.

    `mode` is one of 'create', 'edit' or 'delete'. In 'delete' mode the
    form only holds a confirmation button, otherwise it gets one field per
    column not listed in the model's `autoform_blacklist`.
    """

    def __init__(self, administerable, mode, *args, **kwargs):

        super().__init__(*args, **kwargs)

        self.administerable = administerable
        self.menu = administerable.menu
        self.mode = mode

        model = administerable.__class__

        if mode == 'delete':

            self.intro = f"You are about to delete this {self.administerable.__class__.__name__}, this cannot be undone. Proceed?"
            self.buttons[mode] = form.Button(label='Yes, delete')

        else:

            for field_name in model._meta.columns:
                if field_name not in model.autoform_blacklist:
                    self[field_name] = administerable.form_field(field_name)

            button_label = f'Create {administerable._lowerclass}' if mode == 'create' else f'Save {administerable._lowerclass}'
            self.buttons[mode] = form.Button(label=button_label)

            if isinstance(administerable, Named):
                self.fields.position('name', 0)

    def process(self, submit):

        """
        Apply the submitted values to the administerable and persist it.

        Does nothing unless the form is valid. Returns a redirect response
        on success; on `peewee.IntegrityError` the error is flashed and
        logged, and None is returned.
        """

        if not self.valid:
            return None

        blacklist = self.administerable.__class__.autoform_blacklist
        processable_fields = [field for field in self.fields.values() if field.name not in blacklist]

        for field in processable_fields:
            # FIXME: hacky way to preserve old filename value for Upload
            if not isinstance(field, form.Fieldset) and not isinstance(field.value, werkzeug.datastructures.FileStorage):
                setattr(self.administerable, field.name, field.value)

        for field in processable_fields:
            # call extra field handling (usually a no-op)
            # doing this in an extra loop so form_field_handle
            # can depend on properties in administerable being filled
            self.administerable.form_field_handle(field, self.mode)

        lowerclass = self.administerable._lowerclass

        try:

            if self.mode == 'create':
                self.administerable.save(force_insert=True)
            elif self.mode == 'edit':
                self.administerable.save()
            elif self.mode == 'delete':
                self.administerable.delete_instance()

        except peewee.IntegrityError as e:

            diag = e.orig.diag  # psycopg2 diagnostic object

            if diag.source_function == '_bt_check_unique':  # we're dealing with a unique constraint

                # assuming <tablename>_<fieldname> for constraint_name
                parts = diag.constraint_name.split('_', 1)

                if len(parts) == 2:  # ignore constraint names without underscores

                    tablename, fieldname = parts

                    if tablename == lowerclass and fieldname in self.fields:

                        field = self[fieldname]
                        error = form.ValidationError(f"{field.label} must be unique.")
                        field.errors.append(error)
                        self.errors.append(error)

            flask.flash("A problem occured when saving.", 'error')
            app.logger.warning(f"IntegrityError when saving object from AutoForm: {str(e)}")

        else:

            if self.mode == 'create':
                flask.flash(f"Successfully created new {lowerclass}.", 'success')
                return flask.redirect(self.administerable.url('edit'))

            elif self.mode == 'edit':
                flask.flash(f"Successfully saved {lowerclass}.", 'success')
                return flask.redirect(self.administerable.url('edit'))  # makes sure we get to the right url even if id/name changed

            elif self.mode == 'delete':
                flask.flash(f"Successfully deleted {lowerclass}.", 'success')
                return flask.redirect(flask.url_for(f'admin.{lowerclass}_listing'))  # TODO: make this cleaner, implement cls.url_listing?
@app.abstract
class Administerable(database.RenderableModel):

    """
    Base class for models that get automatic admin routes and forms.

    Instances are addressed by their numeric `id`.
    """

    id = playhouse.postgres_ext.IdentityField(generate_always=True, verbose_name='ID')

    autoform_blacklist = ('id',)
    autoform_class = AutoForm

    @classmethod
    def load(cls, id):
        """ Return the instance with primary key `id`; raises `cls.DoesNotExist` if absent. """
        return cls.select().where(cls.id == id).get()

    @classmethod
    def list(cls):
        """ Query for all instances, newest (highest id) first. """
        return cls.select().order_by(cls.id.desc())

    @classmethod
    def view(cls, mode='full', **kwargs):

        """
        View function body shared by all routes of this model.

        For 'create', 'edit' and 'delete' the instance's form is returned
        (or the redirect produced by handling a POST), for any other mode
        the instance itself. Aborts with 404 if `access(mode)` is denied.
        """

        if mode in ('create', 'edit', 'delete'):

            @rendering.page(mode=mode)
            def wrapped(**kw):

                if mode == 'create':
                    instance = cls()
                else:
                    instance = cls.load(kw['id'])

                if not instance.access(mode):
                    flask.abort(404)

                # not called `form` to avoid shadowing the form module
                instance_form = instance.form(mode)

                if flask.request.method == 'POST':
                    redirect = instance_form.handle()
                    if isinstance(redirect, werkzeug.Response):
                        return redirect

                return instance_form

        else:

            @rendering.page(mode=mode, title_show=False)  # show title in own template, gives better flexibility
            def wrapped(**kw):

                instance = cls.load(kw['id'])

                if not instance.access(mode):
                    flask.abort(404)

                return instance

        return wrapped(**kwargs)

    @property
    def menu(self):

        """ Tab menu (View/Edit/Delete) for saved instances; None if unsaved or nobody is logged in. """

        if not (self.is_in_db and flask.g.user):
            return None

        # TODO: make this work with .url() in a way that doesn't break trail detection in Menu
        params = {'id': self.id}
        items = []

        if self.__class__ in app.models_exposed.values():
            # full view added by @expose
            items.append({
                'endpoint': f'{self._lowerclass}_full',
                'params': dict(params),
                'label': 'View',
            })

        # admin routes automatically added by register_admin
        items.extend([
            {
                'endpoint': f'admin.{self._lowerclass}_edit',
                'params': dict(params),
                'label': 'Edit',
            },
            {
                'endpoint': f'admin.{self._lowerclass}_delete',
                'params': dict(params),
                'label': 'Delete',
            },
        ])

        return rendering.Menu(
            f'{self._lowerclass}-{self.id}-tabs',
            items,
            extra_classes=[f'menu-{self._lowerclass}', 'tabs']
        )

    @property
    def is_in_db(self):
        """ True if this instance has a truthy `id`. """
        return bool(self.id)

    def access(self, mode='full'):

        """ Whether the current user may use `mode` on this instance. """

        if isinstance(flask.g.user, User):
            return True

        # access allowed for anything but a list of "privileged" modes
        return mode not in ('create', 'edit', 'delete', 'admin-teaser')

    def url(self, mode='full'):

        """
        Return the URL for `mode` ('full', 'edit', 'create' or 'teaser').

        Returns None for any other mode. Raises TypeError for 'full' on
        classes that aren't exposed.
        """

        if mode == 'full':

            if self.__class__ not in app.models_exposed.values():
                raise TypeError("Use of .url(mode='full') is only possible on @exposed classes.")

            return flask.url_for(f"{self._lowerclass}_full", id=self.id)

        elif mode == 'edit':
            return flask.url_for(f"admin.{self._lowerclass}_edit", id=self.id)

        elif mode == 'create':
            return flask.url_for(f"admin.{self._lowerclass}_create")

        elif mode == 'teaser':
            # was `cls._lowerclass`, but there is no `cls` in an instance method
            return flask.url_for(f"{self._lowerclass}_listing")  # TODO: pagination

    def form(self, mode, **kwargs):

        """ Build this instance's `autoform_class` form for `mode`, filling in default id and title. """

        kwargs.setdefault('id', f'{self._lowerclass}-{mode}')
        kwargs.setdefault('title', f"{mode.capitalize()} {self.__class__.__name__}")

        return self.autoform_class(self, mode, **kwargs)

    def form_field(self, field_name):

        """
        Return a form field matching the type of database field `field_name`.

        Label, help text, current value and required-ness are taken from
        the database field. Unknown field types fall back to a text field
        and log a warning.
        """

        db_field = getattr(self.__class__, field_name)

        common = {
            'label': db_field.verbose_name or field_name,
            'value': getattr(self, field_name),
            'help': db_field.help_text,
            'required': not db_field.null,
        }

        if isinstance(db_field, peewee.ForeignKeyField):
            return form.ForeignKeySelect(db_field, **common)

        if isinstance(db_field, peewee.IntegerField):
            return form.Integer(**common)

        if isinstance(db_field, peewee.FloatField):
            return form.Float(**common)

        if isinstance(db_field, peewee.TextField):
            return form.TextArea(**common)

        if isinstance(db_field, peewee.CharField):
            return form.Text(**common)

        if isinstance(db_field, peewee.BooleanField):
            # required for checkbox means it MUST be checked (forbidding unchecked, i.e. False)
            common['required'] = False
            return form.Checkbox(**common)

        if isinstance(db_field, peewee.DateField):
            # NOTE(review): `form.DateField` doesn't follow the naming of the
            # other form classes used here (`form.DateTime`) - confirm it exists.
            return form.DateField(**common)

        if isinstance(db_field, peewee.DateTimeField):
            return form.DateTime(**common)

        # fallback
        app.logger.warning(f"AutoForm using fallback field construction for {self.__class__.__name__}.{field_name}")
        return form.Text(**common)

    def form_field_handle(self, field, mode):
        """ Hook for extra per-field handling when a form is processed; no-op by default. """
        pass


class Notification(database.RenderableModel):

    """ A markdown message for a user, with creation time and read flag. """

    to = peewee.ForeignKeyField(User, backref='notifications')
    created = peewee.DateTimeField(default=datetime.datetime.utcnow, null=False)
    read = peewee.BooleanField(default=False, null=False)
    message = markdown.MarkdownTextField()


@app.abstract
class Named(Administerable):

    """ Administerable addressed by a unique `name` instead of its `id`. """

    # same value as before, but without the invalid '\-' escape sequence
    name = peewee.CharField(unique=True, null=False, verbose_name='Name', constraints=[peewee.Check("\"name\" ~ '^[a-z0-9_\\-]+$'")])

    @classmethod
    def load(cls, name):
        """ Return the instance called `name`; raises `cls.DoesNotExist` if absent. """
        return cls.select().where(cls.name == name).get()

    @classmethod
    def view(cls, mode='full', **kwargs):

        """
        View function body shared by all routes of this model.

        For 'create', 'edit' and 'delete' the instance's form is returned
        (or the redirect produced by handling a POST), for any other mode
        the instance itself. Form errors are flashed after a POST. Aborts
        with 404 if `access(mode)` is denied.
        """

        if mode in ('create', 'edit', 'delete'):

            @rendering.page(mode=mode)
            def wrapped(**kw):

                if mode == 'create':
                    instance = cls()
                else:
                    instance = cls.load(kw['name'])

                if not instance.access(mode):
                    flask.abort(404)

                # not called `form` to avoid shadowing the form module
                instance_form = instance.form(mode)

                if flask.request.method == 'POST':

                    redirect = instance_form.handle()

                    for error in instance_form.errors:
                        flask.flash(str(error), 'error')

                    if isinstance(redirect, werkzeug.Response):
                        return redirect

                return instance_form

        else:

            @rendering.page(mode=mode, title_show=False)  # show title in own template, gives better flexibility
            def wrapped(**kw):

                instance = cls.load(kw['name'])

                if not instance.access(mode):
                    flask.abort(404)

                return instance

        return wrapped(**kwargs)

    @property
    def menu(self):

        """ Tab menu (View/Edit/Delete) for saved instances; None if unsaved or nobody is logged in. """

        if not (self.is_in_db and flask.g.user):
            return None

        # TODO: make this work with .url() in a way that doesn't break trail detection in Menu
        params = {'name': self.name}
        items = []

        if self.__class__ in app.models_exposed.values():
            # full view added by @expose
            items.append({
                'endpoint': f'{self._lowerclass}_full',
                'params': dict(params),
                'label': 'View',
            })

        # admin routes automatically added by register_admin
        items.extend([
            {
                'endpoint': f'admin.{self._lowerclass}_edit',
                'params': dict(params),
                'label': 'Edit',
            },
            {
                'endpoint': f'admin.{self._lowerclass}_delete',
                'params': dict(params),
                'label': 'Delete',
            },
        ])

        return rendering.Menu(
            f'{self._lowerclass}-{self.id}-tabs',
            items,
            extra_classes=[f'menu-{self._lowerclass}', 'tabs']
        )

    def url(self, mode='full'):

        """
        Return the URL for `mode` ('full', 'edit', 'create' or 'teaser').

        Returns None for any other mode. Raises TypeError for 'full' and
        'teaser' on classes that aren't exposed.
        """

        if mode in ('teaser', 'full') and self.__class__ not in app.models_exposed.values():
            # the f prefix was missing, so the class name was never interpolated
            raise TypeError(f"Trying to get public URL for non-exposed model {self.__class__.__name__}.")

        if mode == 'full':
            return flask.url_for(f"{self._lowerclass}_full", name=self.name)

        elif mode == 'edit':
            return flask.url_for(f"admin.{self._lowerclass}_edit", name=self.name)

        elif mode == 'create':
            return flask.url_for(f"admin.{self._lowerclass}_create")

        elif mode == 'teaser':
            return flask.url_for(f"{self._lowerclass}_listing")  # TODO: pagination

    def form_field(self, field_name):

        """ Like `Administerable.form_field`, but adds a regexp validator to the 'name' field. """

        f = super().form_field(field_name)

        if field_name == 'name':
            f.validators.append(
                functools.partial(form.validate_regexp, r'^[a-z0-9_\-]+$')
            )

        return f


def register_administerable(cls):

    """
    Register listing, create, edit and delete admin routes for `cls`.

    All registered views are wrapped in `auth`.
    """

    # not in register_admin below so view functions are in a new scope
    # for each model (otherwise the last view function definitions would
    # get registered to all routes).

    @auth
    @rendering.page(title_show=False)
    def view_listing():

        menu = rendering.Menu(
            f'{cls._lowerclass}-listing-actions',
            [{
                'endpoint': f'admin.{cls._lowerclass}_create',
                'label': 'Create new',
            }]
        )

        return rendering.Listing(cls.select(), title=cls.__name__, mode='admin-teaser', menu=menu)

    @auth
    def view_create(*args, **kwargs):
        kwargs['mode'] = 'create'
        return cls.view(*args, **kwargs)

    @auth
    def view_edit(*args, **kwargs):
        kwargs['mode'] = 'edit'
        return cls.view(*args, **kwargs)

    @auth
    def view_delete(*args, **kwargs):
        kwargs['mode'] = 'delete'
        return cls.view(*args, **kwargs)

    prefix = f'/{cls._lowerclass}'
    methods = ['GET', 'POST']

    app.admin.add_url_rule(f'{prefix}/', f'{cls._lowerclass}_listing', view_listing)
    app.admin.add_url_rule(f'{prefix}/create/', f'{cls._lowerclass}_create', view_create, methods=methods)

    # the views read kw['name'] / kw['id'] and url()/menu pass these as
    # URL parameters, so the rules need a matching URL variable.
    if issubclass(cls, Named):
        app.admin.add_url_rule(f'{prefix}/<string:name>/', f'{cls._lowerclass}_edit', view_edit, methods=methods)
        app.admin.add_url_rule(f'{prefix}/<string:name>/delete/', f'{cls._lowerclass}_delete', view_delete, methods=methods)

    else:
        app.admin.add_url_rule(f'{prefix}/<int:id>/', f'{cls._lowerclass}_edit', view_edit, methods=methods)
        app.admin.add_url_rule(f'{prefix}/<int:id>/delete', f'{cls._lowerclass}_delete', view_delete, methods=methods)


class Dashboard(rendering.Renderable):

    """
    Admin landing page.

    Holds a menu plus a listing of the 3 latest items for every
    non-abstract Commentable/Upload descendant and Gallery, and a menu
    linking to the listings of all other non-abstract administerables.
    """

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

        self.commentables = {}

        Commentable = Administerable.__class_descendants__['Commentable']  # avoids circular dependency
        Upload = Administerable.__class_descendants__['Upload']

        preview_classes = list(Commentable.__class_descendants__.values())
        preview_classes.extend(Upload.__class_descendants__.values())
        preview_classes.append(Administerable.__class_descendants__['Gallery'])

        for cls in preview_classes:

            if cls in app.models_abstract:
                continue

            # newest first; by creation date where the model has one
            order_field = cls.created if hasattr(cls, 'created') else cls.id

            self.commentables[cls._lowerclass] = {
                'menu': rendering.Menu(
                    f'dashboard-preview-actions-{cls._lowerclass}',
                    [
                        {
                            'endpoint': f'admin.{cls._lowerclass}_listing',
                            'label': cls.__name__,
                        },
                        {
                            'endpoint': f'admin.{cls._lowerclass}_create',
                            'label': 'Create new',
                        },
                    ]
                ),
                'listing': rendering.Listing(cls.select().order_by(order_field.desc()).limit(3)),
            }

        self.menu_others = rendering.Menu('dashboard-others')
        for cls in Administerable.__class_descendants__.values():
            if cls not in app.models_abstract and cls not in preview_classes:
                self.menu_others.append(
                    endpoint=f'admin.{cls._lowerclass}_listing',
                    label=cls.__name__
                )


@app.admin.route('/')
@auth
@rendering.page()
def admin_home():
    """ Admin front page, shows the `Dashboard`. """
    return Dashboard()


@app.admin.route('/login/', methods=['GET', 'POST'])
@rendering.page()
def admin_login():

    """
    Password login for the user owning the presented client certificate.

    Aborts with 403 without a verified client certificate and with 400 if
    its fingerprint isn't known. Returns the login form, or a redirect to
    the admin front page after a successful login.
    """

    if not flask.g.client_cert_verified:
        flask.abort(403)

    elif not flask.g.client_cert_fingerprint_matched:
        flask.abort(400)

    f = form.Form(title='Login')
    f['password'] = form.Password(label='Password', required=True)
    f.buttons['login'] = form.Button(name='login', label='Log in')

    if flask.request.method == 'POST':

        f.handle()  # binds and validates field values

        # the name isn't asked for, it's dictated by the client certificate
        name = flask.g.client_cert_info.user.name
        password = f['password'].value

        # an empty/missing password can never be valid; skipping the check
        # also keeps a non-string value out of the hash function
        user = User.login(name, password) if password else False

        if user:
            flask.session['uid'] = user.id
            flask.flash(f"Welcome, {user.name}.", 'success')
            return flask.redirect(flask.url_for('admin.admin_home'))

        flask.flash("Invalid login.", 'error')

    return f


@app.admin.route('/logout/')
@auth
def admin_logout():
    """ Drop the user id from the session and redirect to the front page. """
    del flask.session['uid']
    return flask.redirect(flask.url_for('frontpage'))


class NotificationForm(form.Form):

    """
    Form listing the current user's notifications with one checkbox each.

    `filter` is one of 'unread', 'read' or 'all'; any other value raises
    ValueError. Offers buttons to mark the checked notifications as read
    or to delete them.
    """

    def __init__(self, filter, **kwargs):

        super().__init__(**kwargs)

        if filter == 'unread':
            notifications = flask.g.user.notifications_unread
        elif filter == 'read':
            # `== True` is required here: it builds a peewee SQL expression
            notifications = flask.g.user.notifications.where(Notification.read == True)
        elif filter == 'all':
            notifications = flask.g.user.notifications
        else:
            raise ValueError("Invalid filter")

        self.filters = rendering.Menu(
            'notification-actions',
            items=(
                {
                    'endpoint': 'admin.admin_notifications',
                    'params': {'filter': 'unread'},
                    'label': 'Unread',
                },
                {
                    'endpoint': 'admin.admin_notifications',
                    'params': {'filter': 'read'},
                    'label': 'Read',
                },
                {
                    'endpoint': 'admin.admin_notifications',
                    'params': {'filter': 'all'},
                    'label': 'All',
                },
            )
        )

        self.notifications = form.MultiGroup(form.types.INT, 'notifications', required=True)

        for notification in notifications:

            fs = form.Fieldset()
            fs[f'notification-preview-{notification.id}'] = form.RenderableField(notification, mode='inline')
            fs[f'notification-{notification.id}'] = form.Checkbox(group=self.notifications, value=notification.id)

            self[f'notification-{notification.id}'] = fs

        self.buttons['mark'] = form.Button(label='Mark as read')
        self.buttons['delete'] = form.Button(label='Delete')

    def process(self, submit):

        """
        Mark the checked notifications as read or delete them.

        `submit` is the name of the pressed button ('mark' or 'delete').
        Only notifications addressed to the current user are touched.
        Always returns a redirect.
        """

        # ids come from the request; the owner condition makes sure nobody
        # can modify notifications of other users by posting their ids
        selected = (
            Notification.id << self.notifications.values,
            Notification.to == flask.g.user,
        )

        if submit == 'mark':
            count = Notification.update(read=True).where(*selected).execute()
            flask.flash(f"Marked {count} notifications as read.", 'success')

        elif submit == 'delete':
            count = Notification.delete().where(*selected).execute()
            flask.flash(f"Deleted {count} notifications.", 'success')

        return flask.redirect('')


@app.admin.route('/notifications/', methods=['GET', 'POST'])
@app.admin.route('/notifications/<string:filter>/', methods=['GET', 'POST'])
@auth
@rendering.page()
def admin_notifications(filter='unread'):

    """
    Show (and on POST process) the notification form for `filter`.

    Aborts with 404 for filters other than 'unread', 'read' and 'all'.
    """

    # the filter is part of the URL; answer unknown values with a 404
    # instead of letting NotificationForm's ValueError surface as error page
    if filter not in ('unread', 'read', 'all'):
        flask.abort(404)

    f = NotificationForm(filter, title='Notifications')

    if flask.request.method == 'POST':
        redirect = f.handle()
        if redirect:
            return redirect

    return f


@app.admin.menu('main')
def admin_menu_main():

    """ Main menu; links other than 'Home' are only added for logged-in users. """

    items = [
        {
            'endpoint': 'frontpage',
            'label': 'Home',
        },
    ]

    if isinstance(flask.g.user, User):  # empty menu without login

        # count in the database instead of loading all unread notifications
        notification_count = flask.g.user.notifications_unread.count()

        if notification_count == 0:
            notification_label = 'No unread notifications'
        elif notification_count == 1:
            notification_label = 'One unread notification'
        else:
            notification_label = f"{notification_count} unread notifications"

        items.extend([
            {
                'endpoint': 'admin.admin_home',
                'label': 'Admin',
            },
            {
                'endpoint': 'admin.admin_notifications',
                'label': notification_label,
            },
            {
                'endpoint': 'admin.comment_moderation',
                'label': 'Comment moderation',
            },
        ])

    return rendering.Menu('main', items, burger=True)


@app.admin.menu('secondary')
def admin_menu_secondary():

    """ Secondary menu holding either the 'Log out' or the 'Log in' link. """

    if isinstance(flask.g.user, User):  # logged in
        item = {
            'endpoint': 'admin.admin_logout',
            'label': 'Log out',
        }

    else:
        item = {
            'endpoint': 'admin.admin_login',
            'label': 'Log in',
        }

    return rendering.Menu('secondary', [item])


@app.boot
def register_admin():

    """ Register admin routes for all non-abstract administerables, then the admin blueprint. """

    for cls in Administerable.__class_descendants__.values():
        if cls not in app.models_abstract:
            register_administerable(cls)

    app.register_blueprint(app.admin)