pdnew/admin.py

951 lines
30 KiB
Python
Raw Permalink Normal View History

2024-04-12 19:22:00 +00:00
# builtins
import time
import datetime
2024-04-12 19:22:00 +00:00
import functools
import hashlib
import secrets
2024-05-20 02:10:17 +00:00
import uuid
2024-04-12 19:22:00 +00:00
# third-party
import werkzeug, flask
2024-04-12 19:22:00 +00:00
import peewee, playhouse.postgres_ext
2024-05-20 02:10:17 +00:00
import OpenSSL as openssl
2024-04-12 19:22:00 +00:00
# internals
from application import app
import util
import rendering
import form
import database
import markdown
2024-04-12 19:22:00 +00:00
# Static salt for the fake hash comparison; this is part of the timing
# side-channel mitigation. It is generated once at boot so that faking a login
# check for a non-existing user doesn't spend extra time generating a salt.
# Consumed by User.login() when the requested user name does not exist.
__fake_salt__ = secrets.token_hex(64)
@app.before_request
def request_setup():
    """Populate ``flask.g`` with TLS client certificate and login state.

    Reads the ``SSL_*`` variables from the WSGI environment, resolves the
    presented client certificate to a ``ClientCert`` row via its SHA-512
    fingerprint and, if the session carries a ``uid`` that matches the
    certificate's user, sets ``flask.g.user``.

    Errors are logged rather than raised (except when ``app.debug`` is set),
    so requests without valid credentials simply run with ``flask.g.user``
    left as ``None``.
    """
    client_cert_verified = flask.request.environ.get('SSL_CLIENT_VERIFY')
    client_cert_raw = flask.request.environ.get('SSL_CLIENT_CERT')
    client_cert = None
    client_cert_info = None

    # defaults: anonymous request without a (known) client certificate
    flask.g.user = None
    flask.g.tls_cipher = flask.request.environ.get('SSL_CIPHER')
    flask.g.client_cert_verified = False
    flask.g.client_cert_fingerprint = None # SHA2-512 (without ':' separators)
    flask.g.client_cert_fingerprint_matched = False
    flask.g.client_cert_info = None # ClientCert object

    if client_cert_verified == 'SUCCESS':
        flask.g.client_cert_verified = True
        if not client_cert_raw:
            # verification succeeded upstream, but the PEM data was not passed on
            app.logger.error("Successful upstream client cert authentication, but not certificate given (fix nginx.conf!)")
        else:
            try:
                client_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, client_cert_raw)
            except Exception as e:
                # only surface parsing problems while debugging
                if app.debug:
                    raise
                app.logger.error(f"Error while loading client certificate: {str(e)}")
            else:
                # digest() yields bytes like b'AB:CD:...'; strip the separators
                fingerprint = client_cert.digest('sha512').replace(b':', b'')
                flask.g.client_cert_fingerprint = fingerprint.decode('ascii')
                try:
                    # NOTE(review): `fingerprint` is still bytes here (the decoded
                    # str lives in flask.g.client_cert_fingerprint), so the log
                    # message below shows a bytes repr.
                    client_cert_info = ClientCert.get(ClientCert.fingerprint == fingerprint)
                    flask.g.client_cert_info = client_cert_info
                    flask.g.client_cert_fingerprint_matched = True
                except ClientCert.DoesNotExist as e:
                    app.logger.error(f"Can't find certificate info for fingerprint '{fingerprint}' in database. Dropped cert or user? Error: {str(e)}")
                except Exception as e:
                    if app.debug:
                        raise
                    app.logger.error(f"Generic exception when trying to load client certificate info from database: {str(e)}")
                else:
                    # certificate is known; a session login only counts if it
                    # belongs to the same user as the certificate
                    if 'uid' in flask.session:
                        try:
                            if client_cert_info.user.id == flask.session['uid']:
                                flask.g.user = client_cert_info.user
                        except User.DoesNotExist:
                            flask.flash("You did done got deleted.", 'error')
                            del flask.session['uid']
                        except Exception:
                            # silently fail all other exception types. this means that only
                            # consequent database failures will trigger an error page, but
                            # routes that don't use the database at all, will still work.
                            # this is needed so things like theme asset delivery work when
                            # there's a database problem.
                            del flask.session['uid']
def auth(f):
    """Decorator that only lets logged-in users reach the wrapped view.

    The wrapped callable is invoked unchanged when ``flask.g.user`` is a
    ``User`` instance; every other request is rejected with HTTP 403.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        logged_in = isinstance(flask.g.user, User)
        if not logged_in:
            flask.abort(403)  # raises HTTPException
        return f(*args, **kwargs)

    return wrapper
class User(database.RenderableModel):
    """Administrative user account.

    The password is stored as the hex digest of an iterated SHA3-512 hash
    computed over the application pepper (``app.config['PEPPER']``), a
    per-user random salt and the password itself.
    """

    name = peewee.CharField(null=False, unique=True, verbose_name='Name')
    password = peewee.CharField(null=False, verbose_name='Password')
    salt = peewee.CharField(null=False)

    @classmethod
    def load(cls, name):
        """Return the user called `name`; raises ``cls.DoesNotExist`` if missing."""
        return cls.select().where(cls.name == name).get()

    @classmethod
    def load_by_id(cls, id):
        """Return the user with primary key `id`; raises ``cls.DoesNotExist`` if missing."""
        return cls.select().where(cls.id == id).get()

    @classmethod
    def __gen_salt__(cls):
        """Return a fresh random salt as a hex string."""
        return secrets.token_hex(64) # 64 *byte* (512 bit), so string length of 128 as hex

    @classmethod
    def __gen_hash__(cls, salt, password):
        """Return the hex digest of the iterated SHA3-512 hash for `password`.

        The first round hashes pepper + salt + password; every further round
        hashes the previous digest, for `iterations` rounds in total.
        """
        iterations = 100000 # ~50ms at ~5GHz
        hash = hashlib.sha3_512(bytes(app.config['PEPPER'] + salt + password, 'UTF-8'))
        for i in range(1, iterations):
            hash = hashlib.sha3_512(hash.digest())
        return hash.hexdigest()

    @classmethod
    def login(cls, name, password):
        """Check credentials and return the matching user, or ``False``.

        To mitigate timing side channels a hash is computed even when the
        user does not exist, the digests are compared in constant time and a
        random delay of up to 100ms is added before returning.
        """
        r = False
        time_rand = secrets.randbelow(100) # random delay to mitigate timing sidechannel
        try:
            user = cls.select().where(cls.name == name).get()
            candidate = cls.__gen_hash__(user.salt, password)
            # constant-time comparison; encode so that compare_digest accepts
            # any stored string, not just pure-ASCII ones
            if secrets.compare_digest(user.password.encode('utf-8'), candidate.encode('utf-8')):
                r = user
        except cls.DoesNotExist:
            # TODO: benchmark this to see whether we need an extra delay
            cls.__gen_hash__(__fake_salt__, password)
        time.sleep(time_rand / 1000)
        return r

    @classmethod
    def register(cls, name, password):
        """Create, save and return a new user with a freshly salted password hash."""
        user = cls()
        user.name = name
        user.salt = cls.__gen_salt__()
        user.password = cls.__gen_hash__(user.salt, password)
        user.save(force_insert=True)
        return user

    @property
    def notifications_unread(self):
        """Query for this user's notifications that are not marked as read."""
        return self.notifications.where(Notification.read == False)

    def notify(self, message):
        """Store a new notification with `message` for this user; returns the result of save()."""
        n = Notification(to=self, message=message)
        return n.save(force_insert=True)

    def update_password(self, password):
        """Replace salt and password hash with new ones for `password` and save."""
        self.salt = self.__gen_salt__() # new random salt
        self.password = self.__gen_hash__(self.salt, password)
        return self.save()

    def gen_keypair_and_clientcert(self, certname, not_after):
        """Generate an RSA key pair plus a CA-signed client certificate.

        `certname` becomes part of the certificate's common name and the
        PKCS#12 friendly name. `not_after` is the expiry datetime; it is
        compared against ``datetime.datetime.utcnow()`` plus
        ``app.config['CLIENTCERT_MAX_LIFETIME']`` and rejected when later.

        Returns an ``OpenSSL.crypto.PKCS12`` object holding the CA
        certificate, the new private key and the signed certificate.
        """
        invalid_after = datetime.datetime.utcnow() + app.config['CLIENTCERT_MAX_LIFETIME']
        if not_after > invalid_after:
            # NOTE(review): `exceptions` is not among the imports visible at the
            # top of this file -- confirm it is in scope, otherwise this line
            # raises NameError instead of ExposedException.
            raise exceptions.ExposedException(f"not_after too far into the future maximum allowed is {str(invalid_after)} but got {str(not_after)}.")

        common_name = f'{self.name}:{certname}@{app.config["SITE_NAME"]}'

        # context managers make sure the CA files get closed even if parsing fails
        with open('ca/key.pem', 'rb') as fd:
            ca_key = openssl.crypto.load_privatekey(openssl.crypto.FILETYPE_PEM, fd.read())
        with open('ca/cert.pem', 'rb') as fd:
            ca_cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, fd.read())

        keypair = openssl.crypto.PKey()
        keypair.generate_key(openssl.crypto.TYPE_RSA, app.config['CLIENTCERT_KEYLENGTH'])

        extensions = [
            openssl.crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment, keyAgreement'),
            openssl.crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'),
        ]

        cert = openssl.crypto.X509()
        cert.set_version(2) # actually means 3, openssl bad
        cert.add_extensions(extensions)
        cert.set_issuer(ca_cert.get_subject())
        cert.set_pubkey(keypair)
        cert.gmtime_adj_notBefore(0) # now
        cert.set_notAfter(not_after.strftime('%Y%m%d%H%M%SZ').encode('utf-8'))
        cert.set_serial_number(uuid.uuid4().int) # random uuid as int
        cert.get_subject().CN = common_name.encode('utf-8')
        cert.sign(ca_key, 'sha512')

        pkcs12 = openssl.crypto.PKCS12()
        pkcs12.set_ca_certificates([ca_cert])
        pkcs12.set_privatekey(keypair)
        pkcs12.set_certificate(cert)
        pkcs12.set_friendlyname(certname.encode('utf-8'))
        return pkcs12
class ClientCert(database.Model):
    """Database record linking a TLS client certificate to a user."""

    class Meta:
        indexes = (
            (('user_id', 'name'), True), # unique constraint for (user, name)
        )

    # owning user; reachable from the user as `clientcerts`, deleted along with it
    user = peewee.ForeignKeyField(User, backref='clientcerts', on_delete='CASCADE')
    # certificate name, unique per user (see Meta.indexes)
    name = peewee.CharField(null=False)
    # looked up in request_setup() using the certificate's SHA-512 digest
    # with the ':' separators removed
    fingerprint = peewee.CharField(null=False)
2024-04-12 19:22:00 +00:00
class AutoForm(form.Form):
    """Form generated from an Administerable for its create/edit/delete views.

    In 'delete' mode the form is only a confirmation prompt. Otherwise one
    form field is created per model column that is not listed in the
    model's ``autoform_blacklist``.
    """

    def __init__(self, administerable, mode, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.administerable = administerable
        self.menu = administerable.menu
        self.mode = mode

        if mode == 'delete':
            self.intro = f"You are about to delete this {self.administerable.__class__.__name__}, this cannot be undone. Proceed?"
            self.buttons[mode] = form.Button(label='Yes, delete')
        else:
            model = administerable.__class__
            for column_name in model._meta.columns:
                if column_name in model.autoform_blacklist:
                    continue
                self[column_name] = administerable.form_field(column_name)

            if mode == 'create':
                button_label = f'Create {administerable._lowerclass}'
            else:
                button_label = f'Save {administerable._lowerclass}'
            self.buttons[mode] = form.Button(label=button_label)

            # named objects always show their name as the first field
            if isinstance(administerable, Named):
                self.fields.position('name', 0)

    def process(self, submit):
        """Copy field values to the object and persist it according to the mode.

        Returns a redirect response on success. Returns None when the form
        is invalid or when saving raised an IntegrityError, in which case a
        validation error is attached to the offending field if it can be
        identified.
        """
        if not self.valid:
            return None

        processable_fields = [
            candidate for candidate in self.fields.values()
            if candidate.name not in self.administerable.__class__.autoform_blacklist
        ]

        for entry in processable_fields:
            # FIXME: hacky way to preserve old filename value for Upload
            if isinstance(entry, form.Fieldset) or isinstance(entry.value, werkzeug.datastructures.FileStorage):
                continue
            setattr(self.administerable, entry.name, entry.value)

        # extra field handling (usually a no-op) runs in a second pass so
        # form_field_handle can rely on all plain attributes being filled
        for entry in processable_fields:
            self.administerable.form_field_handle(entry, self.mode)

        try:
            if self.mode == 'create':
                self.administerable.save(force_insert=True)
            elif self.mode == 'edit':
                self.administerable.save()
            elif self.mode == 'delete':
                self.administerable.delete_instance()
        except peewee.IntegrityError as e:
            diag = e.orig.diag # psycopg2 diagnostic object

            if diag.source_function == '_bt_check_unique': # we're dealing with a unique constraint
                # assuming <tablename>_<fieldname> for constraint_name
                pieces = diag.constraint_name.split('_', 1)
                if len(pieces) == 2: # ignore constraint names without underscores
                    tablename, fieldname = pieces
                    if tablename == self.administerable._lowerclass and fieldname in self.fields:
                        offender = self[fieldname]
                        problem = form.ValidationError(f"{offender.label} must be unique.")
                        offender.errors.append(problem)
                        self.errors.append(problem)

            flask.flash("A problem occured when saving.", 'error')
            app.logger.warning(f"IntegrityError when saving object from AutoForm: {str(e)}")
        else:
            if self.mode == 'create':
                flask.flash(f"Successfully created new {self.administerable._lowerclass}.", 'success')
                return flask.redirect(self.administerable.url('edit'))
            if self.mode == 'edit':
                flask.flash(f"Successfully saved {self.administerable._lowerclass}.", 'success')
                # makes sure we get to the right url even if id/name changed
                return flask.redirect(self.administerable.url('edit'))
            if self.mode == 'delete':
                flask.flash(f"Successfully deleted {self.administerable._lowerclass}.", 'success')
                # TODO: make this cleaner, implement cls.url_listing?
                return flask.redirect(flask.url_for(f'admin.{self.administerable._lowerclass}_listing'))
@app.abstract
class Administerable(database.RenderableModel):
    """Base class for models that are managed through the admin interface.

    Provides id-based loading, view functions for the different modes, the
    tab menu, access checks, URL building and automatic form construction.
    """

    id = playhouse.postgres_ext.IdentityField(generate_always=True, verbose_name='ID')

    # columns AutoForm must neither render nor write back
    autoform_blacklist = ('id',)
    autoform_class = AutoForm

    @classmethod
    def load(cls, id):
        """Return the instance with primary key `id`; raises ``cls.DoesNotExist`` if missing."""
        return cls.select().where(cls.id == id).get()

    @classmethod
    def list(cls):
        """Return a query for all instances, highest id first."""
        return cls.select().order_by(cls.id.desc())

    @classmethod
    def view(cls, mode='full', **kwargs):
        """Render the view for `mode`; `kwargs` are the route parameters.

        For 'create', 'edit' and 'delete' the object's form is rendered and,
        on POST, handled; a redirect returned by the form is passed through.
        Any other mode renders the instance itself. Aborts with 404 when
        ``access(mode)`` denies access.
        """
        if mode in ('create', 'edit', 'delete'):
            @rendering.page(mode=mode)
            def wrapped(**kw):
                if mode == 'create':
                    instance = cls()
                else:
                    instance = cls.load(kw['id'])

                if not instance.access(mode):
                    flask.abort(404)

                form = instance.form(mode)
                if flask.request.method == 'POST':
                    redirect = form.handle()
                    if isinstance(redirect, werkzeug.Response):
                        return redirect
                return form
        else:
            @rendering.page(mode=mode, title_show=False) # show title in own template, gives better flexibility
            def wrapped(**kw):
                instance = cls.load(kw['id'])
                if not instance.access(mode):
                    flask.abort(404)
                return instance

        return wrapped(**kwargs)

    @property
    def menu(self):
        """Tab menu (View/Edit/Delete) for a saved object; None if unsaved or not logged in."""
        if self.is_in_db and flask.g.user:
            # TODO: make this work with .url() in a way that doesn't break trail detection in Menu
            items = []

            if self.__class__ in app.models_exposed.values():
                # full view added by @expose
                items.append({
                    'endpoint': f'{self._lowerclass}_full',
                    'params': { 'id' : self.id },
                    'label': 'View',
                })

            # admin routes automatically added by register_admin
            items.extend([
                {
                    'endpoint': f'admin.{self._lowerclass}_edit',
                    'params': { 'id' : self.id },
                    'label': 'Edit',
                },
                {
                    'endpoint': f'admin.{self._lowerclass}_delete',
                    'params': { 'id' : self.id },
                    'label': 'Delete',
                }
            ])

            return rendering.Menu(
                f'{self._lowerclass}-{self.id}-tabs',
                items,
                extra_classes=[f'menu-{self._lowerclass}', 'tabs']
            )

    @property
    def is_in_db(self):
        """True once the object has an id."""
        return bool(self.id)

    def access(self, mode='full'):
        """Return whether the current request may use `mode` on this object."""
        if isinstance(flask.g.user, User):
            return True
        # access allowed for anything but a list of "privileged" modes
        return mode not in ('create', 'edit', 'delete', 'admin-teaser')

    def url(self, mode='full'):
        """Return the URL for `mode` ('full', 'edit', 'create' or 'teaser').

        Raises TypeError for mode 'full' on classes that are not exposed.
        Any other mode yields None.
        """
        if mode == 'full':
            if self.__class__ not in app.models_exposed.values():
                raise TypeError("Use of .url(mode='full') is only possible on @exposed classes.")
            return flask.url_for(f"{self._lowerclass}_full", id=self.id)
        elif mode == 'edit':
            return flask.url_for(f"admin.{self._lowerclass}_edit", id=self.id)
        elif mode == 'create':
            return flask.url_for(f"admin.{self._lowerclass}_create")
        elif mode == 'teaser':
            # was `cls._lowerclass`, but `cls` is undefined in an instance method
            return flask.url_for(f"{self._lowerclass}_listing") # TODO: pagination

    def form(self, mode, **kwargs):
        """Build this object's form for `mode`, supplying default 'id' and 'title' kwargs."""
        if 'id' not in kwargs:
            kwargs['id'] = f'{self._lowerclass}-{mode}'
        if 'title' not in kwargs:
            kwargs['title'] = f"{mode.capitalize()} {self.__class__.__name__}"
        return self.autoform_class(self, mode, **kwargs)

    def form_field(self, field_name):
        """Create the form field matching the database field `field_name`.

        The form field type is chosen by the peewee field type; unknown
        types fall back to a plain text field and log a warning.
        """
        db_field = getattr(self.__class__, field_name)
        field_label = db_field.verbose_name or field_name
        field_help = db_field.help_text
        field_value = getattr(self, field_name)
        field_required = not db_field.null

        if isinstance(db_field, peewee.ForeignKeyField):
            return form.ForeignKeySelect(db_field, label=field_label, value=field_value, help=field_help, required=field_required)
        if isinstance(db_field, peewee.IntegerField):
            return form.Integer(label=field_label, value=field_value, help=field_help, required=field_required)
        if isinstance(db_field, peewee.FloatField):
            return form.Float(label=field_label, value=field_value, help=field_help, required=field_required)
        if isinstance(db_field, peewee.TextField):
            return form.TextArea(label=field_label, value=field_value, help=field_help, required=field_required)
        if isinstance(db_field, peewee.CharField):
            return form.Text(label=field_label, value=field_value, help=field_help, required=field_required)
        if isinstance(db_field, peewee.BooleanField):
            # required for checkbox means it MUST be checked (forbidding unchecked, i.e. False)
            return form.Checkbox(label=field_label, value=field_value, help=field_help, required=False)
        if isinstance(db_field, peewee.DateField):
            return form.DateField(label=field_label, value=field_value, help=field_help, required=field_required)
        if isinstance(db_field, peewee.DateTimeField):
            return form.DateTime(label=field_label, value=field_value, help=field_help, required=field_required)

        # fallback
        app.logger.warning(f"AutoForm using fallback field construction for {self.__class__.__name__}.{field_name}")
        return form.Text(label=field_label, value=field_value, help=field_help, required=field_required)

    def form_field_handle(self, field, mode):
        """Hook called by AutoForm.process for every processed field; no-op by default."""
        pass
class Notification(database.RenderableModel):
    """Message addressed to a single user, with a read/unread flag."""

    # recipient; reachable from the user as `notifications`
    to = peewee.ForeignKeyField(User, backref='notifications')
    # creation time, defaults to datetime.datetime.utcnow() at instantiation
    created = peewee.DateTimeField(default=datetime.datetime.utcnow, null=False)
    # new notifications start out unread
    read = peewee.BooleanField(default=False, null=False)
    message = markdown.MarkdownTextField()
@app.abstract
class Named(Administerable):
    """Administerable that is addressed by a unique `name` instead of its id.

    Names are restricted to lowercase letters, digits, '_' and '-', both by
    a database check constraint and by a form validator.
    """

    # the constraint literal avoids the invalid "\-" escape of a non-raw string
    # while producing exactly the same SQL: "name" ~ '^[a-z0-9_\-]+$'
    name = peewee.CharField(unique=True, null=False, verbose_name='Name', constraints=[peewee.Check("\"name\" ~ '^[a-z0-9_\\-]+$'")])

    @classmethod
    def load(cls, name):
        """Return the instance called `name`; raises ``cls.DoesNotExist`` if missing."""
        return cls.select().where(cls.name == name).get()

    @classmethod
    def view(cls, mode='full', **kwargs):
        """Render the view for `mode`; `kwargs` are the route parameters.

        Works like the id-based variant but loads instances by ``name`` and
        additionally flashes every form error after a POST was handled.
        """
        if mode in ('create', 'edit', 'delete'):
            @rendering.page(mode=mode)
            def wrapped(**kw):
                if mode == 'create':
                    instance = cls()
                else:
                    instance = cls.load(kw['name'])

                if not instance.access(mode):
                    flask.abort(404)

                form = instance.form(mode)
                if flask.request.method == 'POST':
                    redirect = form.handle()
                    for error in form.errors:
                        flask.flash(str(error), 'error')
                    if isinstance(redirect, werkzeug.Response):
                        return redirect
                return form
        else:
            @rendering.page(mode=mode, title_show=False) # show title in own template, gives better flexibility
            def wrapped(**kw):
                instance = cls.load(kw['name'])
                if not instance.access(mode):
                    flask.abort(404)
                return instance

        return wrapped(**kwargs)

    @property
    def menu(self):
        """Tab menu (View/Edit/Delete) for a saved object; None if unsaved or not logged in."""
        if self.is_in_db and flask.g.user:
            # TODO: make this work with .url() in a way that doesn't break trail detection in Menu
            items = []

            if self.__class__ in app.models_exposed.values():
                # full view added by @expose
                items.append({
                    'endpoint': f'{self._lowerclass}_full',
                    'params': { 'name' : self.name },
                    'label': 'View',
                })

            # admin routes automatically added by register_admin
            items.extend([
                {
                    'endpoint': f'admin.{self._lowerclass}_edit',
                    'params': { 'name' : self.name },
                    'label': 'Edit',
                },
                {
                    'endpoint': f'admin.{self._lowerclass}_delete',
                    'params': { 'name' : self.name },
                    'label': 'Delete',
                }
            ])

            return rendering.Menu(
                f'{self._lowerclass}-{self.id}-tabs',
                items,
                extra_classes=[f'menu-{self._lowerclass}', 'tabs']
            )

    def url(self, mode='full'):
        """Return the URL for `mode` ('full', 'edit', 'create' or 'teaser').

        Raises TypeError for the public modes ('teaser', 'full') on classes
        that are not exposed. Any other mode yields None.
        """
        if mode in ('teaser', 'full') and self.__class__ not in app.models_exposed.values():
            # f-prefix was missing, so the class name never made it into the message
            raise TypeError(f"Trying to get public URL for non-exposed model {self.__class__.__name__}.")

        if mode == 'full':
            return flask.url_for(f"{self._lowerclass}_full", name=self.name)
        elif mode == 'edit':
            return flask.url_for(f"admin.{self._lowerclass}_edit", name=self.name)
        elif mode == 'create':
            return flask.url_for(f"admin.{self._lowerclass}_create")
        elif mode == 'teaser':
            return flask.url_for(f"{self._lowerclass}_listing") # TODO: pagination

    def form_field(self, field_name):
        """Build the form field as the parent does; the name field also gets a pattern validator."""
        f = super().form_field(field_name)
        if field_name == 'name':
            f.validators.append(
                functools.partial(form.validate_regexp, r'^[a-z0-9_\-]+$')
            )
        return f
def register_administerable(cls):
    """Register the admin listing/create/edit/delete routes for model `cls`.

    All routes are added to the ``app.admin`` blueprint and require a
    logged-in user. Named models are addressed by ``<string:name>``, all
    others by ``<int:id>``.
    """
    # not in register_admin below so view functions are in a new scope
    # for each model (otherwise the last view function definitions would
    # get registered to all routes).

    @auth
    @rendering.page(title_show=False)
    def view_listing():
        menu = rendering.Menu(
            f'{cls._lowerclass}-listing-actions',
            [{
                'endpoint': f'admin.{cls._lowerclass}_create',
                'label': 'Create new',
            }]
        )
        return rendering.Listing(cls.select(), title=cls.__name__, mode='admin-teaser', menu=menu)

    @auth
    def view_create(*args, **kwargs):
        kwargs['mode'] = 'create'
        return cls.view(*args, **kwargs)

    @auth
    def view_edit(*args, **kwargs):
        kwargs['mode'] = 'edit'
        return cls.view(*args, **kwargs)

    @auth
    def view_delete(*args, **kwargs):
        kwargs['mode'] = 'delete'
        return cls.view(*args, **kwargs)

    app.admin.add_url_rule(f'/{cls._lowerclass}/', f'{cls._lowerclass}_listing', view_listing)
    app.admin.add_url_rule(f'/{cls._lowerclass}/create/', f'{cls._lowerclass}_create', view_create, methods=['GET', 'POST'])

    if issubclass(cls, Named):
        app.admin.add_url_rule(f'/{cls._lowerclass}/<string:name>/', f'{cls._lowerclass}_edit', view_edit, methods=['GET', 'POST'])
        app.admin.add_url_rule(f'/{cls._lowerclass}/<string:name>/delete/', f'{cls._lowerclass}_delete', view_delete, methods=['GET', 'POST'])
    else:
        app.admin.add_url_rule(f'/{cls._lowerclass}/<int:id>/', f'{cls._lowerclass}_edit', view_edit, methods=['GET', 'POST'])
        # trailing slash added for consistency with all other admin routes;
        # the old slashless URL keeps working because it is redirected to the
        # canonical one
        app.admin.add_url_rule(f'/{cls._lowerclass}/<int:id>/delete/', f'{cls._lowerclass}_delete', view_delete, methods=['GET', 'POST'])
2024-05-21 05:31:13 +00:00
class Dashboard(rendering.Renderable):
    """Admin start page content.

    ``commentables`` maps the lowercased class name of each previewed model
    to a dict with an action 'menu' and a 'listing' of its three newest
    entries. ``menu_others`` links to the listings of all remaining
    non-abstract models.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.commentables = {}

        # looked up by name to avoid a circular dependency
        Commentable = Administerable.__class_descendants__['Commentable']
        Upload = Administerable.__class_descendants__['Upload']

        preview_classes = [
            *Commentable.__class_descendants__.values(),
            *Upload.__class_descendants__.values(),
            Administerable.__class_descendants__['Gallery'],
        ]

        for model in preview_classes:
            if model in app.models_abstract:
                continue

            actions = [
                {
                    'endpoint': f'admin.{model._lowerclass}_listing',
                    'label': model.__name__,
                },
                {
                    'endpoint': f'admin.{model._lowerclass}_create',
                    'label': 'Create new',
                },
            ]

            # newest first: by creation time where available, else by id
            order_field = model.created if hasattr(model, 'created') else model.id

            self.commentables[model._lowerclass] = {
                'menu': rendering.Menu(f'dashboard-preview-actions-{model._lowerclass}', actions),
                'listing': rendering.Listing(model.select().order_by(order_field.desc()).limit(3)),
            }

        self.menu_others = rendering.Menu('dashboard-others')

        for model in Administerable.__class_descendants__.values():
            if model in app.models_abstract or model in preview_classes:
                continue
            self.menu_others.append(
                endpoint=f'admin.{model._lowerclass}_listing',
                label=model.__name__
            )
@app.admin.route('/')
@auth
@rendering.page()
def admin_home():
    """Admin start page: returns a fresh Dashboard for logged-in users."""
    return Dashboard()
@app.admin.route('/login/', methods=['GET', 'POST'])
@rendering.page()
def admin_login():
    """Password login for the user that owns the presented client certificate.

    Aborts with 403 without a verified client certificate and with 400 when
    the certificate's fingerprint is unknown. On a successful POST the user
    id is stored in the session and the client is redirected to the admin
    home; otherwise the login form is returned.
    """
    # flask.abort raises, so these act as guard clauses
    if not flask.g.client_cert_verified:
        flask.abort(403)
    if not flask.g.client_cert_fingerprint_matched:
        flask.abort(400)

    f = form.Form(title='Login')
    f['password'] = form.Password(label='Password', required=True)
    f.buttons['login'] = form.Button(name='login', label='Log in')

    if flask.request.method != 'POST':
        return f

    f.handle() # binds and validates field values

    # the user name is not asked for, it is taken from the certificate's owner
    user = User.login(flask.g.client_cert_info.user.name, f['password'].value)
    if not user:
        flask.flash("Invalid login.", 'error')
        return f

    flask.session['uid'] = user.id
    flask.flash(f"Welcome, {user.name}.", 'success')
    return flask.redirect(flask.url_for('admin.admin_home'))
@app.admin.route('/logout/')
@auth
def admin_logout():
    """Drop the login from the session and send the client to the front page."""
    flask.session.pop('uid')
    target = flask.url_for('frontpage')
    return flask.redirect(target)
class NotificationForm(form.Form):
    """Form listing the current user's notifications with bulk actions.

    `filter` selects which notifications are shown: 'unread', 'read' or
    'all'; any other value raises ValueError. Every notification is shown
    with a checkbox; the buttons mark the checked ones as read or delete
    them.
    """

    def __init__(self, filter, **kwargs):
        super().__init__(**kwargs)

        if filter == 'unread':
            notifications = flask.g.user.notifications_unread
        elif filter == 'read':
            notifications = flask.g.user.notifications.where(Notification.read == True)
        elif filter == 'all':
            notifications = flask.g.user.notifications
        else:
            raise ValueError("Invalid filter")

        self.filters = rendering.Menu(
            'notification-actions',
            items=(
                {
                    'endpoint': 'admin.admin_notifications',
                    'params': {'filter': 'unread'},
                    'label': 'Unread',
                },
                {
                    'endpoint': 'admin.admin_notifications',
                    'params': {'filter': 'read'},
                    'label': 'Read',
                },
                {
                    'endpoint': 'admin.admin_notifications',
                    'params': {'filter': 'all'},
                    'label': 'All',
                },
            )
        )

        # all checkboxes share one group, so the checked ids arrive as one list
        self.notifications = form.MultiGroup(form.types.INT, 'notifications', required=True)

        for notification in notifications:
            fs = form.Fieldset()
            fs[f'notification-preview-{notification.id}'] = form.RenderableField(notification, mode='inline')
            fs[f'notification-{notification.id}'] = form.Checkbox(group=self.notifications, value=notification.id)
            self[f'notification-{notification.id}'] = fs

        self.buttons['mark'] = form.Button(label='Mark as read')
        self.buttons['delete'] = form.Button(label='Delete')

    def process(self, submit):
        """Apply the bulk action named by `submit`, then redirect.

        Both actions are restricted to notifications addressed to the
        current user, so submitted ids belonging to somebody else are
        ignored.
        """
        # NOTE(review): MultiGroup may already reject ids that were not
        # rendered; the ownership condition keeps that from being the only
        # line of defence.
        selected = Notification.id << self.notifications.values
        owned = Notification.to == flask.g.user

        if submit == 'mark':
            count = Notification.update(
                read=True
            ).where(selected, owned).execute()
            flask.flash(f"Marked {count} notifications as read.", 'success')
        elif submit == 'delete':
            count = Notification.delete().where(
                selected, owned
            ).execute()
            flask.flash(f"Deleted {count} notifications.", 'success')

        return flask.redirect('')
@app.admin.route('/notifications/', methods=['GET', 'POST'])
@app.admin.route('/notifications/<string:filter>/', methods=['GET', 'POST'])
@auth
@rendering.page()
def admin_notifications(filter='unread'):
    """Show the notification form for `filter` and handle its bulk actions.

    `filter` comes from the URL; anything other than 'unread', 'read' or
    'all' is answered with 404 instead of letting NotificationForm raise
    ValueError (which would surface as a server error).
    """
    if filter not in ('unread', 'read', 'all'):
        flask.abort(404)

    f = NotificationForm(filter, title='Notifications')

    if flask.request.method == 'POST':
        redirect = f.handle()
        if redirect:
            return redirect

    return f
@app.admin.menu('main')
def admin_menu_main():
    """Build the main menu.

    Anonymous visitors only get the 'Home' entry; logged-in users also get
    links to the admin home, their notifications (labelled with the number
    of unread ones) and comment moderation.
    """
    items = [
        {
            'endpoint': 'frontpage',
            'label': 'Home',
        },
    ]

    if isinstance(flask.g.user, User): # empty menu without login
        items.append({
            'endpoint': 'admin.admin_home',
            'label': 'Admin',
        })

        # let the database count instead of loading every unread row just
        # to take the length of the result
        notification_count = flask.g.user.notifications_unread.count()
        if notification_count == 0:
            notification_label = 'No unread notifications'
        elif notification_count == 1:
            notification_label = 'One unread notification'
        else:
            notification_label = f"{notification_count} unread notifications"

        items.append({
            'endpoint': 'admin.admin_notifications',
            'label': notification_label,
        })
        items.append({
            'endpoint': 'admin.comment_moderation',
            'label': 'Comment moderation',
        })

    return rendering.Menu('main', items, burger=True)
2024-04-12 19:22:00 +00:00
@app.admin.menu('secondary')
def admin_menu_secondary():
    """Build the secondary menu: a single 'Log out' or 'Log in' entry."""
    if isinstance(flask.g.user, User): # logged in
        entry = {
            'endpoint': 'admin.admin_logout',
            'label': 'Log out',
        }
    else:
        entry = {
            'endpoint': 'admin.admin_login',
            'label': 'Log in',
        }
    return rendering.Menu('secondary', [entry])
2024-04-12 19:22:00 +00:00
@app.boot
def register_admin():
    """Register admin routes for every non-abstract Administerable, then the blueprint."""
    concrete_models = (
        model for model in Administerable.__class_descendants__.values()
        if model not in app.models_abstract
    )
    for model in concrete_models:
        register_administerable(model)

    # the blueprint is registered last, after all of its routes were added
    app.register_blueprint(app.admin)