#! /usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import math
|
|
import string
|
|
import os
|
|
import random
|
|
import datetime
|
|
import requests
|
|
import bs4
|
|
import markdown
|
|
import click
|
|
import flask
|
|
import poobrains
|
|
|
|
from io import BytesIO
|
|
from PIL import Image, ImageDraw, ImageFont, ImageSequence
|
|
|
|
# Module-wide shorthand for the poobrains application object.
app = poobrains.app
|
|
|
|
@app.route('/')
def front():
    """Front page: redirect to phryk's full user profile, falling back to
    the Article listing when that fails for any reason."""

    try:
        profile = poobrains.auth.User.load('phryk')
        return poobrains.redirect(profile.url('full'))

    except Exception:
        return poobrains.redirect(Article.url())
|
|
|
|
class MemePattern(markdown.inlinepatterns.Pattern):
    """Inline markdown pattern turning <name>caption</name> into a meme <img>.

    Every caption seen during rendering is recorded in MemeWhiteList so that
    the Mememage view will later agree to serve it.
    """

    name = None  # meme template name, as configured in app.config['MEMES']

    def __init__(self, pattern, name, md=None):

        super(MemePattern, self).__init__(pattern, md=md)
        self.name = name

    def handleMatch(self, match):
        """Build an <img> element for a matched caption, whitelisting it first."""

        if not match:
            return super(MemePattern, self).handleMatch(match)

        caption = match.group(2)

        # Whitelist previously unseen captions so Mememage will serve them.
        already_known = MemeWhiteList.select().where(MemeWhiteList.caption == caption).count()
        if not already_known:
            entry = MemeWhiteList()
            entry.caption = caption
            entry.save()

        img = markdown.util.etree.Element('img')
        img.set('src', "/meme/%s/%s" % (self.name, caption))
        img.set('alt', caption)

        return img
|
|
|
|
|
|
class Memextension(markdown.Extension):
    """Registers one MemePattern per meme template configured in app.config['MEMES']."""

    def extendMarkdown(self, md, md_globals):

        for name in app.config.get('MEMES', ()):
            regex = '<%s>(.*?)</%s>' % (name, name)
            md.inlinePatterns.add(name, MemePattern(regex, name), '<reference')


poobrains.md.md.registerExtensions([Memextension()], [])
|
|
|
|
|
|
@app.expose('/meme/<string:name>/<string:caption>')
class Mememage(poobrains.auth.Protected):
    """Renders meme images: a configured template image with caption text on top.

    Only serves combinations of a template `name` listed in app.config['MEMES']
    and a `caption` previously whitelisted by MemePattern; anything else raises
    AccessDenied.
    """

    def view(self, mode='full', name=None, caption=None):

        # Serve only configured templates with whitelisted captions.
        if name in app.config['MEMES'] and MemeWhiteList.select().where(MemeWhiteList.caption == caption).count():

            # TODO: is input sanitation still needed?
            # An 'upper:lower' caption splits into top and bottom text;
            # otherwise the entire caption is drawn at the bottom.
            if ':' in caption and len(caption.split(':')) == 2:
                upper, lower = caption.split(':')
            else:
                upper = None
                lower = caption

            filename = os.path.join(app.root_path, app.config['MEMES'][name])
            extension = filename.split('.')[-1]  # decides GIF vs. static handling below

            template = Image.open(filename)
            font = ImageFont.truetype(os.path.join(app.root_path, 'LeagueGothic-Regular.otf'), 80)

            # Target size: fixed 750px width, height scaled to keep aspect ratio.
            resized = (750, int(template.height * (750.0 / template.width)))

            if upper:
                # Center the top line horizontally, flush with the top edge.
                upper_size = font.getsize(upper)
                upper_x = int(round(resized[0] / 2.0 - upper_size[0] / 2.0))
                upper_y = 0

            if lower:
                # Center the bottom line horizontally, near the bottom edge.
                lower_size = font.getsize(lower)
                lower_x = int(round(resized[0] / 2.0 - lower_size[0] / 2.0))
                lower_y = resized[1] - lower_size[1] - 10 # last int is margin from bottom

            if extension == 'gif':

                # Animated template: caption each frame separately.
                frames = []
                for frame in ImageSequence.Iterator(template):

                    frame = frame.convert('RGBA').resize(resized, Image.BICUBIC)
                    text_layer = Image.new('RGBA', frame.size, (0,0,0,0))
                    text_draw = ImageDraw.Draw(text_layer)

                    if upper:
                        outlined_text(text_draw, upper, upper_x, upper_y, font=font)

                    if lower:
                        outlined_text(text_draw, lower, lower_x, lower_y, font=font)

                    frames.append(Image.alpha_composite(frame, text_layer))

                # GIF output needs a palette image; the first frame becomes the base.
                meme = frames.pop(0).convert('P')

                # Carry over animation timing/looping from the source GIF, if present.
                if 'duration' in template.info:
                    meme.info['duration'] = template.info['duration']

                if 'loop' in template.info:
                    meme.info['loop'] = template.info['loop']

                out = BytesIO()
                meme.save(out, save_all=True, append_images=frames, format='GIF')

                r = flask.Response(
                    out.getvalue(),
                    mimetype='image/gif'
                )

                # Publicly cacheable for one week (604800 = 60*60*24*7 seconds).
                r.cache_control.public = True
                r.cache_control.max_age = 604800

                return r

            else:

                # Static template: single composite, delivered as PNG.
                meme = template.convert('RGBA').resize(resized, Image.BICUBIC)

                text_layer = Image.new('RGBA', resized, (0,0,0,0))
                text_draw = ImageDraw.Draw(text_layer)

                if upper:
                    outlined_text(text_draw, upper, upper_x, upper_y, font=font)

                if lower:
                    outlined_text(text_draw, lower, lower_x, lower_y, font=font)

                meme = Image.alpha_composite(meme, text_layer)

                out = BytesIO()
                meme.save(out, format='PNG')

                r = flask.Response(
                    out.getvalue(),
                    mimetype='image/png'
                )
                # Publicly cacheable for one week.
                r.cache_control.public = True
                r.cache_control.max_age = 604800

                return r

        # Unknown template or non-whitelisted caption.
        raise poobrains.auth.AccessDenied()
|
|
|
|
|
|
def outlined_text(drawing, text, x=0, y=0, font=None, fill=(255, 255, 255, 255), outline=(0, 0, 0, 255)):
    """Draw text with a 1px outline onto a PIL ImageDraw surface.

    The outline is produced by stamping the text at the four diagonal 1px
    offsets in the outline color, then drawing the fill-colored text on top.

    :param drawing: PIL.ImageDraw.Draw instance (anything with a compatible ``.text``)
    :param text: the string to render
    :param x: horizontal position of the text
    :param y: vertical position of the text
    :param font: PIL ImageFont to render with
    :param fill: RGBA color of the text itself (default: opaque white)
    :param outline: RGBA color of the outline (default: opaque black)
    """

    # Same stamp order as before: NW, SW, SE, NE, then the fill on top.
    for dx, dy in ((-1, -1), (-1, 1), (1, 1), (1, -1)):
        drawing.text((x + dx, y + dy), text, font=font, fill=outline)

    drawing.text((x, y), text, font=font, fill=fill)
|
|
|
|
|
|
# content types
|
|
|
|
class MemeWhiteList(poobrains.storage.Model):
    """Whitelist of meme captions that appeared in rendered markdown.

    MemePattern inserts captions here while rendering; Mememage refuses to
    serve any caption not present, preventing arbitrary image-text generation
    via crafted /meme/ URLs.
    """

    # TODO: add functionality to remove references to deleted/removed storable instances

    caption = poobrains.storage.fields.CharField()  # literal caption text, may contain an 'upper:lower' separator
|
|
|
|
|
|
class ScoredLink(poobrains.auth.Administerable):
    """A URL scored by how many resources it embeds from foreign domains.

    The score is scraped from the page itself (script/link/img/object tags
    pointing at other domains) and refreshed on every save(). The mean/median
    properties aggregate over all stored links.
    """

    class Meta:
        form_blacklist = ['id', 'external_site_count', 'updated']

    link = poobrains.storage.fields.CharField(unique=True) # TODO: Add an URLField to poobrains.
    external_site_count = poobrains.storage.fields.IntegerField(null=True)  # NULL until first successful scrape
    updated = poobrains.storage.fields.DateTimeField(null=False, default=datetime.datetime.now)

    # NOTE: the former class attributes `mean = None`, `median = None` and
    # `set_size = None` were removed — they were dead code, shadowed by the
    # @property definitions of the same names further down this class body.

    def scrape_external_site_count(self):
        """Fetch self.link and count embedded resources hosted on foreign domains.

        Counts script[src], link[href], img[src] and object[data] whose URL is
        absolute and whose domain is neither self.link's domain nor one of its
        subdomains.

        :returns: int count (0 when self.link is empty)
        :raises: network/parse errors propagate to the caller (handled in save())
        """

        external_site_count = 0

        if self.link:

            # 'scheme://domain/...'.split('/')[2] yields the domain part.
            link_domain = self.link.split('/')[2]

            html = requests.get(self.link, timeout=30).text
            dom = bs4.BeautifulSoup(html, 'lxml')

            # tag -> attribute holding the resource URL
            scored_elements = {
                'script': 'src',
                'link': 'href',
                'img': 'src',
                'object': 'data'
            }

            for tag, attribute in scored_elements.items():
                for element in dom.find_all(tag):
                    attribute_value = element.get(attribute)

                    if isinstance(attribute_value, str) and '://' in attribute_value: # means this isn't a relative link
                        attribute_domain = attribute_value.split('/')[2]
                        # Count it unless it is the page's own domain or a subdomain thereof.
                        if attribute_domain != link_domain and \
                        not attribute_domain.endswith('.%s' % link_domain):
                            external_site_count += 1

        return external_site_count

    def save(self, *args, **kwargs):
        """Refresh the score and timestamp, then persist.

        Scrape failures are logged but never raised, so saving keeps working
        even when the target site is down.
        """

        try:
            self.external_site_count = self.scrape_external_site_count()
            self.updated = datetime.datetime.now()
        except Exception as e: # Match all errors so failures here don't interfere with normal operations
            poobrains.app.logger.error('Could not scrape external site count for URL: %s' % self.link)
            poobrains.app.logger.debug('Problem when scraping external site count: %s: %s' % (str(type(e)), str(e)))

            #if app.debug:
            #    raise # break hard in debug mode to make it easier to find problems

        return super(ScoredLink, self).save(*args, **kwargs)

    @property
    def set_size(self):
        """Total number of ScoredLink rows."""
        return self.__class__.select().count()

    @property
    def external_site_counts(self):
        """All non-NULL scores in the table, ascending. Each access is one DB query."""

        query = self.__class__.select(self.__class__.external_site_count)\
            .where(self.__class__.external_site_count != None)\
            .order_by(self.__class__.external_site_count)

        return [row['external_site_count'] for row in query.dicts()]

    @property
    def median(self):
        """Median of all stored scores.

        :raises IndexError: when no scores exist yet.
        """

        counts = self.external_site_counts  # hoisted: each property access hits the DB
        median_idx = int(math.floor(len(counts) / 2.0))

        if len(counts) % 2 == 0:
            # Even set size: mean of the two middle values.
            return (counts[median_idx - 1] + counts[median_idx]) / 2.0

        return float(counts[median_idx])

    @property
    def mean(self):
        """Arithmetic mean of all stored scores.

        :raises ZeroDivisionError: when no scores exist yet.
        """

        counts = self.external_site_counts  # hoisted: avoid querying twice
        return sum(counts) / float(len(counts))

    @property
    def name(self):
        """Display name used by the Administerable machinery; just the URL."""
        return self.link
|
|
|
|
|
|
@app.expose('/source/organization/', mode='full')
class SourceOrganization(poobrains.commenting.Commentable):
    """An organization that publishes sources; may be nested via `parent`."""

    parent = poobrains.storage.fields.ForeignKeyField('self', null=True)  # optional parent organization
    title = poobrains.storage.fields.CharField()
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored homepage/profile URL
    description = poobrains.md.MarkdownField(null=True)
|
|
|
|
|
|
@app.expose('/source/author/', mode='full')
class SourceAuthor(poobrains.commenting.Commentable):
    """An individual author of sources."""

    title = poobrains.storage.fields.CharField()
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored homepage/profile URL
    description = poobrains.md.MarkdownField(null=True)
|
|
|
|
|
|
@app.expose('/source/organizationauthor/', mode='full')
class SourceOrganizationAuthor(poobrains.commenting.Commentable):
    """Join model tying a SourceAuthor to the SourceOrganization they wrote for."""

    organization = poobrains.storage.fields.ForeignKeyField(SourceOrganization)
    author = poobrains.storage.fields.ForeignKeyField(SourceAuthor)
|
|
|
|
|
|
@app.expose('/source/', mode='full')
class Source(poobrains.commenting.Commentable):
    """A referenced source document (e.g. a scraped press release)."""

    title = poobrains.storage.fields.CharField()
    type = poobrains.storage.fields.CharField() # TODO: We need some logic to make this useful. Also, build enum type compatible to sqlite+postgres?
    author = poobrains.storage.fields.ForeignKeyField(SourceOrganizationAuthor)  # who published it, org+author pair
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored URL of the source itself
    description = poobrains.md.MarkdownField()
|
|
|
|
|
|
@app.expose('/article/', mode='full')
class Article(poobrains.commenting.Commentable):
    """A blog article; also the front-page fallback target."""

    title = poobrains.storage.fields.CharField()
    text = poobrains.md.MarkdownField()
|
|
|
|
|
|
@app.expose('/projects/', mode='full')
class Project(poobrains.commenting.Commentable):
    """A project showcase entry with an external link."""

    title = poobrains.storage.fields.CharField()
    text = poobrains.md.MarkdownField()
    link = poobrains.storage.fields.CharField()  # plain URL, not a ScoredLink
|
|
|
|
|
|
@app.expose('/curated/', mode='full')
class CuratedContent(poobrains.commenting.Commentable):
    """Externally hosted content recommended by the site owner."""

    title = poobrains.storage.fields.CharField()
    description = poobrains.md.MarkdownField()
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored URL of the content
|
|
|
|
|
|
@app.site.box('menu_main')
def menu_main():
    """Build the main navigation menu.

    Entries whose listing the current visitor may not access are silently
    skipped (AccessDenied raised by .url('teaser')). Fixes the previous
    version's redundant duplicate CuratedContent.url('teaser') call and
    collapses four copy-pasted try/except stanzas into one loop.
    """

    menu = poobrains.rendering.Menu('main')

    # (content class, menu caption) in display order
    listings = (
        (Article, 'Articles'),
        (Project, 'Projects'),
        (CuratedContent, 'Curated content'),
        (Source, 'Sources'),
    )

    for cls, caption in listings:
        try:
            menu.append(cls.url('teaser'), caption)
        except poobrains.auth.AccessDenied:
            pass  # visitor may not list this content type

    # Static pages that want a main-menu entry.
    for url, caption in poobrains.auth.Page.main_menu_entries():
        menu.append(url, caption)

    return menu
|
|
|
|
|
|
# Word pools for the 'X-Doge' response header assembled in mkdoge():
# one word is picked from 'prefix', one from 'thing' (plus 'thing_tls'
# when the request came in over TLS) and one from 'suffix'.
DOGE = {

    'prefix': [
        'wow',
        'such',
        'many',
        'more',
        'so',
        'lol',
        'very',
        'omg'
    ],

    'thing': [
        'wow',
        'doge',
        'shibe',
        '1337 h4xx0rz',
        'internet',
        'pretty',
        'computer',
        'free software',
        'website',
        'content',
        'python',
        'flask',
        'poobrains',
        'NOT PHP'
    ],

    # only mixed in for HTTPS requests
    'thing_tls': [
        'transport layer security',
        'X.509',
        'certificate'
    ],

    'suffix': [
        'wow',
        'pls',
        'mystery',
        'anarchy',
        'bees'
    ]
}
|
|
|
|
@app.after_request
def mkdoge(response):
    """Attach a randomized doge-speak 'X-Doge' header to every response.

    Picks one word each from the DOGE prefix, thing and suffix pools;
    TLS-related things are only in play when the request used HTTPS.

    :param response: the outgoing flask response
    :returns: the same response, with the header added
    """

    things = DOGE['thing'] + DOGE['thing_tls'] if flask.request.is_secure else DOGE['thing']

    # random.choice replaces the former manual randint-indexing of each list.
    doge = [random.choice(pool) for pool in (DOGE['prefix'], things, DOGE['suffix'])]

    response.headers['X-Doge'] = ' '.join(doge)
    return response
|
|
|
|
|
|
##                                  ##
##  waffenfunde infoscraping things ##
##                                  ##

# German search phrases ("weapon find", "weapon found", "weapons found")
# used to query the presseportal.de blaulicht search in scrape_blaulicht().
MONITOR_PATTERNS = ['waffenfund', 'waffe gefunden', 'waffen gefunden']
|
|
|
|
@poobrains.app.cron
def scrape_linkscores():
    """Cron job: refresh external-site scores for stale ScoredLinks.

    A link is considered stale when it was last updated more than a week
    ago; ScoredLink.save() re-scrapes the score as a side effect.
    """

    cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
    updated_count = 0  # how many scores we actually refreshed

    with click.progressbar(ScoredLink.select(), label="Update link scores where necessary", item_show_func=lambda x=None: x.link if x else '') as links:

        for link in links:
            if link.updated < cutoff: # update at most once per week
                link.save() # ScoredLink scores are updated on every .save
                updated_count += 1

    click.secho(f"Updated {updated_count} link scores.", fg='green')
|
|
|
|
|
|
#@poobrains.app.cron # bitrotted inactive research, disable
def scrape_blaulicht():
    """Scrape presseportal.de police reports matching MONITOR_PATTERNS into Sources.

    For each pattern: crawl the paginated search results collecting article
    URLs, then fetch each article and create (if missing) the ScoredLink,
    SourceOrganization, SourceAuthor, SourceOrganizationAuthor and Source
    records describing it. Idempotent: already-known sources are skipped.

    Fixes over the previous revision: the description sanitization replaces
    '<'/'>' with their HTML entities (the old code had no-op replaces,
    apparently a mangled version of exactly this), and `== None` became
    `is None`.
    """

    # NOTE(review): hard-coded owner user id 2 — verify this still exists.
    owner = poobrains.auth.User.get(poobrains.auth.User.id == 2)

    for pattern in MONITOR_PATTERNS:

        article_urls = []
        html = requests.get('http://www.presseportal.de/blaulicht/suche.htx?q=%s' % pattern, timeout=30).text
        dom = bs4.BeautifulSoup(html, 'lxml')

        click.echo("Beginning crawl of pagination for search pattern '%s'." % pattern)

        # Phase 1: walk the pagination, collecting article URLs.
        last_page = False
        while not last_page:

            next_page = dom.find(attrs={'class': 'pagination-next'})
            if next_page is None:
                last_page = True
            else:
                # Why in the name of FUCK would you use spans with data-url for fucking links!?
                next_page_url = 'http://www.presseportal.de/blaulicht/%s' % next_page['data-url']

            for article in dom.find_all('article'):
                try:
                    article_urls.append(article.find('h2', attrs={'class': 'news-headline'}).a['href'])
                except Exception:
                    click.echo("Article appears to be without headline link, skipping")

            if not last_page:
                click.echo("Next page: %s" % next_page_url)
                dom = bs4.BeautifulSoup(requests.get(next_page_url, timeout=30).text, 'lxml')

        click.echo("URL collection done, found %d articles." % len(article_urls))

        click.echo("Beginning crawl of individual articles.")

        # Phase 2: fetch each article and persist it as a Source.
        for article_url in article_urls:

            url = 'http://www.presseportal.de%s' % article_url

            # Skip articles we already turned into a Source.
            try:
                testlink = ScoredLink.get(ScoredLink.link == url)
                if Source.select().where(Source.link == testlink).count():
                    click.echo("Already know source with link %s, skipping." % url)
                    continue

            except ScoredLink.DoesNotExist:
                pass

            try:
                dom = bs4.BeautifulSoup(requests.get(url, timeout=30).text, 'lxml')
            except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e:
                message = '%s for %s: %s' % (type(e).__name__, url, str(e))
                click.echo(message)
                poobrains.app.logger.error(message)
                continue

            try:
                org_dom = dom.find('h2', attrs={'class': 'story-company'}).a
            except Exception:
                message = "Couldn't extract source organization for %s" % url
                click.echo(message)
                poobrains.app.logger.error(message)
                continue

            # Find or create the publishing organization.
            try:
                org = SourceOrganization.get(SourceOrganization.title == org_dom.text)

            except SourceOrganization.DoesNotExist:

                org_url = 'http://www.presseportal.de%s' % org_dom['href']

                try:
                    org_link = ScoredLink.get(ScoredLink.link == org_url)

                except ScoredLink.DoesNotExist:

                    org_link = ScoredLink()
                    org_link.link = org_url
                    org_link.save()

                org = SourceOrganization()
                org.name = poobrains.helpers.clean_string(org_dom.text)
                org.title = org_dom.text
                org.link = org_link
                org.owner = owner
                org.save()

            # Press releases carry no personal byline; the org doubles as author.
            try:
                author = SourceAuthor.get(SourceAuthor.name == org.name)

            except SourceAuthor.DoesNotExist:

                author = SourceAuthor()
                author.name = org.name
                author.title = org.title
                author.link = org.link
                author.owner = owner

                author.save()

            # Find or create the org/author join record.
            try:
                orgauthor = SourceOrganizationAuthor.get(SourceOrganizationAuthor.organization == org, SourceOrganizationAuthor.author == author)

            except SourceOrganizationAuthor.DoesNotExist:

                orgauthor = SourceOrganizationAuthor()
                orgauthor.name = '%s-%s' % (org.name, author.name)
                orgauthor.organization = org
                orgauthor.author = author
                orgauthor.owner = owner

                orgauthor.save()

            # Find or create the ScoredLink for the article itself.
            try:
                source_link = ScoredLink.get(ScoredLink.link == url)

            except ScoredLink.DoesNotExist:

                source_link = ScoredLink()
                source_link.link = url

                source_link.save()

            source_title = dom.find('h1', attrs={'class': 'story-headline'}).text.strip()
            source_name = poobrains.helpers.clean_string(source_title)

            try:

                source = Source.get(Source.name == source_name)
                click.echo("Already have a source named %s. Probably indicates duplicate names. Current URL %s" % (source_name, url))

            except Source.DoesNotExist:

                if poobrains.app.debug:
                    poobrains.app.debugger.set_trace()

                date_string = dom.find(attrs={'class': 'story-date'}).text.strip().replace(u'\u2013', '-') # \u2013 is a unicode dash

                source = Source()
                source.link = source_link
                source.type = "scrape_blaulicht"
                source.author = orgauthor
                source.title = source_title
                source.name = source_name
                # Escape angle brackets so scraped text can't inject raw HTML
                # through the markdown description field.
                source.description = dom.find(attrs={'class': 'story-text'}).text.strip().replace('<', '&lt;').replace('>', '&gt;')
                source.date = datetime.datetime.strptime(date_string, "%d.%m.%Y - %H:%M") # format string contains a *dash*, not a minus!
                source.owner = owner

                source.save()
                click.echo("Saved source: %s" % url)
|
|
|
|
|
|
if __name__ == '__main__':
    # Hand control to the poobrains CLI (click-based management commands).
    app.cli()
|