1046 lines
33 KiB
Python
1046 lines
33 KiB
Python
#!/usr/bin/env python3
|
||
|
||
# builtins
|
||
import math
|
||
import datetime
|
||
import functools
|
||
import random
|
||
|
||
# third-party
|
||
import markupsafe
|
||
import werkzeug
|
||
import click
|
||
import flask
|
||
import peewee
|
||
|
||
# scraping-specific third party
|
||
import requests
|
||
import bs4
|
||
|
||
# internals
|
||
from application import app
|
||
|
||
import util
|
||
import exceptions
|
||
import geoip
|
||
import rendering
|
||
import markdown
|
||
import form
|
||
import database
|
||
import admin
|
||
import upload
|
||
import tagging
|
||
import commenting
|
||
import search
|
||
import cli
|
||
|
||
@app.errorhandler(Exception)
|
||
def error_catchall(e):
|
||
|
||
if app.debug:
|
||
raise
|
||
|
||
app.logger.error(e)
|
||
|
||
@rendering.page(title_show=False)
|
||
def errorpage():
|
||
|
||
if isinstance(e, exceptions.ExposedException):
|
||
return rendering.ErrorPage(e.message)
|
||
|
||
if isinstance(e, werkzeug.exceptions.HTTPException):
|
||
return rendering.ErrorPage(e.description, title=e.name, status=e.code)
|
||
|
||
if isinstance(e, peewee.DoesNotExist):
|
||
return rendering.ErrorPage("Item could not be found.", title="Not found", status=404)
|
||
|
||
if isinstance(e, peewee.OperationalError):
|
||
return rendering.ErrorPage("Temporary malfunction.", status=503)
|
||
|
||
return rendering.ErrorPage("An error has occured.")
|
||
|
||
try:
|
||
return errorpage()
|
||
except Exception as e_errorpage:
|
||
return error_fallback(e_errorpage)
|
||
|
||
def error_fallback(e=None):
|
||
|
||
if app.debug:
|
||
raise e
|
||
|
||
app.logger.error(f"Error fallback triggered: {str(e)}")
|
||
|
||
template_candidates = [
|
||
f'{app.config["THEME"]}/templates/layout/error_fallback.html'
|
||
]
|
||
|
||
if app.config['THEME'] != 'default':
|
||
template_candidates.append('default/templates/layout/error_fallback.html')
|
||
|
||
return flask.render_template(template_candidates), 500
|
||
|
||
@app.menu('main')
|
||
def menu_main():
|
||
|
||
items = [
|
||
{
|
||
'endpoint': 'frontpage',
|
||
'label': 'Home',
|
||
},
|
||
{
|
||
'endpoint': 'article_listing',
|
||
'label': 'Articles',
|
||
},
|
||
{
|
||
'endpoint': 'project_listing',
|
||
'label': 'Projects',
|
||
},
|
||
{
|
||
'endpoint': 'propaganda_listing',
|
||
'label': 'Propaganda',
|
||
},
|
||
{
|
||
'endpoint': 'curatedart_listing',
|
||
'label': 'Curated art',
|
||
},
|
||
{
|
||
'endpoint': 'tag_listing',
|
||
'label': 'Tags',
|
||
},
|
||
{
|
||
'url': 'https://rnd.phryk.net',
|
||
'label': 'R&D',
|
||
},
|
||
]
|
||
|
||
return rendering.Menu('main', items, burger=True)
|
||
|
||
@app.menu('footer')
|
||
def menu_footer():
|
||
|
||
items = [
|
||
{
|
||
'endpoint': 'the_button',
|
||
'label': 'The Button',
|
||
},
|
||
{
|
||
'endpoint': 'view_page',
|
||
'params': {
|
||
'path': '/donate',
|
||
},
|
||
'label': 'Donate',
|
||
},
|
||
{
|
||
'endpoint': 'view_page',
|
||
'params': {
|
||
'path': '/hire',
|
||
},
|
||
'label': 'Hire',
|
||
},
|
||
]
|
||
|
||
return rendering.Menu('secondary', items)
|
||
|
||
@app.block('seal')
|
||
def seal_message():
|
||
|
||
messages = app.config['SEAL_MESSAGES']
|
||
idx = random.randint(0, len(messages) - 1)
|
||
|
||
return messages[idx]
|
||
|
||
class FrontPage(rendering.Renderable):
|
||
|
||
__commentables__ = []
|
||
|
||
@classmethod
|
||
def include(cls, commentable):
|
||
|
||
"""
|
||
Decorator to make Commentable-derived types show up in frontpage listing.
|
||
"""
|
||
|
||
if not issubclass(commentable, commenting.Commentable):
|
||
raise TypeError("FrontPage.include only takes classes derived from Commentable")
|
||
|
||
cls.__commentables__.append(commentable)
|
||
|
||
return commentable
|
||
|
||
def __init__(self, offset=0, **kwargs):
|
||
|
||
super().__init__(**kwargs)
|
||
|
||
self.title = None
|
||
self.page = None
|
||
self.offset = offset
|
||
|
||
try:
|
||
self.page = Page.get((Page.path == '/') & (Page.published == True))
|
||
self.title = self.page.title
|
||
|
||
except Page.DoesNotExist:
|
||
pass
|
||
|
||
stmt = None
|
||
|
||
#for name, commentable in commenting.Commentable.__class_descendants__.items():
|
||
for commentable in self.__class__.__commentables__:
|
||
|
||
select = commentable.select(
|
||
peewee.SQL(f'\'{commentable.__name__}\' AS "type"'), # so we know what class to instantiate
|
||
commentable.name, # ditto, but identifying the exact row
|
||
commentable.created
|
||
).where(commentable.published == True)
|
||
|
||
if stmt is None:
|
||
stmt = select
|
||
else:
|
||
stmt = stmt.union_all(select)
|
||
|
||
stmt = stmt.order_by(stmt.c.created.desc(), stmt.c.name.asc())
|
||
|
||
results = stmt.dicts()
|
||
|
||
def item_constructor(item):
|
||
|
||
commentable = commenting.Commentable.__class_descendants__[ item['type'] ]
|
||
return commentable.load(item['name'])
|
||
|
||
self.results = rendering.Listing(results, endpoint='frontpage', offset=offset, item_constructor=item_constructor)
|
||
|
||
@app.route('/')
|
||
@app.route('/+<int:offset>')
|
||
@rendering.page(title_show=False)
|
||
def frontpage(offset=0):
|
||
|
||
return FrontPage(offset=offset)
|
||
|
||
class ButtonForm(form.Form):
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
|
||
super().__init__(*args, **kwargs)
|
||
|
||
self.intro = markupsafe.Markup(f"Push The Button to <em>kill a billionaire</em>.")
|
||
self.buttons['kill'] = form.Button(id='the-button', label='☠')
|
||
|
||
def process(self, submit):
|
||
|
||
if submit == 'kill':
|
||
|
||
location = geoip.lookup_city(flask.request.remote_addr)
|
||
|
||
press = ButtonPress()
|
||
|
||
if location:
|
||
|
||
press.geo_continent = location.continent.name
|
||
press.geo_country = location.country.name
|
||
press.geo_city = location.city.name
|
||
press.geo_latitude = location.location.latitude
|
||
press.geo_longitude = location.location.longitude
|
||
|
||
press.save(force_insert=True)
|
||
|
||
self.count = ButtonPress.select().count()
|
||
|
||
self.intro = markupsafe.Markup(f"Your pledge has been confirmed.<br>You are person <em>#{ self.count }</em> to make the pledge.")
|
||
del self.buttons['kill']
|
||
|
||
class ButtonPress(database.RenderableModel):
|
||
|
||
created = peewee.DateTimeField(null=False, default=datetime.datetime.utcnow)
|
||
geo_continent = peewee.CharField(null=True)
|
||
geo_country = peewee.CharField(null=True)
|
||
geo_city = peewee.CharField(null=True)
|
||
geo_latitude = peewee.FloatField(null=True)
|
||
geo_longitude = peewee.FloatField(null=True)
|
||
|
||
@app.route('/button/', methods=['GET', 'POST'])
|
||
@rendering.page(title_show=False)
|
||
def the_button():
|
||
|
||
f = ButtonForm(title="The Button")
|
||
|
||
if flask.request.method == 'POST':
|
||
f.handle()
|
||
|
||
return f
|
||
|
||
class ScoredLink(admin.Administerable):
|
||
|
||
autoform_blacklist = ['id', 'external_site_count', 'last_scrape']
|
||
hue_range = (80,320) # hue (degrees) from 0 to .max
|
||
|
||
url = peewee.CharField(unique=True) # TODO: Regexp constraint # FIXME: overrides url function
|
||
external_site_count = peewee.IntegerField(null=True)
|
||
last_scrape = peewee.DateTimeField(null=True, default=None)
|
||
|
||
explanation_external_site_count = markdown.SafeMarkdownString("""
|
||
How many **third-party sites** the linked page loads data from.
|
||
|
||
This is relevant to your privacy because each of those third-party sites
|
||
gets – at the very least – to see that you visited the linked page.
|
||
|
||
*Lower* is better, **0** is *ideal*.""")
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
|
||
super().__init__(*args, **kwargs)
|
||
|
||
def scrape(self):
|
||
|
||
external_site_count = None
|
||
|
||
if self.url:
|
||
|
||
external_site_count = 0
|
||
url_domain = self.url.split('/')[2]
|
||
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36', # such stealth, many agent
|
||
}
|
||
|
||
html = requests.get(self.url, timeout=30, headers=headers).text
|
||
dom = bs4.BeautifulSoup(html, 'lxml')
|
||
|
||
scored_elements = {
|
||
'script': 'src',
|
||
'link': 'href',
|
||
'img': 'src',
|
||
'object': 'data'
|
||
}
|
||
|
||
external_domains = set()
|
||
|
||
for tag, attribute in scored_elements.items():
|
||
|
||
for element in dom.find_all(tag):
|
||
|
||
attribute_value = element.get(attribute)
|
||
|
||
# attribute is set and contains an absolute URL
|
||
if isinstance(attribute_value, str) and attribute_value.find('://') >= 0:
|
||
|
||
attribute_domain = attribute_value.split('/')[2]
|
||
|
||
if attribute_domain != url_domain and\
|
||
not attribute_domain.endswith(f'.{url_domain}'): # not a subdomain of url_domain
|
||
#external_site_count += 1
|
||
external_domains.add(attribute_domain)
|
||
|
||
self.external_site_count = len(external_domains)
|
||
self.last_scrape = datetime.datetime.utcnow()
|
||
self.save()
|
||
|
||
@classmethod
|
||
def load_by_url(cls, url):
|
||
|
||
return cls.select().where(cls.url == url).get()
|
||
|
||
@property
|
||
def set_size(self):
|
||
|
||
# returns the size of the set of all scored links
|
||
return self.__class__.select().count()
|
||
|
||
@property
|
||
def external_site_counts(self):
|
||
|
||
# TODO: We should probably cache this
|
||
# (and invalidate/rebuild cache at least in .save?)
|
||
|
||
cls = self.__class__
|
||
|
||
return [
|
||
row['external_site_count'] for row in cls.select(cls.external_site_count).where(cls.external_site_count != None).order_by(cls.external_site_count).dicts()
|
||
]
|
||
|
||
@property
|
||
def median(self):
|
||
|
||
if len(self.external_site_counts) == 0:
|
||
return 0
|
||
|
||
elif len(self.external_site_counts) == 1:
|
||
return self.external_site_counts[0]
|
||
|
||
median_idx = int(math.floor(len(self.external_site_counts) / 2.0))
|
||
if len(self.external_site_counts) % 2 == 0:
|
||
|
||
a = self.external_site_counts[median_idx -1]
|
||
b = self.external_site_counts[median_idx]
|
||
|
||
median = (a + b) / 2.0
|
||
else:
|
||
median = float(self.external_site_counts[median_idx])
|
||
|
||
return median
|
||
|
||
@property
|
||
def mean(self):
|
||
|
||
if len(self.external_site_counts) == 0:
|
||
return 0
|
||
|
||
elif len(self.external_site_counts) == 1:
|
||
return self.external_site_counts[0]
|
||
|
||
return sum(self.external_site_counts) / float(len(self.external_site_counts))
|
||
|
||
@property
|
||
def min(self):
|
||
|
||
if len(self.external_site_counts) == 0:
|
||
return 0
|
||
|
||
elif len(self.external_site_counts) == 1:
|
||
return self.external_site_counts[0]
|
||
|
||
return min(self.external_site_counts)
|
||
|
||
@property
|
||
def max(self):
|
||
|
||
if len(self.external_site_counts) == 0:
|
||
return 0
|
||
|
||
elif len(self.external_site_counts) == 1:
|
||
return self.external_site_counts[0]
|
||
|
||
return max(self.external_site_counts)
|
||
|
||
@property
|
||
def color(self):
|
||
|
||
color_good = util.Color(hue=self.hue_range[0], saturation=1, value=1)
|
||
color_bad = util.Color(hue=self.hue_range[1], saturation=1, value=1)
|
||
|
||
if self.external_site_count == 0:
|
||
return color_good
|
||
elif self.external_site_count == self.max:
|
||
return color_bad
|
||
|
||
if self.external_site_count is None or self.set_size == 1:
|
||
|
||
influence_good = 0.5
|
||
influence_bad = 0.5
|
||
|
||
else:
|
||
|
||
influence_good = 1.0 - (self.external_site_count / self.max)
|
||
influence_bad = 1.0 - ( (self.max - self.external_site_count) / self.max )
|
||
|
||
color_good.alpha = influence_good
|
||
color_bad.alpha = influence_bad
|
||
|
||
return color_good.blend(color_bad)
|
||
|
||
@app.cron
|
||
def scrape_scoredlinks():
|
||
|
||
now = datetime.datetime.utcnow()
|
||
period = datetime.timedelta(days=7)
|
||
count = 0
|
||
|
||
with click.progressbar(ScoredLink.select().where(
|
||
(ScoredLink.last_scrape < (now - period)) | (ScoredLink.last_scrape == None)
|
||
), label="Updating link scores where necessary", item_show_func=lambda x: x.url if x else '') as links:
|
||
|
||
for link in links:
|
||
|
||
try:
|
||
link.scrape()
|
||
except Exception: # TODO: more specific error handling
|
||
pass
|
||
else:
|
||
count += 1
|
||
|
||
click.secho(f"Updated {count} link scores.", fg='green')
|
||
|
||
def renderer_link_open(self, tokens, idx, options, env):
|
||
|
||
token = tokens[idx]
|
||
|
||
if 'href' in token.attrs and(
|
||
token.attrs['href'].startswith('http://') or\
|
||
token.attrs['href'].startswith('https://')
|
||
):
|
||
|
||
token.attrs['target'] = '_blank'
|
||
token.attrs['rel'] = 'noreferrer noopener'
|
||
|
||
if 'class' in token.attrs:
|
||
token.attrs['class'] += ' external'
|
||
else:
|
||
token.attrs['class'] = 'external'
|
||
|
||
attribute_string = ' '.join([f'{name}="{value}"' for (name, value) in token.attrs.items()])
|
||
return f'<a {attribute_string}>'
|
||
|
||
def renderer_link_close(self, tokens, idx, options, env):
|
||
|
||
# look back token by token until we find the closest preceding link_open token
|
||
# (i.e. the one corresponding to this closing token
|
||
|
||
for i in range(idx - 1, -1, -1): # range counting down from idx -1 to 0
|
||
if tokens[i].type == 'link_open':
|
||
token_opening = tokens[i]
|
||
break
|
||
|
||
icon = None
|
||
|
||
if 'href' in token_opening.attrs and (
|
||
token_opening.attrs['href'].startswith('http://') or\
|
||
token_opening.attrs['href'].startswith('https://')
|
||
):
|
||
|
||
try:
|
||
|
||
link = ScoredLink.load_by_url(token_opening.attrs['href'])
|
||
|
||
except ScoredLink.DoesNotExist:
|
||
|
||
link = ScoredLink(url=token_opening.attrs['href'])
|
||
link.save(force_insert=True)
|
||
|
||
if isinstance(link.external_site_count, int):
|
||
icon = f'<img class="scoredlink-icon" src="/theme/dynamic/scoredlink/{link.id}/" title="This URL loads data from {link.external_site_count} external sites (median: {link.median}, mean: {link.mean}, min {link.min}, max: {link.max}, set size: {link.set_size})" />'
|
||
|
||
if icon:
|
||
return f'</a>{icon}'
|
||
|
||
return '</a>'
|
||
|
||
markdown.md_unsafe.add_render_rule('link_open', renderer_link_open)
|
||
markdown.md_unsafe.add_render_rule('link_close', renderer_link_close)
|
||
|
||
@app.route('/theme/dynamic/scoredlink/<int:id>/')
|
||
def icon_link_external(id):
|
||
|
||
link = ScoredLink.load(id)
|
||
|
||
svg = link.render(mode='inline', format='svg')
|
||
|
||
response = flask.Response(svg, 200)
|
||
|
||
response.headers['Content-Type'] = 'image/svg+xml'
|
||
|
||
return response
|
||
|
||
@app.abstract
|
||
class LeadImageContent(commenting.Commentable):
|
||
|
||
lead_image = peewee.ForeignKeyField(upload.Image, null=True, verbose_name='Lead image', on_delete='SET NULL')
|
||
|
||
@FrontPage.include
|
||
@app.expose('/article/')
|
||
class Article(LeadImageContent):
|
||
|
||
title = markdown.SafeMarkdownCharField(null=False, verbose_name='Title')
|
||
teaser = markdown.MarkdownTextField(null=False, verbose_name='Teaser')
|
||
text = markdown.MarkdownTextField(null=False, verbose_name='Text')
|
||
|
||
class LinkForm(tagging.TaggableForm):
|
||
|
||
"""
|
||
More comfortable handing for a 'link' field that's a foreign key to ScoredLink.
|
||
Used by Project and CuratedContent.
|
||
"""
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
|
||
super().__init__(*args, **kwargs)
|
||
link = self.administerable.link
|
||
if link:
|
||
link_value = link.url
|
||
else:
|
||
link_value = ''
|
||
|
||
self['link_id'] = form.Text(
|
||
label=self.administerable.__class__.link.verbose_name,
|
||
help=self.administerable.__class__.link.help_text,
|
||
value=link_value
|
||
)
|
||
|
||
def process(self, submit):
|
||
|
||
if self.valid:
|
||
|
||
if self.mode in ('create', 'edit'):
|
||
|
||
if self['link_id'].value:
|
||
|
||
try:
|
||
|
||
link = ScoredLink.load_by_url(self['link_id'].value)
|
||
self.administerable.link = link
|
||
flask.flash(f"Assigned existing ScoredLink #{link.id}")
|
||
|
||
except ScoredLink.DoesNotExist:
|
||
|
||
link = ScoredLink(url=self['link_id'].value)
|
||
link.save(force_insert=True)
|
||
self.administerable.link = link
|
||
flask.flash(f"Created new Scoredlink #{link.id}")
|
||
|
||
return super().process(submit)
|
||
|
||
|
||
@app.expose('/project/')
|
||
class Project(LeadImageContent):
|
||
|
||
autoform_class = LinkForm
|
||
autoform_blacklist = ('id', 'comment_collection_id', 'link_id')
|
||
|
||
title = markdown.SafeMarkdownCharField(null=False, verbose_name='Title')
|
||
teaser = markdown.MarkdownTextField(null=False, verbose_name='Teaser')
|
||
text = markdown.MarkdownTextField(null=False, verbose_name='Text')
|
||
link = peewee.ForeignKeyField(ScoredLink, null=True, verbose_name='Link', help_text='URL for the original content, if any')
|
||
|
||
@FrontPage.include
|
||
@app.expose('/curated/')
|
||
class CuratedArt(LeadImageContent):
|
||
|
||
autoform_class = LinkForm
|
||
autoform_blacklist = ('id', 'comment_collection_id', 'link_id')
|
||
|
||
title = markdown.SafeMarkdownCharField(null=False, verbose_name='Title')
|
||
teaser = markdown.MarkdownTextField(null=False, verbose_name='Teaser')
|
||
text = markdown.MarkdownTextField(null=False, verbose_name='Text')
|
||
link = peewee.ForeignKeyField(ScoredLink, null=True, verbose_name='Link', help_text='URL for the original content, if any')
|
||
|
||
class PropagandaForm(tagging.TaggableForm):
|
||
|
||
def __init__(self, administerable, mode, *args, **kwargs):
|
||
|
||
super().__init__(administerable, mode, *args, **kwargs)
|
||
|
||
if mode == 'edit':
|
||
|
||
for piece in administerable.pieces_ordered:
|
||
|
||
# construct and modify fieldsets for PropagandaPiece
|
||
fieldset_piece = form.ProxyFieldset(piece.form('edit'), name=f'propagandapiece-{piece.id}')
|
||
fieldset_piece['collection_id'] = form.ValueField(value=administerable.id)
|
||
del(fieldset_piece['order_pos'])
|
||
fieldset_piece.fields.position('description', -1)
|
||
|
||
fieldset_delete = form.ProxyFieldset(piece.form('delete', name=f'propagandapiece-{piece.id}-delete'))
|
||
fieldset_delete.intro = None
|
||
fieldset_delete.buttons['delete'].label = '⊗'
|
||
|
||
# construct fieldset for to change order_pos
|
||
fieldset_order = form.Fieldset(name='order')
|
||
fieldset_order.buttons['up'] = form.Button(name='up', label='⌃')
|
||
fieldset_order.buttons['down'] = form.Button(name='down', label='⌄')
|
||
|
||
# combine them in one fieldset that can be styled in one block
|
||
fieldset = form.Fieldset(name=f'{fieldset_piece.name}-wrapper', extra_classes=['propagandapiece-wrapper'])
|
||
fieldset['piece'] = fieldset_piece
|
||
fieldset['delete'] = fieldset_delete
|
||
fieldset['order'] = fieldset_order
|
||
|
||
# and add it to the form
|
||
self[fieldset.name] = fieldset
|
||
|
||
fieldset = form.ProxyFieldset(PropagandaPiece().form('create'))
|
||
fieldset['collection_id'] = form.ValueField(value=administerable.id)
|
||
del(fieldset['order_pos'])
|
||
|
||
self[fieldset.name] = fieldset
|
||
|
||
def handle(self, skip_bind=False):
|
||
|
||
if 'submit' not in flask.request.form:
|
||
self.errors.append(ValidationError("Missing submit information."))
|
||
return
|
||
|
||
submit = flask.request.form.get('submit')
|
||
|
||
if not skip_bind:
|
||
self.bind(flask.request.form, flask.request.files)
|
||
|
||
fieldset = self.submitted_fieldset(submit)
|
||
|
||
if fieldset is not None:
|
||
|
||
if (submit.endswith('.order.up') or submit.endswith('.order.down')) and 'propagandapieceitem' not in submit:
|
||
|
||
# one of the arrow buttons to reorder propagandapieces was pressed
|
||
self.process_order(submit, fieldset)
|
||
return flask.redirect('')
|
||
|
||
fieldset.handle(skip_bind=True)
|
||
return flask.redirect('')
|
||
|
||
else:
|
||
submit_relative = self.submit_relative(submit)
|
||
self.validate(submit_relative)
|
||
|
||
if len(self.errors):
|
||
for e in self.errors:
|
||
flask.flash(str(e), 'error')
|
||
|
||
return self.process(submit_relative)
|
||
|
||
def process_order(self, submit, fieldset):
|
||
|
||
piece = fieldset['piece'].form.administerable # grab PropagandaPiece from ProxyFieldset
|
||
|
||
if submit.endswith('.up'):
|
||
piece.order_pos -= 1
|
||
|
||
elif submit.endswith('.down'):
|
||
piece.order_pos += 1
|
||
|
||
piece.save()
|
||
|
||
@FrontPage.include
|
||
@app.expose('/propaganda/')
|
||
class Propaganda(commenting.Commentable):
|
||
|
||
autoform_class = PropagandaForm
|
||
|
||
title = markdown.SafeMarkdownCharField(null=False, verbose_name='Title')
|
||
teaser = markdown.MarkdownTextField(null=False, verbose_name='Teaser')
|
||
text = markdown.MarkdownTextField(null=False, verbose_name='Text')
|
||
|
||
@property
|
||
def pieces_ordered(self):
|
||
return self.pieces.order_by(PropagandaPiece.order_pos)
|
||
|
||
class PropagandaPieceForm(admin.AutoForm):
|
||
|
||
def __init__(self, administerable, mode, *args, **kwargs):
|
||
|
||
super().__init__(administerable, mode, *args, **kwargs)
|
||
|
||
if mode == 'edit':
|
||
|
||
self['preview'] = form.RenderableField(administerable, 'inline')
|
||
self['items'] = form.Fieldset()
|
||
|
||
for item in administerable.items_ordered:
|
||
|
||
fieldset_item = form.ProxyFieldset(item.form('edit'), name=f'propagandapieceitem-{item.id}')
|
||
fieldset_item['piece_id'] = form.ValueField(value=administerable.id)
|
||
del(fieldset_item['order_pos'])
|
||
|
||
fieldset_delete = form.ProxyFieldset(item.form('delete'), name=f'propagandapieceitem-{item.id}-delete')
|
||
fieldset_delete.intro = None
|
||
del(fieldset_delete['preview'])
|
||
fieldset_delete.buttons['delete'].label = '⊗'
|
||
|
||
fieldset_order = form.Fieldset(name='order')
|
||
fieldset_order.buttons['up'] = form.Button(name='up', label='⌃')
|
||
fieldset_order.buttons['down'] = form.Button(name='down', label='⌄')
|
||
|
||
# HERE
|
||
fieldset = form.Fieldset(name=f'{fieldset_item.name}-wrapper', extra_classes=['propagandapieceitem-wrapper'])
|
||
fieldset['item'] = fieldset_item
|
||
fieldset['delete'] = fieldset_delete
|
||
fieldset['order'] = fieldset_order
|
||
|
||
self['items'][fieldset.name] = fieldset
|
||
|
||
fieldset = form.ProxyFieldset(PropagandaPieceItem().form('create'))
|
||
fieldset['piece_id'] = form.ValueField(value=administerable.id)
|
||
del(fieldset['order_pos'])
|
||
|
||
self['items']['new'] = fieldset
|
||
|
||
def handle(self, skip_bind=False):
|
||
|
||
if 'submit' not in flask.request.form:
|
||
self.errors.append(ValidationError("Missing submit information."))
|
||
return
|
||
|
||
submit = flask.request.form.get('submit')
|
||
|
||
if not skip_bind:
|
||
self.bind(flask.request.form, flask.request.files)
|
||
|
||
fieldset = self.submitted_fieldset(submit)
|
||
|
||
if fieldset is not None:
|
||
|
||
if submit.endswith('.order.up') or submit.endswith('.order.down'):
|
||
# one of the arrow buttons to reorder propagandapieceitems was pressed
|
||
self.process_order(submit, fieldset)
|
||
return flask.redirect('')
|
||
|
||
fieldset.handle(skip_bind=True)
|
||
return flask.redirect('')
|
||
|
||
else:
|
||
submit_relative = self.submit_relative(submit)
|
||
self.validate(submit_relative)
|
||
|
||
if len(self.errors):
|
||
for e in self.errors:
|
||
flask.flash(str(e), 'error')
|
||
|
||
return self.process(submit_relative)
|
||
|
||
def process(self, submit):
|
||
|
||
super().process(submit)
|
||
|
||
def process_order(self, submit, fieldset):
|
||
|
||
inner_fieldset = fieldset.submitted_fieldset(submit)
|
||
item = inner_fieldset['item'].form.administerable # grab PropagandaPieceItem from ProxyFieldset
|
||
|
||
if submit.endswith('.up'):
|
||
item.order_pos -= 1
|
||
|
||
elif submit.endswith('.down'):
|
||
item.order_pos += 1
|
||
|
||
flask.flash("Moved propaganda piece item.", 'success')
|
||
item.save()
|
||
|
||
class PropagandaPiece(admin.Administerable):
|
||
|
||
autoform_class = PropagandaPieceForm
|
||
|
||
collection = peewee.ForeignKeyField(Propaganda, null=False, verbose_name="Collection", backref='pieces', on_delete='CASCADE')
|
||
order_pos = peewee.IntegerField(null=False, default=0, verbose_name="Order position")
|
||
description = markdown.MarkdownTextField(verbose_name="Description")
|
||
|
||
@property
|
||
def items_ordered(self):
|
||
return self.items.order_by(PropagandaPieceItem.order_pos)
|
||
|
||
class PropagandaPieceItemForm(admin.AutoForm):
|
||
|
||
def __init__(self, administerable, mode, *args, **kwargs):
|
||
|
||
super().__init__(administerable, mode, *args, **kwargs)
|
||
|
||
if administerable.upload:
|
||
self['preview'] = form.RenderableField(administerable.upload, 'inline')
|
||
|
||
if mode != 'delete':
|
||
|
||
fieldset = form.Fieldset(name='video')
|
||
fieldset['id'] = form.ForeignKeySelect(administerable.__class__.video, label='Existing video', value=administerable.video_id)
|
||
fieldset['file'] = form.File(label='New video')
|
||
self[fieldset.name] = fieldset
|
||
|
||
fieldset = form.Fieldset(name='image')
|
||
fieldset['id'] = form.ForeignKeySelect(administerable.__class__.image, label='Existing image', value=administerable.image_id)
|
||
fieldset['file'] = form.File(label='New image')
|
||
self[fieldset.name] = fieldset
|
||
|
||
fieldset = form.Fieldset(name='file')
|
||
fieldset['id'] = form.ForeignKeySelect(administerable.__class__.file, label='Existing file', value=administerable.file_id)
|
||
fieldset['file'] = form.File(label='New file')
|
||
self[fieldset.name] = fieldset
|
||
|
||
def process(self, submit):
|
||
|
||
if self.valid:
|
||
|
||
filled_ids = []
|
||
filled_files = []
|
||
|
||
types = ('video', 'image', 'file')
|
||
for type in types:
|
||
|
||
# collect which fields were filled
|
||
id = self[type]['id'].value
|
||
file = self[type]['file'].value
|
||
|
||
if file:
|
||
|
||
filled_files.append(type)
|
||
|
||
if id:
|
||
|
||
filled_ids.append(type)
|
||
|
||
if len(filled_files) > 0:
|
||
|
||
type = filled_files[0]
|
||
|
||
if len(filled_files) > 1:
|
||
flask.flash(f"You uploaded multiple files but only one can be assigned, using {type}.", 'warning')
|
||
|
||
cls = admin.Named.__class_descendants_lowercase__[type]
|
||
file = self[type]['file'].value
|
||
name = werkzeug.utils.secure_filename(file.filename).split('.')[0].lower()
|
||
|
||
try:
|
||
cls.load(name)
|
||
except cls.DoesNotExist:
|
||
# this is what we want, no collision
|
||
pass
|
||
else:
|
||
# this is what we don't want. generate random name instead
|
||
name = util.random_string_light()
|
||
|
||
instance = cls()
|
||
instance.name = name
|
||
instance.write_file(file)
|
||
instance.save(force_insert=True)
|
||
setattr(self.administerable, type, instance)
|
||
|
||
for other in [t for t in types if t != type]:
|
||
# remove any references to other types
|
||
setattr(self.administerable, other, None)
|
||
|
||
flask.flash(f"Created new {type} '{name}'.")
|
||
|
||
elif len(filled_ids) > 0: # elif means file uploads take precedence over ids
|
||
|
||
type = filled_ids[0]
|
||
|
||
if len(filled_ids) > 1:
|
||
flask.flash(f"You chose multiple existing uploads but only one can be assigned, using {type}.", 'warning')
|
||
|
||
cls = admin.Named.__class_descendants_lowercase__[type]
|
||
id = self[type]['id'].value
|
||
|
||
try:
|
||
instance = cls.get(cls.id == id)
|
||
except cls.DoesNotExist:
|
||
flask.flash(f"Tried assigning non-existent {type}.", 'error')
|
||
else:
|
||
setattr(self.administerable, type, instance)
|
||
|
||
return super().process(submit)
|
||
|
||
class PropagandaPieceItem(admin.Administerable):
|
||
|
||
class Meta:
|
||
constraints = [
|
||
peewee.SQL("""
|
||
CONSTRAINT only_one_file CHECK(
|
||
(
|
||
("video_id" IS NOT NULL)::INTEGER +
|
||
("image_id" IS NOT NULL)::INTEGER +
|
||
("file_id" IS NOT NULL)::INTEGER
|
||
) = 1
|
||
)
|
||
""")
|
||
]
|
||
|
||
autoform_class = PropagandaPieceItemForm
|
||
autoform_blacklist = ('id', 'video_id', 'image_id', 'file_id')
|
||
|
||
piece = peewee.ForeignKeyField(PropagandaPiece, null=False, backref='items', on_delete='CASCADE', verbose_name='Propaganda piece')
|
||
order_pos = peewee.IntegerField(null=False, default=0, verbose_name='Order position')
|
||
video = peewee.ForeignKeyField(upload.Video, null=True, on_delete='CASCADE')
|
||
image = peewee.ForeignKeyField(upload.Image, null=True, on_delete='CASCADE')
|
||
file = peewee.ForeignKeyField(upload.File, null=True, on_delete='CASCADE')
|
||
|
||
@property
|
||
def upload(self):
|
||
|
||
if self.video_id:
|
||
return self.video
|
||
elif self.image_id:
|
||
return self.image
|
||
elif self.file_id:
|
||
return self.file
|
||
|
||
class Page(admin.Administerable):
|
||
|
||
path = peewee.CharField(unique=True)
|
||
title = markdown.SafeMarkdownCharField(null=False, verbose_name='Title')
|
||
lead_image = peewee.ForeignKeyField(upload.Image, null=True, verbose_name='Lead image', on_delete='SET NULL')
|
||
text = markdown.MarkdownTextField(null=False, verbose_name='Text')
|
||
published = peewee.BooleanField(null=False, default=False, verbose_name='Published')
|
||
|
||
@app.route('/<path:path>')
|
||
@rendering.page(title_show=False)
|
||
def view_page(path):
|
||
|
||
# thrown Page.DoesNotExist will automatically trigger 404.
|
||
page = Page.get(Page.path == path)
|
||
|
||
if page.published or flask.g.user:
|
||
return page
|
||
|
||
raise Page.DoesNotExist()
|
||
|
||
class PermaRedirect(admin.Administerable):
|
||
|
||
# WARNING: This entire class is hacky and might break with virtually any flask or werkzeug update
|
||
|
||
__cache__ = {} # collect simplified instances in memory, keyed by id
|
||
match_path = peewee.CharField(null=False, unique=True, verbose_name='Match path', help_text='The path to match (i.e. as it was in URL on the old site')
|
||
redirect_path = peewee.CharField(null=False, verbose_name='Redirect path', help_text='The path to redirect to')
|
||
|
||
@classmethod
|
||
def view(cls, mode='full', **kwargs):
|
||
|
||
if mode != 'full':
|
||
return super().view(mode=mode, **kwargs)
|
||
|
||
info = cls.__cache__[kwargs['id']]
|
||
return flask.redirect(info['redirect_path'])
|
||
|
||
@classmethod
|
||
def boot(cls):
|
||
|
||
try:
|
||
|
||
for instance in cls.select():
|
||
cls.__cache__[instance.id] = {'match_path': instance.match_path, 'redirect_path': instance.redirect_path}
|
||
|
||
cls.update_routes()
|
||
|
||
except Exception as e:
|
||
app.logger.warning('Could not initialize PermaRedirect cache.')
|
||
|
||
@classmethod
|
||
def update_routes(cls):
|
||
|
||
for id, item in cls.__cache__.items():
|
||
|
||
# this is a simplified version of add_url_rule's logic
|
||
endpoint = f'permaredirect-{id}'
|
||
|
||
if endpoint in app.url_map._rules_by_endpoint:
|
||
app.url_map.remove_endpoint(endpoint)
|
||
|
||
#app.add_url_rule(item['match_path'], endpoint, functools.partial(cls.view, mode='full', id=id))
|
||
rule = app.url_rule_class(item['match_path'], methods=('GET', 'OPTIONS'), endpoint=endpoint)
|
||
rule.provide_automatic_options = True
|
||
|
||
app.url_map.add(rule)
|
||
app.view_functions[endpoint] = functools.partial(cls.view, mode='full', id=id)
|
||
|
||
def save(self, **kwargs):
|
||
|
||
# NOTE/TODO: This is an ugly hack (but so is the rest of this class).
|
||
# Should™ (also) be checked/handled via form validation
|
||
for path in (self.match_path, self.redirect_path):
|
||
if '<' in path or '>' in path:
|
||
raise ValueError(f"PermaRedirect paths MUST NOT contain '<' or '>' but got: {path}")
|
||
|
||
super().save(**kwargs)
|
||
|
||
self.__class__.__cache__[self.id] = {'match_path': self.match_path, 'redirect_path': self.redirect_path}
|
||
self.__class__.update_routes()
|
||
|
||
def delete_instance(self, **kwargs):
|
||
|
||
del(self.__class__.__cache__[self.id])
|
||
self.__class__.update_routes()
|
||
|
||
super().delete_instance(**kwargs)
|
||
|
||
app.boot(PermaRedirect.boot)
|
||
|
||
app.boot_run()
|
||
|
||
if __name__ == '__main__':
|
||
import flask.cli # needed for least worst app discovery hack
|
||
app.cli(obj=flask.cli.ScriptInfo(create_app = lambda: app)) # expose CLI as executable
|