951 lines
30 KiB
Python
951 lines
30 KiB
Python
# builtins
|
|
import time
|
|
import datetime
|
|
import functools
|
|
import hashlib
|
|
import secrets
|
|
import uuid
|
|
|
|
# third-party
|
|
import werkzeug, flask
|
|
import peewee, playhouse.postgres_ext
|
|
import OpenSSL as openssl
|
|
|
|
# internals
|
|
from application import app
|
|
|
|
import util
|
|
import rendering
|
|
import form
|
|
import database
|
|
import markdown
|
|
|
|
# static salt for fake hash comparison; this is part of timing sidechannel
|
|
# mitigation. generated once at boot so it doesn't take extra time to generate
|
|
# when faking a login check for a non-existing user.
|
|
__fake_salt__ = secrets.token_hex(64)
|
|
|
|
@app.before_request
|
|
def request_setup():
|
|
|
|
client_cert_verified = flask.request.environ.get('SSL_CLIENT_VERIFY')
|
|
client_cert_raw = flask.request.environ.get('SSL_CLIENT_CERT')
|
|
client_cert = None
|
|
client_cert_info = None
|
|
|
|
flask.g.user = None
|
|
flask.g.tls_cipher = flask.request.environ.get('SSL_CIPHER')
|
|
flask.g.client_cert_verified = False
|
|
flask.g.client_cert_fingerprint = None # SHA2-512 (without ':' separators)
|
|
flask.g.client_cert_fingerprint_matched = False
|
|
flask.g.client_cert_info = None # ClientCert object
|
|
|
|
if client_cert_verified == 'SUCCESS':
|
|
|
|
flask.g.client_cert_verified = True
|
|
|
|
if not client_cert_raw:
|
|
app.logger.error("Successful upstream client cert authentication, but not certificate given (fix nginx.conf!)")
|
|
else:
|
|
|
|
try:
|
|
client_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, client_cert_raw)
|
|
|
|
except Exception as e:
|
|
|
|
if app.debug:
|
|
raise
|
|
|
|
app.logger.error(f"Error while loading client certificate: {str(e)}")
|
|
|
|
else:
|
|
|
|
fingerprint = client_cert.digest('sha512').replace(b':', b'')
|
|
flask.g.client_cert_fingerprint = fingerprint.decode('ascii')
|
|
|
|
try:
|
|
|
|
client_cert_info = ClientCert.get(ClientCert.fingerprint == fingerprint)
|
|
flask.g.client_cert_info = client_cert_info
|
|
flask.g.client_cert_fingerprint_matched = True
|
|
|
|
except ClientCert.DoesNotExist as e:
|
|
|
|
app.logger.error(f"Can't find certificate info for fingerprint '{fingerprint}' in database. Dropped cert or user? Error: {str(e)}")
|
|
|
|
except Exception as e:
|
|
|
|
if app.debug:
|
|
raise
|
|
|
|
app.logger.error(f"Generic exception when trying to load client certificate info from database: {str(e)}")
|
|
|
|
else:
|
|
|
|
if 'uid' in flask.session:
|
|
try:
|
|
|
|
if client_cert_info.user.id == flask.session['uid']:
|
|
flask.g.user = client_cert_info.user
|
|
|
|
except User.DoesNotExist:
|
|
|
|
flask.flash("You did done got deleted.", 'error')
|
|
del flask.session['uid']
|
|
|
|
except Exception:
|
|
|
|
# silently fail all other exception types. this means that only
|
|
# consequent database failures will trigger an error page, but
|
|
# routes that don't use the database at all, will still work.
|
|
# this is needed so things like theme asset delivery work when
|
|
# there's a database problem.
|
|
del flask.session['uid']
|
|
|
|
def auth(f):
|
|
|
|
@functools.wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
|
|
if isinstance(flask.g.user, User):
|
|
return f(*args, **kwargs)
|
|
|
|
flask.abort(403) # raises HTTPException
|
|
|
|
return wrapper
|
|
|
|
class User(database.RenderableModel):
|
|
|
|
name = peewee.CharField(null=False, unique=True, verbose_name='Name')
|
|
password = peewee.CharField(null=False, verbose_name='Password')
|
|
salt = peewee.CharField(null=False)
|
|
|
|
@classmethod
|
|
def load(cls, name):
|
|
return cls.select().where(cls.name == name).get()
|
|
|
|
@classmethod
|
|
def load_by_id(cls, id):
|
|
return cls.select().where(cls.id == id).get()
|
|
|
|
@classmethod
|
|
def __gen_salt__(cls):
|
|
return secrets.token_hex(64) # 64 *byte* (512 bit), so string length of 128 as hex
|
|
|
|
@classmethod
|
|
def __gen_hash__(cls, salt, password):
|
|
|
|
iterations = 100000 # ~50ms at ~5GHz
|
|
hash = hashlib.sha3_512(bytes(app.config['PEPPER'] + salt + password, 'UTF-8'))
|
|
|
|
for i in range(1, iterations):
|
|
hash = hashlib.sha3_512(hash.digest())
|
|
|
|
return hash.hexdigest()
|
|
|
|
@classmethod
|
|
def login(cls, name, password):
|
|
|
|
r = False
|
|
time_rand = secrets.randbelow(100) # random delay to mitigate timing sidechannel
|
|
|
|
try:
|
|
|
|
user = cls.select().where(cls.name == name).get()
|
|
|
|
if user.password == cls.__gen_hash__(user.salt, password):
|
|
r = user
|
|
|
|
except cls.DoesNotExist:
|
|
|
|
# TODO: benchmark this to see whether we need an extra delay
|
|
cls.__gen_hash__(__fake_salt__, password)
|
|
|
|
time.sleep(time_rand / 1000)
|
|
|
|
return r
|
|
|
|
@classmethod
|
|
def register(cls, name, password):
|
|
|
|
user = cls()
|
|
user.name = name
|
|
user.salt = cls.__gen_salt__()
|
|
user.password = cls.__gen_hash__(user.salt, password)
|
|
|
|
user.save(force_insert=True)
|
|
|
|
return user
|
|
|
|
@property
|
|
def notifications_unread(self):
|
|
return self.notifications.where(Notification.read == False)
|
|
|
|
def notify(self, message):
|
|
|
|
n = Notification(to=self, message=message)
|
|
|
|
return n.save(force_insert=True)
|
|
|
|
def update_password(self, password):
|
|
|
|
self.salt = self.__gen_salt__() # new random salt
|
|
self.password = self.__gen_hash__(self.salt, password)
|
|
|
|
return self.save()
|
|
|
|
def gen_keypair_and_clientcert(self, certname, not_after):
|
|
|
|
invalid_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']
|
|
|
|
if not_after > invalid_after:
|
|
raise exceptions.ExposedException(f"not_after too far into the future maximum allowed is {str(invalid_after)} but got {str(not_after)}.")
|
|
|
|
common_name = f'{self.name}:{certname}@{app.config["SITE_NAME"]}'
|
|
|
|
fd = open('ca/key.pem', 'rb')
|
|
ca_key = openssl.crypto.load_privatekey(openssl.crypto.FILETYPE_PEM, fd.read())
|
|
fd.close()
|
|
del fd
|
|
|
|
fd = open('ca/cert.pem', 'rb')
|
|
ca_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, fd.read())
|
|
fd.close()
|
|
del fd
|
|
|
|
keypair = openssl.crypto.PKey()
|
|
keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])
|
|
|
|
extensions = [
|
|
openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, keyAgreement'),
|
|
openssl.crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'),
|
|
]
|
|
|
|
cert = openssl.crypto.X509()
|
|
cert.set_version(2) # actually means 3, openssl bad
|
|
cert.add_extensions(extensions)
|
|
cert.set_issuer(ca_cert.get_subject())
|
|
cert.set_pubkey(keypair)
|
|
cert.gmtime_adj_notBefore(0) # now
|
|
cert.set_notAfter(not_after.strftime('%Y%m%d%H%M%SZ').encode('utf-8'))
|
|
cert.set_serial_number(uuid.uuid4().int) # random uuid as int
|
|
cert.get_subject().CN = common_name.encode('utf-8')
|
|
|
|
cert.sign(ca_key, 'sha512')
|
|
|
|
pkcs12 = openssl.crypto.PKCS12()
|
|
pkcs12.set_ca_certificates([ca_cert])
|
|
pkcs12.set_privatekey(keypair)
|
|
pkcs12.set_certificate(cert)
|
|
pkcs12.set_friendlyname(certname.encode('utf-8'))
|
|
|
|
return pkcs12
|
|
|
|
class ClientCert(database.Model):
|
|
|
|
class Meta:
|
|
indexes = (
|
|
(('user_id', 'name'), True), # unique constraint for (user, name)
|
|
)
|
|
|
|
user = peewee.ForeignKeyField(User, backref='clientcerts', on_delete='CASCADE')
|
|
name = peewee.CharField(null=False)
|
|
fingerprint = peewee.CharField(null=False)
|
|
|
|
class AutoForm(form.Form):
|
|
|
|
def __init__(self, administerable, mode, *args, **kwargs):
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
self.administerable = administerable
|
|
self.menu = administerable.menu
|
|
self.mode = mode
|
|
|
|
if mode == 'delete':
|
|
|
|
self.intro = f"You are about to delete this {self.administerable.__class__.__name__}, this cannot be undone. Proceed?"
|
|
self.buttons[mode] = form.Button(label='Yes, delete')
|
|
|
|
else:
|
|
|
|
for field_name in administerable.__class__._meta.columns.keys():
|
|
|
|
if field_name not in administerable.__class__.autoform_blacklist:
|
|
self[field_name] = administerable.form_field(field_name)
|
|
|
|
button_label = f'Create {administerable._lowerclass}' if mode == 'create' else f'Save {administerable._lowerclass}'
|
|
self.buttons[mode] = form.Button(label=button_label)
|
|
|
|
if isinstance(administerable, Named):
|
|
self.fields.position('name', 0)
|
|
|
|
def process(self, submit):
|
|
|
|
if self.valid:
|
|
|
|
processable_fields = [field for field in self.fields.values() if not field.name in self.administerable.__class__.autoform_blacklist]
|
|
|
|
for field in processable_fields:
|
|
|
|
if not isinstance(field, form.Fieldset) and not isinstance(field.value, werkzeug.datastructures.FileStorage): # FIXME: hacky way to preserve old filename value for Upload
|
|
setattr(self.administerable, field.name, field.value)
|
|
|
|
for field in processable_fields:
|
|
|
|
# call extra field handling (usually a no-op)
|
|
# doing this in an extra loop so form_field_handle
|
|
# can depend on properties in administerable being filled
|
|
self.administerable.form_field_handle(field, self.mode)
|
|
|
|
try:
|
|
if self.mode == 'create':
|
|
self.administerable.save(force_insert=True)
|
|
elif self.mode == 'edit':
|
|
self.administerable.save()
|
|
elif self.mode == 'delete':
|
|
self.administerable.delete_instance()
|
|
|
|
except peewee.IntegrityError as e:
|
|
|
|
diag = e.orig.diag # psycopg2 diagnostic object
|
|
|
|
if diag.source_function == '_bt_check_unique': # we're dealing with a unique constraint
|
|
|
|
# assuming <tablename>_<fieldname> for constraint_name
|
|
parts = diag.constraint_name.split('_', 1)
|
|
|
|
if len(parts) == 2: # ignore constraint names without underscores
|
|
|
|
tablename, fieldname = parts
|
|
|
|
if tablename == self.administerable._lowerclass and fieldname in self.fields:
|
|
|
|
field = self[fieldname]
|
|
error = form.ValidationError(f"{field.label} must be unique.")
|
|
|
|
field.errors.append(error)
|
|
self.errors.append(error)
|
|
|
|
flask.flash("A problem occured when saving.", 'error')
|
|
app.logger.warning(f"IntegrityError when saving object from AutoForm: {str(e)}")
|
|
|
|
else:
|
|
if self.mode == 'create':
|
|
flask.flash(f"Successfully created new {self.administerable._lowerclass}.", 'success')
|
|
return flask.redirect(self.administerable.url('edit'))
|
|
elif self.mode == 'edit':
|
|
flask.flash(f"Successfully saved {self.administerable._lowerclass}.", 'success')
|
|
return flask.redirect(self.administerable.url('edit')) # makes sure we get to the right url even if id/name changed
|
|
elif self.mode == 'delete':
|
|
flask.flash(f"Successfully deleted {self.administerable._lowerclass}.", 'success')
|
|
return flask.redirect(flask.url_for(f'admin.{self.administerable._lowerclass}_listing')) # TODO: make this cleaner, implement cls.url_listing?
|
|
|
|
@app.abstract
|
|
class Administerable(database.RenderableModel):
|
|
|
|
id = playhouse.postgres_ext.IdentityField(generate_always=True, verbose_name='ID')
|
|
autoform_blacklist = ('id',)
|
|
autoform_class = AutoForm
|
|
|
|
@classmethod
|
|
def load(cls, id):
|
|
return cls.select().where(cls.id == id).get()
|
|
|
|
@classmethod
|
|
def list(cls):
|
|
return cls.select().order_by(cls.id.desc())
|
|
|
|
@classmethod
|
|
def view(cls, mode='full', **kwargs):
|
|
|
|
if mode in ('create', 'edit', 'delete'):
|
|
|
|
@rendering.page(mode=mode)
|
|
def wrapped(**kw):
|
|
|
|
if mode == 'create':
|
|
instance = cls()
|
|
|
|
else:
|
|
instance = cls.load(kw['id'])
|
|
|
|
if not instance.access(mode):
|
|
flask.abort(404)
|
|
|
|
form = instance.form(mode)
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
redirect = form.handle()
|
|
|
|
if isinstance(redirect, werkzeug.Response):
|
|
return redirect
|
|
|
|
return form
|
|
|
|
else:
|
|
@rendering.page(mode=mode, title_show=False) # show title in own template, gives better flexibility
|
|
def wrapped(**kw):
|
|
|
|
instance = cls.load(kw['id'])
|
|
|
|
if not instance.access(mode):
|
|
flask.abort(404)
|
|
|
|
return instance
|
|
|
|
return wrapped(**kwargs)
|
|
|
|
@property
|
|
def menu(self):
|
|
|
|
if self.is_in_db and flask.g.user:
|
|
|
|
# TODO: make this work with .url() in a way that doesn't break trail detection in Menu
|
|
items = []
|
|
|
|
if self.__class__ in app.models_exposed.values():
|
|
|
|
# full view added by @expose
|
|
items.append({
|
|
'endpoint': f'{self._lowerclass}_full',
|
|
'params': { 'id' : self.id },
|
|
'label': 'View',
|
|
})
|
|
|
|
# admin routes automatically added by register_admin
|
|
items.extend([
|
|
{
|
|
'endpoint': f'admin.{self._lowerclass}_edit',
|
|
'params': { 'id' : self.id },
|
|
'label': 'Edit',
|
|
},
|
|
{
|
|
'endpoint': f'admin.{self._lowerclass}_delete',
|
|
'params': { 'id' : self.id },
|
|
'label': 'Delete',
|
|
}
|
|
])
|
|
|
|
return rendering.Menu(
|
|
f'{self._lowerclass}-{self.id}-tabs',
|
|
items,
|
|
extra_classes=[f'menu-{self._lowerclass}', 'tabs']
|
|
)
|
|
|
|
@property
|
|
def is_in_db(self):
|
|
return bool(self.id)
|
|
|
|
def access(self, mode='full'):
|
|
|
|
if isinstance(flask.g.user, User):
|
|
return True
|
|
|
|
# access allowed for anything but a list of "privileged" modes
|
|
return mode not in ('create', 'edit', 'delete', 'admin-teaser')
|
|
|
|
def url(self, mode='full'):
|
|
|
|
if mode == 'full':
|
|
|
|
if self.__class__ not in app.models_exposed.values():
|
|
raise TypeError("Use of .url(mode='full') is only possible on @exposed classes.")
|
|
|
|
return flask.url_for(f"{self._lowerclass}_full", id=self.id)
|
|
|
|
elif mode == 'edit':
|
|
|
|
return flask.url_for(f"admin.{self._lowerclass}_edit", id=self.id)
|
|
|
|
elif mode == 'create':
|
|
|
|
return flask.url_for(f"admin.{self._lowerclass}_create")
|
|
|
|
elif mode == 'teaser':
|
|
|
|
return flask.url_for(f"{cls._lowerclass}_listing") # TODO: pagination
|
|
|
|
def form(self, mode, **kwargs):
|
|
|
|
if 'id' not in kwargs:
|
|
kwargs['id'] = f'{self._lowerclass}-{mode}'
|
|
|
|
if 'title' not in kwargs:
|
|
kwargs['title'] = f"{mode.capitalize()} {self.__class__.__name__}"
|
|
|
|
return self.autoform_class(self, mode, **kwargs)
|
|
|
|
def form_field(self, field_name):
|
|
|
|
db_field = getattr(self.__class__, field_name)
|
|
|
|
field_label = db_field.verbose_name or field_name
|
|
field_help = db_field.help_text
|
|
field_value = getattr(self, field_name)
|
|
field_required = not db_field.null
|
|
|
|
if isinstance(db_field, peewee.ForeignKeyField):
|
|
return form.ForeignKeySelect(db_field, label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
if isinstance(db_field, peewee.IntegerField):
|
|
return form.Integer(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
if isinstance(db_field, peewee.FloatField):
|
|
return form.Float(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
if isinstance(db_field, peewee.TextField):
|
|
return form.TextArea(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
if isinstance(db_field, peewee.CharField):
|
|
return form.Text(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
if isinstance(db_field, peewee.BooleanField):
|
|
# required for checkbox means it MUST be checked (forbidding unchecked, i.e. False)
|
|
return form.Checkbox(label=field_label, value=field_value, help=field_help, required=False)
|
|
|
|
if isinstance(db_field, peewee.DateField):
|
|
return form.DateField(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
if isinstance(db_field, peewee.DateTimeField):
|
|
return form.DateTime(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
# fallback
|
|
app.logger.warning(f"AutoForm using fallback field construction for {self.__class__.__name__}.{field_name}")
|
|
return form.Text(label=field_label, value=field_value, help=field_help, required=field_required)
|
|
|
|
def form_field_handle(self, field, mode):
|
|
pass
|
|
|
|
class Notification(database.RenderableModel):
|
|
|
|
to = peewee.ForeignKeyField(User, backref='notifications')
|
|
created = peewee.DateTimeField(default=datetime.datetime.utcnow, null=False)
|
|
read = peewee.BooleanField(default=False, null=False)
|
|
message = markdown.MarkdownTextField()
|
|
|
|
@app.abstract
|
|
class Named(Administerable):
|
|
|
|
name = peewee.CharField(unique=True, null=False, verbose_name='Name', constraints=[peewee.Check('"name" ~ \'^[a-z0-9_\-]+$\'')])
|
|
|
|
@classmethod
|
|
def load(cls, name):
|
|
return cls.select().where(cls.name == name).get()
|
|
|
|
@classmethod
|
|
def view(cls, mode='full', **kwargs):
|
|
|
|
if mode in ('create', 'edit', 'delete'):
|
|
|
|
@rendering.page(mode=mode)
|
|
def wrapped(**kw):
|
|
|
|
if mode == 'create':
|
|
instance = cls()
|
|
else:
|
|
instance = cls.load(kw['name'])
|
|
|
|
if not instance.access(mode):
|
|
flask.abort(404)
|
|
|
|
form = instance.form(mode)
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
redirect = form.handle()
|
|
|
|
for error in form.errors:
|
|
flask.flash(str(error), 'error')
|
|
|
|
if isinstance(redirect, werkzeug.Response):
|
|
return redirect
|
|
|
|
return form
|
|
|
|
else:
|
|
|
|
@rendering.page(mode=mode, title_show=False) # show title in own template, gives better flexibility
|
|
def wrapped(**kw):
|
|
|
|
instance = cls.load(kw['name'])
|
|
|
|
if not instance.access(mode):
|
|
flask.abort(404)
|
|
|
|
return instance
|
|
|
|
return wrapped(**kwargs)
|
|
|
|
@property
|
|
def menu(self):
|
|
|
|
if self.is_in_db and flask.g.user:
|
|
|
|
# TODO: make this work with .url() in a way that doesn't break trail detection in Menu
|
|
items = []
|
|
|
|
if self.__class__ in app.models_exposed.values():
|
|
|
|
# full view added by @expose
|
|
items.append({
|
|
'endpoint': f'{self._lowerclass}_full',
|
|
'params': { 'name' : self.name },
|
|
'label': 'View',
|
|
})
|
|
|
|
# admin routes automatically added by register_admin
|
|
items.extend([
|
|
{
|
|
'endpoint': f'admin.{self._lowerclass}_edit',
|
|
'params': { 'name' : self.name },
|
|
'label': 'Edit',
|
|
},
|
|
{
|
|
'endpoint': f'admin.{self._lowerclass}_delete',
|
|
'params': { 'name' : self.name },
|
|
'label': 'Delete',
|
|
}
|
|
])
|
|
|
|
return rendering.Menu(
|
|
f'{self._lowerclass}-{self.id}-tabs',
|
|
items,
|
|
extra_classes=[f'menu-{self._lowerclass}', 'tabs']
|
|
)
|
|
|
|
def url(self, mode='full'):
|
|
|
|
if mode in ('teaser', 'full') and self.__class__ not in app.models_exposed.values():
|
|
raise TypeError("Trying to get public URL for non-exposed model {self.__class__.__name__}.")
|
|
|
|
if mode == 'full':
|
|
return flask.url_for(f"{self._lowerclass}_full", name=self.name)
|
|
elif mode == 'edit':
|
|
return flask.url_for(f"admin.{self._lowerclass}_edit", name=self.name)
|
|
elif mode == 'create':
|
|
return flask.url_for(f"admin.{self._lowerclass}_create")
|
|
elif mode == 'teaser':
|
|
return flask.url_for(f"{self._lowerclass}_listing") # TODO: pagination
|
|
|
|
def form_field(self, field_name):
|
|
|
|
f = super().form_field(field_name)
|
|
|
|
if field_name == 'name':
|
|
f.validators.append(
|
|
functools.partial(form.validate_regexp, r'^[a-z0-9_\-]+$')
|
|
)
|
|
|
|
return f
|
|
|
|
def register_administerable(cls):
|
|
|
|
# not in register_admin below so view functions are in a new scope
|
|
# for each model (otherwise the last view function definitions would
|
|
# get registered to all routes).
|
|
|
|
@auth
|
|
@rendering.page(title_show=False)
|
|
def view_listing():
|
|
|
|
menu = rendering.Menu(
|
|
f'{cls._lowerclass}-listing-actions',
|
|
[{
|
|
'endpoint': f'admin.{cls._lowerclass}_create',
|
|
'label': 'Create new',
|
|
}]
|
|
)
|
|
|
|
return rendering.Listing(cls.select(), title=cls.__name__, mode='admin-teaser', menu=menu)
|
|
|
|
@auth
|
|
def view_create(*args, **kwargs):
|
|
|
|
kwargs['mode'] = 'create'
|
|
|
|
return cls.view(*args, **kwargs)
|
|
|
|
@auth
|
|
def view_edit(*args, **kwargs):
|
|
|
|
kwargs['mode'] = 'edit'
|
|
|
|
return cls.view(*args, **kwargs)
|
|
|
|
@auth
|
|
def view_delete(*args, **kwargs):
|
|
|
|
kwargs['mode'] = 'delete'
|
|
|
|
return cls.view(*args, **kwargs)
|
|
|
|
app.admin.add_url_rule(f'/{cls._lowerclass}/', f'{cls._lowerclass}_listing', view_listing)
|
|
|
|
app.admin.add_url_rule(f'/{cls._lowerclass}/create/', f'{cls._lowerclass}_create', view_create, methods=['GET', 'POST'])
|
|
if issubclass(cls, Named):
|
|
app.admin.add_url_rule(f'/{cls._lowerclass}/<string:name>/', f'{cls._lowerclass}_edit', view_edit, methods=['GET', 'POST'])
|
|
app.admin.add_url_rule(f'/{cls._lowerclass}/<string:name>/delete/', f'{cls._lowerclass}_delete', view_delete, methods=['GET', 'POST'])
|
|
else:
|
|
app.admin.add_url_rule(f'/{cls._lowerclass}/<int:id>/', f'{cls._lowerclass}_edit', view_edit, methods=['GET', 'POST'])
|
|
app.admin.add_url_rule(f'/{cls._lowerclass}/<int:id>/delete', f'{cls._lowerclass}_delete', view_delete, methods=['GET', 'POST'])
|
|
|
|
class Dashboard(rendering.Renderable):
|
|
|
|
def __init__(self, **kwargs):
|
|
|
|
super().__init__(**kwargs)
|
|
|
|
self.commentables = {}
|
|
|
|
Commentable = Administerable.__class_descendants__['Commentable'] # avoids circular dependency
|
|
Upload = Administerable.__class_descendants__['Upload']
|
|
|
|
preview_classes = list(Commentable.__class_descendants__.values())
|
|
preview_classes.extend(Upload.__class_descendants__.values())
|
|
preview_classes.append(Administerable.__class_descendants__['Gallery'])
|
|
|
|
for cls in preview_classes:
|
|
|
|
if cls not in app.models_abstract:
|
|
|
|
info = {}
|
|
info['menu'] = rendering.Menu(
|
|
f'dashboard-preview-actions-{cls._lowerclass}',
|
|
[
|
|
{
|
|
'endpoint': f'admin.{cls._lowerclass}_listing',
|
|
'label': cls.__name__,
|
|
},
|
|
{
|
|
'endpoint': f'admin.{cls._lowerclass}_create',
|
|
'label': 'Create new',
|
|
},
|
|
]
|
|
)
|
|
|
|
if hasattr(cls, 'created'):
|
|
info['listing'] = rendering.Listing(cls.select().order_by(cls.created.desc()).limit(3))
|
|
else:
|
|
info['listing'] = rendering.Listing(cls.select().order_by(cls.id.desc()).limit(3))
|
|
|
|
self.commentables[cls._lowerclass] = info
|
|
|
|
self.menu_others = rendering.Menu('dashboard-others')
|
|
for cls in Administerable.__class_descendants__.values():
|
|
|
|
if cls not in app.models_abstract and cls not in preview_classes:
|
|
self.menu_others.append(
|
|
endpoint=f'admin.{cls._lowerclass}_listing',
|
|
label=cls.__name__
|
|
)
|
|
|
|
@app.admin.route('/')
|
|
@auth
|
|
@rendering.page()
|
|
def admin_home():
|
|
|
|
return Dashboard()
|
|
|
|
@app.admin.route('/login/', methods=['GET', 'POST'])
|
|
@rendering.page()
|
|
def admin_login():
|
|
|
|
if not flask.g.client_cert_verified:
|
|
|
|
flask.abort(403)
|
|
|
|
elif not flask.g.client_cert_fingerprint_matched:
|
|
|
|
flask.abort(400)
|
|
|
|
else:
|
|
|
|
f = form.Form(title='Login')
|
|
|
|
#f['name'] = form.Text(label='Name', required=True)
|
|
f['password'] = form.Password(label='Password', required=True)
|
|
f.buttons['login'] = form.Button(name='login', label='Log in')
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
f.handle() # binds and validates field values
|
|
|
|
#name = f['name'].value
|
|
name = flask.g.client_cert_info.user.name
|
|
password = f['password'].value
|
|
|
|
user = User.login(name, password)
|
|
|
|
if user:
|
|
|
|
flask.session['uid'] = user.id
|
|
flask.flash(f"Welcome, {user.name}.", 'success')
|
|
return flask.redirect(flask.url_for('admin.admin_home'))
|
|
|
|
else:
|
|
|
|
flask.flash("Invalid login.", 'error')
|
|
|
|
return f
|
|
|
|
@app.admin.route('/logout/')
|
|
@auth
|
|
def admin_logout():
|
|
|
|
del flask.session['uid']
|
|
return flask.redirect(flask.url_for('frontpage'))
|
|
|
|
class NotificationForm(form.Form):
|
|
|
|
def __init__(self, filter, **kwargs):
|
|
|
|
super().__init__(**kwargs)
|
|
|
|
if filter == 'unread':
|
|
notifications = flask.g.user.notifications_unread
|
|
elif filter == 'read':
|
|
notifications = flask.g.user.notifications.where(Notification.read == True)
|
|
elif filter == 'all':
|
|
notifications = flask.g.user.notifications
|
|
else:
|
|
raise ValueError("Invalid filter")
|
|
|
|
self.filters = rendering.Menu(
|
|
'notification-actions',
|
|
items=(
|
|
{
|
|
'endpoint': 'admin.admin_notifications',
|
|
'params': {'filter': 'unread'},
|
|
'label': 'Unread',
|
|
},
|
|
{
|
|
'endpoint': 'admin.admin_notifications',
|
|
'params': {'filter': 'read'},
|
|
'label': 'Read',
|
|
},
|
|
{
|
|
'endpoint': 'admin.admin_notifications',
|
|
'params': {'filter': 'all'},
|
|
'label': 'All',
|
|
},
|
|
)
|
|
)
|
|
|
|
self.notifications = form.MultiGroup(form.types.INT, 'notifications', required=True)
|
|
|
|
for notification in notifications:
|
|
|
|
#fs = form.Fieldset(name=f'notification-{notification.id}')
|
|
fs = form.Fieldset()
|
|
fs[f'notification-preview-{notification.id}'] = form.RenderableField(notification, mode='inline')
|
|
fs[f'notification-{notification.id}'] = form.Checkbox(group=self.notifications, value=notification.id)
|
|
|
|
self[f'notification-{notification.id}'] = fs
|
|
|
|
self.buttons['mark'] = form.Button(label='Mark as read')
|
|
self.buttons['delete'] = form.Button(label='Delete')
|
|
|
|
def process(self, submit):
|
|
|
|
if submit == 'mark':
|
|
count = Notification.update(
|
|
read=True
|
|
).where(Notification.id << self.notifications.values).execute()
|
|
flask.flash(f"Marked {count} notifications as read.", 'success')
|
|
|
|
elif submit == 'delete':
|
|
count = Notification.delete().where(
|
|
Notification.id << self.notifications.values
|
|
).execute()
|
|
flask.flash(f"Deleted {count} notifications.", 'success')
|
|
|
|
return flask.redirect('')
|
|
|
|
@app.admin.route('/notifications/', methods=['GET', 'POST'])
|
|
@app.admin.route('/notifications/<string:filter>/', methods=['GET', 'POST'])
|
|
@auth
|
|
@rendering.page()
|
|
def admin_notifications(filter='unread'):
|
|
|
|
|
|
f = NotificationForm(filter, title='Notifications')
|
|
|
|
if flask.request.method == 'POST':
|
|
|
|
redirect = f.handle()
|
|
|
|
if redirect:
|
|
return redirect
|
|
|
|
return f
|
|
|
|
@app.admin.menu('main')
|
|
def admin_menu_main():
|
|
|
|
items = [
|
|
{
|
|
'endpoint': 'frontpage',
|
|
'label': 'Home',
|
|
},
|
|
]
|
|
|
|
if isinstance(flask.g.user, User): # empty menu without login
|
|
|
|
items.append({
|
|
'endpoint': 'admin.admin_home',
|
|
'label': 'Admin',
|
|
})
|
|
|
|
notification_count = len(flask.g.user.notifications_unread)
|
|
if notification_count == 0:
|
|
notification_label = 'No unread notifications'
|
|
elif notification_count == 1:
|
|
notification_label = 'One unread notification'
|
|
else:
|
|
notification_label = f"{notification_count} unread notifications"
|
|
|
|
items.append({
|
|
'endpoint': 'admin.admin_notifications',
|
|
'label': notification_label,
|
|
})
|
|
|
|
items.append({
|
|
'endpoint': 'admin.comment_moderation',
|
|
'label': 'Comment moderation',
|
|
})
|
|
|
|
return rendering.Menu('main', items, burger=True)
|
|
|
|
@app.admin.menu('secondary')
|
|
def admin_menu_secondary():
|
|
|
|
items = []
|
|
|
|
if isinstance(flask.g.user, User): # logged in
|
|
|
|
items.append({
|
|
'endpoint': 'admin.admin_logout',
|
|
'label': 'Log out',
|
|
})
|
|
|
|
else:
|
|
|
|
items.append({
|
|
'endpoint': 'admin.admin_login',
|
|
'label': 'Log in',
|
|
})
|
|
|
|
return rendering.Menu('secondary', items)
|
|
|
|
@app.boot
|
|
def register_admin():
|
|
|
|
for cls in Administerable.__class_descendants__.values():
|
|
|
|
if not cls in app.models_abstract:
|
|
register_administerable(cls)
|
|
|
|
app.register_blueprint(app.admin)
|