pdnew/main.py

1046 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# builtins
import math
import datetime
import functools
import html
import random
import urllib.parse
# third-party
import markupsafe
import werkzeug
import click
import flask
import peewee
# scraping-specific third party
import requests
import bs4
# internals
from application import app
import util
import exceptions
import geoip
import rendering
import markdown
import form
import database
import admin
import upload
import tagging
import commenting
import search
import cli
@app.errorhandler(Exception)
def error_catchall(e):
    """
    Catch-all error handler, registered for `Exception`.

    In debug mode the exception is re-raised. Otherwise it is logged and
    turned into a `rendering.ErrorPage` chosen by the exception type. If
    rendering that page raises, `error_fallback` is used instead.
    """

    if app.debug:
        # explicit `raise e` (as in error_fallback) also works when this
        # function is called outside of an active `except` block
        raise e

    app.logger.error(e)

    @rendering.page(title_show=False)
    def errorpage():

        if isinstance(e, exceptions.ExposedException):
            return rendering.ErrorPage(e.message)

        if isinstance(e, werkzeug.exceptions.HTTPException):
            return rendering.ErrorPage(e.description, title=e.name, status=e.code)

        if isinstance(e, peewee.DoesNotExist):
            return rendering.ErrorPage("Item could not be found.", title="Not found", status=404)

        if isinstance(e, peewee.OperationalError):
            return rendering.ErrorPage("Temporary malfunction.", status=503)

        # anything else gets a generic message
        return rendering.ErrorPage("An error has occurred.")

    try:
        return errorpage()
    except Exception as e_errorpage:
        # rendering the error page itself failed
        return error_fallback(e_errorpage)
def error_fallback(e=None):
    """
    Last-resort error response, used when rendering the regular error page fails.

    In debug mode a passed exception is re-raised. Otherwise the error is
    logged and the theme's `error_fallback.html` template is rendered
    (falling back to the default theme's template) with status 500.

    `e` is the exception that triggered the fallback, if any.
    """

    # `raise None` would itself be a TypeError, so only re-raise real exceptions
    if app.debug and e is not None:
        raise e

    app.logger.error(f"Error fallback triggered: {e}")

    template_candidates = [
        f'{app.config["THEME"]}/templates/layout/error_fallback.html'
    ]

    if app.config['THEME'] != 'default':
        template_candidates.append('default/templates/layout/error_fallback.html')

    return flask.render_template(template_candidates), 500
@app.menu('main')
def menu_main():
    """
    Build the 'main' menu: six internal endpoints followed by one external URL.
    """

    # (endpoint, label) pairs, in display order
    internal_entries = (
        ('frontpage', 'Home'),
        ('article_listing', 'Articles'),
        ('project_listing', 'Projects'),
        ('propaganda_listing', 'Propaganda'),
        ('curatedart_listing', 'Curated art'),
        ('tag_listing', 'Tags'),
    )

    items = [
        {'endpoint': endpoint, 'label': label}
        for endpoint, label in internal_entries
    ]

    # the only entry addressed by URL instead of endpoint
    items.append({'url': 'https://rnd.phryk.net', 'label': 'R&D'})

    return rendering.Menu('main', items, burger=True)
@app.menu('footer')
def menu_footer():
    """
    Build the footer menu: The Button plus two `view_page` links.

    Note that the resulting `rendering.Menu` is named 'secondary'.
    """

    items = [{'endpoint': 'the_button', 'label': 'The Button'}]

    # static pages, each linked through the view_page endpoint
    for path, label in (('/donate', 'Donate'), ('/hire', 'Hire')):
        items.append({
            'endpoint': 'view_page',
            'params': {'path': path},
            'label': label,
        })

    return rendering.Menu('secondary', items)
@app.block('seal')
def seal_message():
    """
    Return one randomly picked entry of the `SEAL_MESSAGES` config value.
    """

    # random.choice replaces the manual randint-based index computation
    return random.choice(app.config['SEAL_MESSAGES'])
class FrontPage(rendering.Renderable):

    """
    Renderable for the front page.

    Holds the published `Page` at path '/' (if one exists) and a listing of
    all published rows of every type registered through `FrontPage.include`,
    ordered by creation date (newest first), then by name.
    """

    # types registered through FrontPage.include
    __commentables__ = []

    @classmethod
    def include(cls, commentable):

        """
        Decorator to make Commentable-derived types show up in frontpage listing.
        """

        if issubclass(commentable, commenting.Commentable):
            cls.__commentables__.append(commentable)
            return commentable

        raise TypeError("FrontPage.include only takes classes derived from Commentable")

    def __init__(self, offset=0, **kwargs):

        super().__init__(**kwargs)

        self.title = None
        self.page = None
        self.offset = offset

        try:
            self.page = Page.get((Page.path == '/') & (Page.published == True))
        except Page.DoesNotExist:
            pass  # no published root page; title stays None
        else:
            self.title = self.page.title

        # one SELECT per registered type, glued together with UNION ALL
        union = None
        for model in type(self).__commentables__:

            query = model.select(
                peewee.SQL(f'\'{model.__name__}\' AS "type"'), # so we know what class to instantiate
                model.name, # ditto, but identifying the exact row
                model.created
            ).where(model.published == True)

            union = query if union is None else union.union_all(query)

        union = union.order_by(union.c.created.desc(), union.c.name.asc())
        rows = union.dicts()

        def load_item(row):
            # map the "type" column back to its class and load the row by name
            model = commenting.Commentable.__class_descendants__[row['type']]
            return model.load(row['name'])

        self.results = rendering.Listing(rows, endpoint='frontpage', offset=offset, item_constructor=load_item)
@app.route('/')
@app.route('/+<int:offset>')
@rendering.page(title_show=False)
def frontpage(offset=0):
    """
    View for '/' and '/+<offset>': returns a `FrontPage` for the given offset.
    """

    page = FrontPage(offset=offset)
    return page
class ButtonForm(form.Form):

    """
    Form consisting of a single 'kill' button.

    Pressing it stores a `ButtonPress` (with geo data if the lookup for the
    client address returned something) and replaces the button with a
    confirmation text containing the total number of presses.
    """

    def __init__(self, *args, **kwargs):

        super().__init__(*args, **kwargs)

        self.intro = markupsafe.Markup("Push The Button to <em>kill a billionaire</em>.")
        self.buttons['kill'] = form.Button(id='the-button', label='')

    def process(self, submit):

        if submit != 'kill':
            return

        press = ButtonPress()
        location = geoip.lookup_city(flask.request.remote_addr)

        if location:
            coordinates = location.location
            press.geo_continent = location.continent.name
            press.geo_country = location.country.name
            press.geo_city = location.city.name
            press.geo_latitude = coordinates.latitude
            press.geo_longitude = coordinates.longitude

        # the press is stored whether or not a location was found
        press.save(force_insert=True)

        self.count = ButtonPress.select().count()
        self.intro = markupsafe.Markup(f"Your pledge has been confirmed.<br>You are person <em>#{ self.count }</em> to make the pledge.")
        del self.buttons['kill']
class ButtonPress(database.RenderableModel):

    """
    One press of The Button: creation time plus optional geo information.
    """

    # defaults to the (naive) UTC time of instantiation
    created = peewee.DateTimeField(default=datetime.datetime.utcnow, null=False)

    # all geo fields are nullable; ButtonForm only fills them if a location was found
    geo_continent = peewee.CharField(null=True)
    geo_country = peewee.CharField(null=True)
    geo_city = peewee.CharField(null=True)
    geo_latitude = peewee.FloatField(null=True)
    geo_longitude = peewee.FloatField(null=True)
@app.route('/button/', methods=['GET', 'POST'])
@rendering.page(title_show=False)
def the_button():
    """
    View for '/button/': returns a `ButtonForm`, handled first on POST requests.
    """

    button_form = ButtonForm(title="The Button")

    if flask.request.method == 'POST':
        button_form.handle()

    return button_form
class ScoredLink(admin.Administerable):

    """
    A URL scored by the number of distinct third-party hosts its page
    loads resources from (see `scrape`).

    The statistics properties (`median`, `mean`, `min`, `max`) are computed
    over the non-null `external_site_count` values of all rows.
    """

    autoform_blacklist = ['id', 'external_site_count', 'last_scrape']
    hue_range = (80,320) # hue (degrees) from 0 to .max

    url = peewee.CharField(unique=True) # TODO: Regexp constraint # FIXME: overrides url function
    external_site_count = peewee.IntegerField(null=True)
    last_scrape = peewee.DateTimeField(null=True, default=None)

    explanation_external_site_count = markdown.SafeMarkdownString("""
How many **third-party sites** the linked page loads data from.
This is relevant to your privacy because each of those third-party sites
gets at the very least to see that you visited the linked page.
*Lower* is better, **0** is *ideal*.""")

    def __init__(self, *args, **kwargs):

        super().__init__(*args, **kwargs)

    @staticmethod
    def _hostname(url):

        """
        Return the lowercased host of `url`, or None if it has none
        (relative URL, `data:` URI, unparsable value, …).
        """

        try:
            return urllib.parse.urlsplit(url).hostname
        except ValueError: # e.g. malformed IPv6 literal
            return None

    def scrape(self):

        """
        Fetch `self.url`, count the distinct external hosts referenced by
        its script/link/img/object elements and save the result.

        Hosts equal to, or subdomains of, the host of `self.url` are not
        counted. Protocol-relative references (`//host/path`) are counted.
        Without a URL the count is set to None. `last_scrape` is updated
        either way. Errors from fetching or saving propagate to the caller.
        """

        external_site_count = None

        if self.url:

            url_domain = self._hostname(self.url)

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36', # such stealth, many agent
            }

            markup = requests.get(self.url, timeout=30, headers=headers).text
            dom = bs4.BeautifulSoup(markup, 'lxml')

            # element name -> attribute holding the URL of the loaded resource
            scored_elements = {
                'script': 'src',
                'link': 'href',
                'img': 'src',
                'object': 'data'
            }

            external_domains = set()

            for tag, attribute in scored_elements.items():
                for element in dom.find_all(tag):

                    attribute_value = element.get(attribute)
                    if not isinstance(attribute_value, str):
                        continue # attribute missing (or multi-valued)

                    attribute_domain = self._hostname(attribute_value.strip())
                    if not attribute_domain:
                        continue # not an absolute URL

                    if attribute_domain == url_domain:
                        continue

                    if url_domain and attribute_domain.endswith(f'.{url_domain}'):
                        continue # subdomain of url_domain

                    external_domains.add(attribute_domain)

            external_site_count = len(external_domains)

        self.external_site_count = external_site_count
        self.last_scrape = datetime.datetime.utcnow()
        self.save()

    @classmethod
    def load_by_url(cls, url):

        """
        Return the instance with the given URL; raises `DoesNotExist` otherwise.
        """

        return cls.select().where(cls.url == url).get()

    @property
    def set_size(self):

        # returns the size of the set of all scored links
        return self.__class__.select().count()

    @property
    def external_site_counts(self):

        """
        All non-null `external_site_count` values, ascending.

        Every access runs a database query.
        """

        # TODO: We should probably cache this
        # (and invalidate/rebuild cache at least in .save?)

        cls = self.__class__

        # `!= None` is intentional: peewee turns it into IS NOT NULL
        query = cls.select(cls.external_site_count).where(cls.external_site_count != None).order_by(cls.external_site_count)

        return [row['external_site_count'] for row in query.dicts()]

    @property
    def median(self):

        """
        Median of all known counts; 0 if there are none.
        """

        counts = self.external_site_counts # query once, not once per access

        if not counts:
            return 0

        if len(counts) == 1:
            return counts[0]

        median_idx = len(counts) // 2

        if len(counts) % 2 == 0:
            return (counts[median_idx - 1] + counts[median_idx]) / 2.0

        return float(counts[median_idx])

    @property
    def mean(self):

        """
        Arithmetic mean of all known counts; 0 if there are none.
        """

        counts = self.external_site_counts

        if not counts:
            return 0

        if len(counts) == 1:
            return counts[0]

        return sum(counts) / float(len(counts))

    @property
    def min(self):

        """
        Smallest known count; 0 if there are none.
        """

        counts = self.external_site_counts
        return min(counts) if counts else 0

    @property
    def max(self):

        """
        Largest known count; 0 if there are none.
        """

        counts = self.external_site_counts
        return max(counts) if counts else 0

    @property
    def color(self):

        """
        Color between the two ends of `hue_range` representing this link's score.

        A count of 0 yields the "good" color, a count equal to the maximum
        the "bad" color. Otherwise both are blended, weighted by where the
        count lies between 0 and the maximum (evenly if that can't be
        determined).
        """

        color_good = util.Color(hue=self.hue_range[0], saturation=1, value=1)
        color_bad = util.Color(hue=self.hue_range[1], saturation=1, value=1)

        count = self.external_site_count

        if count == 0:
            return color_good

        maximum = self.max # query once

        if count == maximum:
            return color_bad

        # maximum == 0 guards the divisions below
        if count is None or self.set_size == 1 or maximum == 0:
            influence_good = 0.5
            influence_bad = 0.5

        else:
            influence_good = 1.0 - (count / maximum)
            influence_bad = 1.0 - ((maximum - count) / maximum)

        color_good.alpha = influence_good
        color_bad.alpha = influence_bad

        return color_good.blend(color_bad)
@app.cron
def scrape_scoredlinks():
    """
    Re-scrape every ScoredLink that was never scraped or whose last scrape
    is more than 7 days old.

    Scraping is best-effort: a failing link is logged and counted, and the
    remaining links are still processed.
    """

    now = datetime.datetime.utcnow()
    period = datetime.timedelta(days=7)

    # `== None` is intentional: peewee turns it into IS NULL
    stale_links = ScoredLink.select().where(
        (ScoredLink.last_scrape < (now - period)) | (ScoredLink.last_scrape == None)
    )

    count = 0
    failed = 0

    with click.progressbar(stale_links, label="Updating link scores where necessary", item_show_func=lambda x: x.url if x else '') as links:

        for link in links:

            try:
                link.scrape()
            except Exception as e: # TODO: more specific error handling
                failed += 1
                app.logger.warning(f"Could not scrape '{link.url}': {e}")
            else:
                count += 1

    click.secho(f"Updated {count} link scores.", fg='green')

    if failed:
        click.secho(f"Failed to update {failed} link scores.", fg='yellow')
def renderer_link_open(self, tokens, idx, options, env):
    """
    Render rule for 'link_open' tokens: returns the opening `<a …>` tag.

    Links whose href starts with http:// or https:// additionally get
    `target="_blank"`, `rel="noreferrer noopener"` and the CSS class
    'external' (set on the token itself). All attribute values are
    HTML-escaped.
    """

    token = tokens[idx]
    href = token.attrs.get('href')

    if isinstance(href, str) and href.startswith(('http://', 'https://')):

        token.attrs['target'] = '_blank'
        token.attrs['rel'] = 'noreferrer noopener'

        # don't add the class again if this token gets rendered more than once
        classes = str(token.attrs.get('class', '')).split()
        if 'external' not in classes:
            classes.append('external')
        token.attrs['class'] = ' '.join(classes)

    # escape values so that quotes, & and <> can't break out of the attribute
    attribute_string = ' '.join(
        f'{name}="{html.escape(str(value), quote=True)}"'
        for name, value in token.attrs.items()
    )

    return f'<a {attribute_string}>'
def renderer_link_close(self, tokens, idx, options, env):
    """
    Render rule for 'link_close' tokens: returns `</a>`, followed by a
    score icon for external links that have been scored.

    For an http(s) link the matching ScoredLink is loaded by URL, or
    created if there is none yet. The icon is only added if that link has
    an integer `external_site_count`.
    """

    # closest preceding link_open token, i.e. the one this token closes
    token_opening = next(
        (token for token in reversed(tokens[:idx]) if token.type == 'link_open'),
        None
    )

    if token_opening is None:
        # no opening token found; nothing to score
        return '</a>'

    href = token_opening.attrs.get('href')

    if not (isinstance(href, str) and href.startswith(('http://', 'https://'))):
        return '</a>'

    try:
        link = ScoredLink.load_by_url(href)
    except ScoredLink.DoesNotExist:
        link = ScoredLink(url=href)
        link.save(force_insert=True)

    if not isinstance(link.external_site_count, int):
        # not scraped yet
        return '</a>'

    icon = f'<img class="scoredlink-icon" src="/theme/dynamic/scoredlink/{link.id}/" title="This URL loads data from {link.external_site_count} external sites (median: {link.median}, mean: {link.mean}, min {link.min}, max: {link.max}, set size: {link.set_size})" />'

    return f'</a>{icon}'
# Register the two link renderers above as render rules for 'link_open' and
# 'link_close' tokens on markdown.md_unsafe.
markdown.md_unsafe.add_render_rule('link_open', renderer_link_open)
markdown.md_unsafe.add_render_rule('link_close', renderer_link_close)
@app.route('/theme/dynamic/scoredlink/<int:id>/')
def icon_link_external(id):
    """
    Serve the inline SVG rendering of the ScoredLink with the given id.
    """

    svg = ScoredLink.load(id).render(mode='inline', format='svg')

    return flask.Response(svg, 200, content_type='image/svg+xml')
@app.abstract
class LeadImageContent(commenting.Commentable):

    """
    Abstract Commentable with an optional lead image.
    """

    # set to NULL when the referenced image is deleted
    lead_image = peewee.ForeignKeyField(
        upload.Image,
        verbose_name='Lead image',
        null=True,
        on_delete='SET NULL'
    )
@FrontPage.include
@app.expose('/article/')
class Article(LeadImageContent):

    """
    Article content type, exposed under '/article/' and listed on the front page.
    """

    title = markdown.SafeMarkdownCharField(verbose_name='Title', null=False)
    teaser = markdown.MarkdownTextField(verbose_name='Teaser', null=False)
    text = markdown.MarkdownTextField(verbose_name='Text', null=False)
class LinkForm(tagging.TaggableForm):

    """
    More comfortable handling for a 'link' field that's a foreign key to ScoredLink.
    Used by Project and CuratedContent.

    Instead of a foreign key, a text field holding the URL is shown. On
    processing, the ScoredLink for that URL is looked up (or created) and
    assigned.
    """

    def __init__(self, *args, **kwargs):

        super().__init__(*args, **kwargs)

        current_link = self.administerable.link
        link_field = self.administerable.__class__.link

        self['link_id'] = form.Text(
            label=link_field.verbose_name,
            help=link_field.help_text,
            value=current_link.url if current_link else ''
        )

    def process(self, submit):

        if self.valid and self.mode in ('create', 'edit') and self['link_id'].value:

            url = self['link_id'].value

            try:
                link = ScoredLink.load_by_url(url)
            except ScoredLink.DoesNotExist:
                # unknown URL, create a ScoredLink for it
                link = ScoredLink(url=url)
                link.save(force_insert=True)
                message = f"Created new Scoredlink #{link.id}"
            else:
                message = f"Assigned existing ScoredLink #{link.id}"

            self.administerable.link = link
            flask.flash(message)

        return super().process(submit)
@app.expose('/project/')
class Project(LeadImageContent):

    """
    Project content type, exposed under '/project/', with an optional ScoredLink.
    """

    autoform_class = LinkForm
    # link_id is blacklisted here; LinkForm adds its own 'link_id' text field
    autoform_blacklist = ('id', 'comment_collection_id', 'link_id')

    title = markdown.SafeMarkdownCharField(verbose_name='Title', null=False)
    teaser = markdown.MarkdownTextField(verbose_name='Teaser', null=False)
    text = markdown.MarkdownTextField(verbose_name='Text', null=False)
    link = peewee.ForeignKeyField(
        ScoredLink,
        null=True,
        verbose_name='Link',
        help_text='URL for the original content, if any'
    )
@FrontPage.include
@app.expose('/curated/')
class CuratedArt(LeadImageContent):

    """
    Curated art content type, exposed under '/curated/' and listed on the
    front page, with an optional ScoredLink.
    """

    autoform_class = LinkForm
    # link_id is blacklisted here; LinkForm adds its own 'link_id' text field
    autoform_blacklist = ('id', 'comment_collection_id', 'link_id')

    title = markdown.SafeMarkdownCharField(verbose_name='Title', null=False)
    teaser = markdown.MarkdownTextField(verbose_name='Teaser', null=False)
    text = markdown.MarkdownTextField(verbose_name='Text', null=False)
    link = peewee.ForeignKeyField(
        ScoredLink,
        null=True,
        verbose_name='Link',
        help_text='URL for the original content, if any'
    )
class PropagandaForm(tagging.TaggableForm):

    """
    Form for Propaganda collections.

    In 'edit' mode it embeds, for every piece of the collection, a wrapper
    fieldset holding the piece's edit form, its delete form and up/down
    buttons for reordering, plus one fieldset to create a new piece.
    """

    def __init__(self, administerable, mode, *args, **kwargs):

        super().__init__(administerable, mode, *args, **kwargs)

        if mode != 'edit':
            return

        for piece in administerable.pieces_ordered:

            # edit form of the piece, bound to this collection
            edit_fields = form.ProxyFieldset(piece.form('edit'), name=f'propagandapiece-{piece.id}')
            edit_fields['collection_id'] = form.ValueField(value=administerable.id)
            del edit_fields['order_pos']
            edit_fields.fields.position('description', -1)

            # NOTE(review): here `name` is passed to piece.form(), not to
            # ProxyFieldset as in the line above — confirm this is intended
            delete_fields = form.ProxyFieldset(piece.form('delete', name=f'propagandapiece-{piece.id}-delete'))
            delete_fields.intro = None
            delete_fields.buttons['delete'].label = ''

            # buttons to change order_pos
            order_fields = form.Fieldset(name='order')
            order_fields.buttons['up'] = form.Button(name='up', label='')
            order_fields.buttons['down'] = form.Button(name='down', label='')

            # combine them in one fieldset that can be styled in one block
            wrapper = form.Fieldset(name=f'{edit_fields.name}-wrapper', extra_classes=['propagandapiece-wrapper'])
            wrapper['piece'] = edit_fields
            wrapper['delete'] = delete_fields
            wrapper['order'] = order_fields

            self[wrapper.name] = wrapper

        # fieldset to add a new piece to this collection
        create_fields = form.ProxyFieldset(PropagandaPiece().form('create'))
        create_fields['collection_id'] = form.ValueField(value=administerable.id)
        del create_fields['order_pos']
        self[create_fields.name] = create_fields

    def handle(self, skip_bind=False):

        """
        Dispatch a submission to the embedded fieldset it belongs to, or
        validate and process this form itself if it belongs to none.
        """

        if 'submit' not in flask.request.form:
            # NOTE(review): `ValidationError` is not imported in this file;
            # this line presumably raises NameError — confirm where it lives
            self.errors.append(ValidationError("Missing submit information."))
            return

        submit = flask.request.form.get('submit')

        if not skip_bind:
            self.bind(flask.request.form, flask.request.files)

        fieldset = self.submitted_fieldset(submit)

        if fieldset is None:

            # submit belongs to this form itself
            submit_relative = self.submit_relative(submit)
            self.validate(submit_relative)

            if len(self.errors):
                for error in self.errors:
                    flask.flash(str(error), 'error')

            return self.process(submit_relative)

        # arrow buttons of pieces (but not of piece items) reorder pieces
        is_piece_reorder = (
            submit.endswith(('.order.up', '.order.down'))
            and 'propagandapieceitem' not in submit
        )

        if is_piece_reorder:
            self.process_order(submit, fieldset)
        else:
            fieldset.handle(skip_bind=True)

        return flask.redirect('')

    def process_order(self, submit, fieldset):

        """
        Move the piece of the given wrapper fieldset one position up or down.
        """

        target = fieldset['piece'].form.administerable # grab PropagandaPiece from ProxyFieldset

        if submit.endswith('.up'):
            target.order_pos -= 1
        elif submit.endswith('.down'):
            target.order_pos += 1

        target.save()
@FrontPage.include
@app.expose('/propaganda/')
class Propaganda(commenting.Commentable):

    """
    Collection of propaganda pieces, exposed under '/propaganda/' and
    listed on the front page.
    """

    autoform_class = PropagandaForm

    title = markdown.SafeMarkdownCharField(verbose_name='Title', null=False)
    teaser = markdown.MarkdownTextField(verbose_name='Teaser', null=False)
    text = markdown.MarkdownTextField(verbose_name='Text', null=False)

    @property
    def pieces_ordered(self):

        """
        This collection's pieces, ordered by `order_pos`.
        """

        ordering = PropagandaPiece.order_pos
        return self.pieces.order_by(ordering)
class PropagandaPieceForm(admin.AutoForm):

    """
    Form for a PropagandaPiece.

    In 'edit' mode it shows a preview of the piece and an 'items' fieldset
    holding, for every item, a wrapper with the item's edit form, delete
    form and up/down buttons, plus a 'new' fieldset to create an item.
    """

    def __init__(self, administerable, mode, *args, **kwargs):

        super().__init__(administerable, mode, *args, **kwargs)

        if mode != 'edit':
            return

        self['preview'] = form.RenderableField(administerable, 'inline')
        self['items'] = form.Fieldset()

        for item in administerable.items_ordered:

            # edit form of the item, bound to this piece
            edit_fields = form.ProxyFieldset(item.form('edit'), name=f'propagandapieceitem-{item.id}')
            edit_fields['piece_id'] = form.ValueField(value=administerable.id)
            del edit_fields['order_pos']

            delete_fields = form.ProxyFieldset(item.form('delete'), name=f'propagandapieceitem-{item.id}-delete')
            delete_fields.intro = None
            del delete_fields['preview']
            delete_fields.buttons['delete'].label = ''

            # buttons to change order_pos
            order_fields = form.Fieldset(name='order')
            order_fields.buttons['up'] = form.Button(name='up', label='')
            order_fields.buttons['down'] = form.Button(name='down', label='')

            wrapper = form.Fieldset(name=f'{edit_fields.name}-wrapper', extra_classes=['propagandapieceitem-wrapper'])
            wrapper['item'] = edit_fields
            wrapper['delete'] = delete_fields
            wrapper['order'] = order_fields

            self['items'][wrapper.name] = wrapper

        # fieldset to add a new item to this piece
        create_fields = form.ProxyFieldset(PropagandaPieceItem().form('create'))
        create_fields['piece_id'] = form.ValueField(value=administerable.id)
        del create_fields['order_pos']
        self['items']['new'] = create_fields

    def handle(self, skip_bind=False):

        """
        Dispatch a submission to the embedded fieldset it belongs to, or
        validate and process this form itself if it belongs to none.
        """

        if 'submit' not in flask.request.form:
            # NOTE(review): `ValidationError` is not imported in this file;
            # this line presumably raises NameError — confirm where it lives
            self.errors.append(ValidationError("Missing submit information."))
            return

        submit = flask.request.form.get('submit')

        if not skip_bind:
            self.bind(flask.request.form, flask.request.files)

        fieldset = self.submitted_fieldset(submit)

        if fieldset is None:

            # submit belongs to this form itself
            submit_relative = self.submit_relative(submit)
            self.validate(submit_relative)

            if len(self.errors):
                for error in self.errors:
                    flask.flash(str(error), 'error')

            return self.process(submit_relative)

        if submit.endswith(('.order.up', '.order.down')):
            # one of the arrow buttons to reorder propagandapieceitems was pressed
            self.process_order(submit, fieldset)
        else:
            fieldset.handle(skip_bind=True)

        return flask.redirect('')

    def process(self, submit):

        # NOTE(review): the return value of super().process() is dropped
        # here, unlike in the other forms of this file — confirm intent
        super().process(submit)

    def process_order(self, submit, fieldset):

        """
        Move the item the pressed arrow button belongs to one position up or down.
        """

        wrapper = fieldset.submitted_fieldset(submit)
        target = wrapper['item'].form.administerable # grab PropagandaPieceItem from ProxyFieldset

        if submit.endswith('.up'):
            target.order_pos -= 1
        elif submit.endswith('.down'):
            target.order_pos += 1

        flask.flash("Moved propaganda piece item.", 'success')
        target.save()
class PropagandaPiece(admin.Administerable):

    """
    One piece of a Propaganda collection, consisting of ordered items.
    """

    autoform_class = PropagandaPieceForm

    # pieces are deleted together with their collection
    collection = peewee.ForeignKeyField(
        Propaganda,
        null=False,
        verbose_name="Collection",
        backref='pieces',
        on_delete='CASCADE'
    )
    order_pos = peewee.IntegerField(null=False, default=0, verbose_name="Order position")
    description = markdown.MarkdownTextField(verbose_name="Description")

    @property
    def items_ordered(self):

        """
        This piece's items, ordered by `order_pos`.
        """

        ordering = PropagandaPieceItem.order_pos
        return self.items.order_by(ordering)
class PropagandaPieceItemForm(admin.AutoForm):

    """
    Form for a PropagandaPieceItem.

    Outside of 'delete' mode it offers, for each of video, image and file,
    a select for an existing upload and a file field for a new one. Exactly
    one upload ends up assigned to the item: new file uploads take
    precedence over selected existing uploads, and within each group the
    first filled type (video, image, file) wins.
    """

    def __init__(self, administerable, mode, *args, **kwargs):

        super().__init__(administerable, mode, *args, **kwargs)

        if administerable.upload:
            self['preview'] = form.RenderableField(administerable.upload, 'inline')

        if mode != 'delete':

            for kind in ('video', 'image', 'file'):

                fieldset = form.Fieldset(name=kind)
                fieldset['id'] = form.ForeignKeySelect(
                    getattr(administerable.__class__, kind),
                    label=f'Existing {kind}',
                    value=getattr(administerable, f'{kind}_id')
                )
                fieldset['file'] = form.File(label=f'New {kind}')

                self[fieldset.name] = fieldset

    def process(self, submit):

        # the upload fieldsets only exist outside of delete mode (see __init__)
        if self.valid and getattr(self, 'mode', None) != 'delete':

            kinds = ('video', 'image', 'file')

            # collect which fields were filled
            filled_files = [kind for kind in kinds if self[kind]['file'].value]
            filled_ids = [kind for kind in kinds if self[kind]['id'].value]

            if filled_files:

                kind = filled_files[0]

                if len(filled_files) > 1:
                    flask.flash(f"You uploaded multiple files but only one can be assigned, using {kind}.", 'warning')

                cls = admin.Named.__class_descendants_lowercase__[kind]
                uploaded = self[kind]['file'].value

                name = werkzeug.utils.secure_filename(uploaded.filename).split('.')[0].lower()

                if name:
                    try:
                        cls.load(name)
                    except cls.DoesNotExist:
                        # this is what we want, no collision
                        pass
                    else:
                        # this is what we don't want. generate random name instead
                        name = util.random_string_light()
                else:
                    # nothing usable left of the filename after sanitizing
                    name = util.random_string_light()

                instance = cls()
                instance.name = name
                instance.write_file(uploaded)
                instance.save(force_insert=True)

                self._assign_upload(kind, instance, kinds)

                flask.flash(f"Created new {kind} '{name}'.")

            elif filled_ids: # elif means file uploads take precedence over ids

                kind = filled_ids[0]

                if len(filled_ids) > 1:
                    flask.flash(f"You chose multiple existing uploads but only one can be assigned, using {kind}.", 'warning')

                cls = admin.Named.__class_descendants_lowercase__[kind]
                upload_id = self[kind]['id'].value

                try:
                    instance = cls.get(cls.id == upload_id)
                except cls.DoesNotExist:
                    flask.flash(f"Tried assigning non-existent {kind}.", 'error')
                else:
                    self._assign_upload(kind, instance, kinds)

        return super().process(submit)

    def _assign_upload(self, kind, instance, kinds):

        """
        Assign `instance` as the item's upload of type `kind` and drop the
        references of all other types in `kinds`, so that only one upload
        stays referenced (see the `only_one_file` constraint of
        PropagandaPieceItem).
        """

        setattr(self.administerable, kind, instance)

        for other in kinds:
            if other != kind:
                setattr(self.administerable, other, None)
class PropagandaPieceItem(admin.Administerable):

    """
    One item of a PropagandaPiece, referencing exactly one upload
    (video, image or file).
    """

    class Meta:

        # exactly one of the three upload references has to be set
        constraints = [
            peewee.SQL("""
CONSTRAINT only_one_file CHECK(
(
("video_id" IS NOT NULL)::INTEGER +
("image_id" IS NOT NULL)::INTEGER +
("file_id" IS NOT NULL)::INTEGER
) = 1
)
""")
        ]

    autoform_class = PropagandaPieceItemForm
    autoform_blacklist = ('id', 'video_id', 'image_id', 'file_id')

    # items are deleted together with their piece
    piece = peewee.ForeignKeyField(
        PropagandaPiece,
        null=False,
        backref='items',
        on_delete='CASCADE',
        verbose_name='Propaganda piece'
    )
    order_pos = peewee.IntegerField(null=False, default=0, verbose_name='Order position')
    video = peewee.ForeignKeyField(upload.Video, null=True, on_delete='CASCADE')
    image = peewee.ForeignKeyField(upload.Image, null=True, on_delete='CASCADE')
    file = peewee.ForeignKeyField(upload.File, null=True, on_delete='CASCADE')

    @property
    def upload(self):

        """
        The referenced upload (video, image or file, checked in that
        order), or None if no reference is set.
        """

        for kind in ('video', 'image', 'file'):
            if getattr(self, f'{kind}_id'):
                return getattr(self, kind)

        return None
class Page(admin.Administerable):

    """
    Static page identified by a unique path.
    """

    path = peewee.CharField(unique=True)
    title = markdown.SafeMarkdownCharField(verbose_name='Title', null=False)
    # set to NULL when the referenced image is deleted
    lead_image = peewee.ForeignKeyField(
        upload.Image,
        verbose_name='Lead image',
        null=True,
        on_delete='SET NULL'
    )
    text = markdown.MarkdownTextField(verbose_name='Text', null=False)
    # unpublished by default
    published = peewee.BooleanField(verbose_name='Published', null=False, default=False)
@app.route('/<path:path>')
@rendering.page(title_show=False)
def view_page(path):
    """
    Return the Page stored for `path`.

    Unpublished pages are only returned if `flask.g.user` is truthy;
    otherwise `Page.DoesNotExist` is raised, as for unknown paths.
    """

    # thrown Page.DoesNotExist will automatically trigger 404.
    page = Page.get(Page.path == path)

    if not (page.published or flask.g.user):
        raise Page.DoesNotExist()

    return page
class PermaRedirect(admin.Administerable):

    """
    Redirect from a fixed path to another path.

    Every instance gets its own URL rule and view function, registered
    directly on the app. The data needed for that is kept in an in-memory
    cache which is filled by `boot` and kept up to date by `save` and
    `delete_instance`.
    """

    # WARNING: This entire class is hacky and might break with virtually any flask or werkzeug update

    __cache__ = {} # collect simplified instances in memory, keyed by id

    match_path = peewee.CharField(null=False, unique=True, verbose_name='Match path', help_text='The path to match (i.e. as it was in URL on the old site')
    redirect_path = peewee.CharField(null=False, verbose_name='Redirect path', help_text='The path to redirect to')

    @classmethod
    def view(cls, mode='full', **kwargs):

        """
        In 'full' mode, redirect to the cached `redirect_path` of the
        instance with id `kwargs['id']`; other modes are passed to the
        parent class.
        """

        if mode != 'full':
            return super().view(mode=mode, **kwargs)

        info = cls.__cache__[kwargs['id']]
        return flask.redirect(info['redirect_path'])

    @classmethod
    def boot(cls):

        """
        Fill the cache from the database and register all routes.

        Failures are logged as a warning instead of being raised.
        """

        try:

            for instance in cls.select():
                cls.__cache__[instance.id] = {'match_path': instance.match_path, 'redirect_path': instance.redirect_path}

            cls.update_routes()

        except Exception as e:
            app.logger.warning(f'Could not initialize PermaRedirect cache: {e}')

    @classmethod
    def update_routes(cls):

        """
        (Re-)register a URL rule and view function for every cached redirect.
        """

        for redirect_id, item in cls.__cache__.items():

            # this is a simplified version of add_url_rule's logic

            endpoint = f'permaredirect-{redirect_id}'

            if endpoint in app.url_map._rules_by_endpoint:
                app.url_map.remove_endpoint(endpoint)

            #app.add_url_rule(item['match_path'], endpoint, functools.partial(cls.view, mode='full', id=id))
            rule = app.url_rule_class(item['match_path'], methods=('GET', 'OPTIONS'), endpoint=endpoint)
            rule.provide_automatic_options = True

            app.url_map.add(rule)
            app.view_functions[endpoint] = functools.partial(cls.view, mode='full', id=redirect_id)

    @classmethod
    def _remove_route(cls, redirect_id):

        """
        Remove the URL rule and view function of the redirect with the given id, if any.
        """

        endpoint = f'permaredirect-{redirect_id}'

        if endpoint in app.url_map._rules_by_endpoint:
            app.url_map.remove_endpoint(endpoint)

        app.view_functions.pop(endpoint, None)

    def save(self, **kwargs):

        """
        Save the instance, then refresh its cache entry and the routes.

        Raises ValueError if one of the paths contains '<' or '>'.
        Returns whatever the parent class' `save` returns.
        """

        # NOTE/TODO: This is an ugly hack (but so is the rest of this class).
        # Should™ (also) be checked/handled via form validation
        for path in (self.match_path, self.redirect_path):
            if '<' in path or '>' in path:
                raise ValueError(f"PermaRedirect paths MUST NOT contain '<' or '>' but got: {path}")

        result = super().save(**kwargs)

        self.__class__.__cache__[self.id] = {'match_path': self.match_path, 'redirect_path': self.redirect_path}
        self.__class__.update_routes()

        return result

    def delete_instance(self, **kwargs):

        """
        Delete the instance, then drop its cache entry and its route.

        Returns whatever the parent class' `delete_instance` returns.
        """

        redirect_id = self.id

        # delete first, so cache and routes stay untouched if deletion fails
        result = super().delete_instance(**kwargs)

        # the entry may be missing if boot couldn't fill the cache
        self.__class__.__cache__.pop(redirect_id, None)

        # update_routes only touches cached entries, so the route of the
        # deleted redirect has to be removed explicitly
        self.__class__._remove_route(redirect_id)

        return result
# register PermaRedirect.boot with the app, then run app.boot_run()
app.boot(PermaRedirect.boot)
app.boot_run()

if __name__ == '__main__':

    import flask.cli # needed for least worst app discovery hack

    # hand the already existing app to the CLI through a ScriptInfo
    script_info = flask.cli.ScriptInfo(create_app=lambda: app)
    app.cli(obj=script_info) # expose CLI as executable