poobrains-pdn/pdn.py

622 lines
19 KiB
Python
Executable File

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import math
import string
import os
import random
import datetime
import requests
import bs4
import markdown
import click
import flask
import poobrains
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont, ImageSequence
# Module-level shorthand for the poobrains application object; the decorators
# in this file (route/expose/after_request/...) are looked up on it.
app = poobrains.app
@app.route('/')
def front():
    """Redirect the site root.

    Tries the 'full' URL of the user named 'phryk'; if anything in that
    lookup or redirect raises, falls back to the Article listing URL.
    """
    try:
        profile = poobrains.auth.User.load('phryk')
        return poobrains.redirect(profile.url('full'))
    except Exception:
        fallback_url = Article.url()
        return poobrains.redirect(fallback_url)
class MemePattern(markdown.inlinepatterns.Pattern):
    """Inline markdown pattern that renders a matched caption as a meme <img>.

    The image source points at ``/meme/<name>/<caption>``. Every caption seen
    while rendering is stored in MemeWhiteList so that the image view only
    serves captions that actually occur in rendered content.
    """

    name = None  # meme template name, used as the first path segment of the image URL

    def __init__(self, pattern, name, md=None):
        """Store the meme *name* next to the regular Pattern setup."""
        super().__init__(pattern, md=md)
        self.name = name

    def handleMatch(self, match):
        """Return an ``img`` element for *match*, whitelisting its caption.

        Falls back to the base class implementation when *match* is falsy.
        """
        # Function-scope import so this block does not depend on the file's import list.
        from urllib.parse import quote

        if not match:
            return super().handleMatch(match)

        caption = match.group(2)

        # Only create a whitelist row the first time a caption is seen.
        if not MemeWhiteList.select().where(MemeWhiteList.caption == caption).count():
            cache_entry = MemeWhiteList()
            cache_entry.caption = caption
            cache_entry.save()

        element = markdown.util.etree.Element('img')
        # Percent-encode the caption: an unencoded '?' or '#' would end the URL
        # path early, so the requested caption would no longer match the
        # whitelisted one.
        element.set('src', "/meme/%s/%s" % (self.name, quote(caption)))
        element.set('alt', caption)
        return element
class Memextension(markdown.Extension):
    """Markdown extension that installs one MemePattern per configured meme."""

    def extendMarkdown(self, md, md_globals):
        """Register a ``<name>caption</name>`` inline pattern for each name in
        ``app.config['MEMES']``; does nothing when that key is absent."""
        for meme_name in app.config.get('MEMES', ()):
            tag_regex = '<%s>(.*?)</%s>' % (meme_name, meme_name)
            md.inlinePatterns.add(meme_name, MemePattern(tag_regex, meme_name), '<reference')


# Hook the extension into the markdown instance poobrains renders with.
poobrains.md.md.registerExtensions([Memextension()], [])
@app.expose('/meme/<string:name>/<string:caption>')
class Mememage(poobrains.auth.Protected):
    """View that renders a captioned meme image from a configured template.

    Templates are looked up in ``app.config['MEMES']`` (name -> path relative
    to ``app.root_path``). Only captions present in MemeWhiteList are served.
    """

    def view(self, mode='full', name=None, caption=None):
        """Render the meme *name* with *caption* and return it as a response.

        A caption of the form ``upper:lower`` (exactly one colon) is split
        into a top and a bottom line; any other caption is drawn at the
        bottom only. GIF templates are rendered frame by frame and returned
        as GIF, everything else is returned as PNG.

        Raises:
            poobrains.auth.AccessDenied: if *name* is not a configured meme
                or *caption* is not whitelisted.
        """
        # .get() so a missing MEMES setting denies access instead of raising KeyError.
        memes = app.config.get('MEMES', {})
        if name not in memes or not MemeWhiteList.select().where(MemeWhiteList.caption == caption).count():
            raise poobrains.auth.AccessDenied()

        # TODO: is input sanitation still needed?
        parts = caption.split(':')
        if len(parts) == 2:
            upper, lower = parts
        else:
            upper, lower = None, caption

        filename = os.path.join(app.root_path, memes[name])
        # Compare the extension case-insensitively so 'template.GIF' is animated too.
        is_gif = os.path.splitext(filename)[1].lower() == '.gif'

        # NOTE(review): ImageFont.getsize is reported as removed in Pillow 10 -
        # confirm the pinned Pillow version before upgrading.
        font = ImageFont.truetype(os.path.join(app.root_path, 'LeagueGothic-Regular.otf'), 80)

        # The context manager closes the template's file handle even on errors.
        with Image.open(filename) as template:
            # Scale to a fixed width of 750px, keeping the aspect ratio.
            resized = (750, int(template.height * (750.0 / template.width)))

            # The caption overlay is the same for every frame, so draw it once.
            text_layer = Image.new('RGBA', resized, (0, 0, 0, 0))
            text_draw = ImageDraw.Draw(text_layer)
            if upper:
                upper_size = font.getsize(upper)
                upper_x = int(round(resized[0] / 2.0 - upper_size[0] / 2.0))
                outlined_text(text_draw, upper, upper_x, 0, font=font)
            if lower:
                lower_size = font.getsize(lower)
                lower_x = int(round(resized[0] / 2.0 - lower_size[0] / 2.0))
                lower_y = resized[1] - lower_size[1] - 10  # last int is margin from bottom
                outlined_text(text_draw, lower, lower_x, lower_y, font=font)

            if is_gif:
                frames = [
                    Image.alpha_composite(
                        frame.convert('RGBA').resize(resized, Image.BICUBIC),
                        text_layer
                    )
                    for frame in ImageSequence.Iterator(template)
                ]
                meme = frames.pop(0).convert('P')
                # Carry over animation timing and looping from the template.
                for key in ('duration', 'loop'):
                    if key in template.info:
                        meme.info[key] = template.info[key]
                image_format, mimetype = 'GIF', 'image/gif'
                save_options = {'save_all': True, 'append_images': frames}
            else:
                meme = Image.alpha_composite(
                    template.convert('RGBA').resize(resized, Image.BICUBIC),
                    text_layer
                )
                image_format, mimetype = 'PNG', 'image/png'
                save_options = {}

            out = BytesIO()
            meme.save(out, format=image_format, **save_options)

        r = flask.Response(out.getvalue(), mimetype=mimetype)
        r.cache_control.public = True
        r.cache_control.max_age = 604800  # seconds, i.e. 7 days
        return r
def outlined_text(drawing, text, x=0, y=0, font=None):
    """Draw *text* in white at (x, y) with a one-pixel black outline.

    The outline is produced by drawing the text in black at the four diagonal
    one-pixel offsets before the white text is drawn on top.
    """
    outline_fill = (0, 0, 0, 255)
    for dx, dy in ((-1, -1), (-1, 1), (1, 1), (1, -1)):
        drawing.text((x + dx, y + dy), text, font=font, fill=outline_fill)
    drawing.text((x, y), text, font=font, fill=(255, 255, 255, 255))
# content types


class MemeWhiteList(poobrains.storage.Model):
    """Stores the meme captions that have been seen in rendered markdown.

    Rows are created by MemePattern.handleMatch and checked by Mememage.view.
    """

    # TODO: add functionality to remove references to deleted/removed storable instances

    caption = poobrains.storage.fields.CharField()  # full caption text as written between the meme tags
class ScoredLink(poobrains.auth.Administerable):
    """A URL scored by how many resources it loads from foreign hosts.

    ``external_site_count`` is recomputed on every :meth:`save`. The
    ``set_size``, ``median`` and ``mean`` properties are statistics over all
    stored ScoredLinks, not over this instance alone.
    """

    class Meta:
        form_blacklist = ['id', 'external_site_count', 'updated']

    link = poobrains.storage.fields.CharField(unique=True)  # TODO: Add an URLField to poobrains.
    external_site_count = poobrains.storage.fields.IntegerField(null=True)
    updated = poobrains.storage.fields.DateTimeField(null=False, default=datetime.datetime.now)

    def scrape_external_site_count(self):
        """Fetch ``self.link`` and count the resources it loads from other hosts.

        Looks at ``script[src]``, ``link[href]``, ``img[src]`` and
        ``object[data]``. A resource counts as external when its host is
        neither the link's host nor a subdomain of it. Relative URLs and URLs
        without a host are ignored; protocol-relative ``//host/...`` URLs are
        counted.

        Returns:
            int: number of external resources; 0 when ``self.link`` is empty.

        Raises:
            ValueError: if ``self.link`` has no host part.
            requests.exceptions.RequestException: if the page can't be fetched.
        """
        # Function-scope import so this block does not depend on the file's import list.
        from urllib.parse import urlparse

        def host_of(url):
            """Return the lower-cased host of *url*, or None if absent/malformed."""
            try:
                return urlparse(url).hostname
            except ValueError:
                return None

        external_site_count = 0
        if not self.link:
            return external_site_count

        link_domain = host_of(self.link)
        if not link_domain:
            raise ValueError('Link has no host part: %s' % self.link)

        html = requests.get(self.link, timeout=30).text
        dom = bs4.BeautifulSoup(html, 'lxml')

        # tag name -> attribute holding the URL of the loaded resource
        scored_elements = {
            'script': 'src',
            'link': 'href',
            'img': 'src',
            'object': 'data'
        }
        subdomain_suffix = '.%s' % link_domain

        for tag, attribute in scored_elements.items():
            for element in dom.find_all(tag):
                attribute_value = element.get(attribute)
                if not isinstance(attribute_value, str):
                    continue
                attribute_domain = host_of(attribute_value)
                if not attribute_domain:
                    continue  # relative link (or no usable host): same site
                if attribute_domain != link_domain and \
                        not attribute_domain.endswith(subdomain_suffix):  # not a subdomain of link_domain
                    external_site_count += 1

        return external_site_count

    def save(self, *args, **kwargs):
        """Refresh the score, then save.

        Scraping is best-effort: any failure is logged and the row is saved
        with its previous ``external_site_count`` and ``updated`` values.
        """
        try:
            self.external_site_count = self.scrape_external_site_count()
            self.updated = datetime.datetime.now()
        except Exception as e:  # Match all errors so failures here don't interfere with normal operations
            poobrains.app.logger.error('Could not scrape external site count for URL: %s', self.link)
            poobrains.app.logger.debug('Problem when scraping external site count: %s: %s', type(e), e)
        return super().save(*args, **kwargs)

    @property
    def set_size(self):
        """Number of stored ScoredLinks."""
        return self.__class__.select().count()

    @property
    def external_site_counts(self):
        """All non-NULL external site counts, in ascending order."""
        model = self.__class__
        # `!= None` is deliberate: it is an overloaded field comparison that
        # builds the query condition, so it must not be rewritten as `is not None`.
        query = (
            model.select(model.external_site_count)
            .where(model.external_site_count != None)
            .order_by(model.external_site_count)
            .dicts()
        )
        return [row['external_site_count'] for row in query]

    @property
    def median(self):
        """Median of all known external site counts, or None if there are none."""
        counts = self.external_site_counts  # query once instead of on every access
        if not counts:
            return None
        middle = len(counts) // 2
        if len(counts) % 2 == 0:
            return (counts[middle - 1] + counts[middle]) / 2.0
        return float(counts[middle])

    @property
    def mean(self):
        """Arithmetic mean of all known external site counts, or None if there are none."""
        counts = self.external_site_counts
        if not counts:
            return None
        return sum(counts) / float(len(counts))

    @property
    def name(self):
        """The link itself serves as the name of this object."""
        return self.link
@app.expose('/source/organization/', mode='full')
class SourceOrganization(poobrains.commenting.Commentable):
    """An organization that sources can be attributed to.

    Registered with app.expose at '/source/organization/'.
    """

    parent = poobrains.storage.fields.ForeignKeyField('self', null=True)  # optional reference to another SourceOrganization
    title = poobrains.storage.fields.CharField()
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored link for the organization
    description = poobrains.md.MarkdownField(null=True)  # optional markdown text
@app.expose('/source/author/', mode='full')
class SourceAuthor(poobrains.commenting.Commentable):
    """An author that sources can be attributed to.

    Registered with app.expose at '/source/author/'.
    """

    title = poobrains.storage.fields.CharField()
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored link for the author
    description = poobrains.md.MarkdownField(null=True)  # optional markdown text
@app.expose('/source/organizationauthor/', mode='full')
class SourceOrganizationAuthor(poobrains.commenting.Commentable):
    """Pairs a SourceOrganization with a SourceAuthor.

    Source.author points at this pairing rather than at an author directly.
    """

    organization = poobrains.storage.fields.ForeignKeyField(SourceOrganization)
    author = poobrains.storage.fields.ForeignKeyField(SourceAuthor)
@app.expose('/source/', mode='full')
class Source(poobrains.commenting.Commentable):
    """A single source document, attributed to an organization/author pairing.

    Registered with app.expose at '/source/'.
    """

    title = poobrains.storage.fields.CharField()
    type = poobrains.storage.fields.CharField()  # TODO: We need some logic to make this useful. Also, build enum type compatible to sqlite+postgres?
    author = poobrains.storage.fields.ForeignKeyField(SourceOrganizationAuthor)  # required organization/author pairing
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored link to the source
    description = poobrains.md.MarkdownField()
@app.expose('/article/', mode='full')
class Article(poobrains.commenting.Commentable):
    """An article with a title and a markdown body; registered at '/article/'."""

    title = poobrains.storage.fields.CharField()
    text = poobrains.md.MarkdownField()
@app.expose('/projects/', mode='full')
class Project(poobrains.commenting.Commentable):
    """A project with a title, a markdown text and a link; registered at '/projects/'."""

    title = poobrains.storage.fields.CharField()
    text = poobrains.md.MarkdownField()
    link = poobrains.storage.fields.CharField()  # plain string here, unlike the ScoredLink references used by other types
@app.expose('/curated/', mode='full')
class CuratedContent(poobrains.commenting.Commentable):
    """A curated item with a markdown description and an optional scored link.

    Registered with app.expose at '/curated/'.
    """

    title = poobrains.storage.fields.CharField()
    description = poobrains.md.MarkdownField()
    link = poobrains.storage.fields.ForeignKeyField(ScoredLink, null=True)  # optional scored link to the content
@app.site.box('menu_main')
def menu_main():
    """Build the main menu.

    Adds one entry per content listing (teaser mode), silently leaving out
    any listing whose URL lookup raises AccessDenied, followed by the entries
    supplied by ``poobrains.auth.Page.main_menu_entries()``.

    Returns:
        poobrains.rendering.Menu: the populated 'main' menu.
    """
    menu = poobrains.rendering.Menu('main')

    # (content type, menu caption) in display order
    listings = (
        (Article, 'Articles'),
        (Project, 'Projects'),
        (CuratedContent, 'Curated content'),
        (Source, 'Sources'),
    )

    for content_type, caption in listings:
        try:
            # Look the URL up once per entry (the curated entry used to do it twice).
            menu.append(content_type.url('teaser'), caption)
        except poobrains.auth.AccessDenied:
            continue  # no access to this listing: leave it out of the menu

    for url, caption in poobrains.auth.Page.main_menu_entries():
        menu.append(url, caption)

    return menu
# Word pools for the X-Doge response header built in mkdoge():
# one word each is drawn from 'prefix', 'thing' and 'suffix';
# 'thing_tls' is added to the 'thing' pool for secure requests.
DOGE = {
    'prefix': [
        'wow',
        'such',
        'many',
        'more',
        'so',
        'lol',
        'very',
        'omg'
    ],
    'thing': [
        'wow',
        'doge',
        'shibe',
        '1337 h4xx0rz',
        'internet',
        'pretty',
        'computer',
        'free software',
        'website',
        'content',
        'python',
        'flask',
        'poobrains',
        'NOT PHP'
    ],
    'thing_tls': [
        'transport layer security',
        'X.509',
        'certificate'
    ],
    'suffix': [
        'wow',
        'pls',
        'mystery',
        'anarchy',
        'bees'
    ]
}
@app.after_request
def mkdoge(response):
    """Attach a randomly assembled ``X-Doge`` header to *response*.

    The header is "<prefix> <thing> <suffix>", each word picked at random
    from the matching DOGE pool; for secure requests the TLS-themed words
    are added to the 'thing' pool. Returns the same response object.
    """
    things = DOGE['thing']
    if flask.request.is_secure:
        things = things + DOGE['thing_tls']

    word_pools = (DOGE['prefix'], things, DOGE['suffix'])
    phrase = ' '.join(random.choice(pool) for pool in word_pools)

    response.headers['X-Doge'] = phrase
    return response
##                                     ##
## waffenfunde infoscraping things     ##
##                                     ##

# Search terms that scrape_blaulicht() submits to the presseportal.de search, one crawl per term.
MONITOR_PATTERNS = ['waffenfund', 'waffe gefunden', 'waffen gefunden']
@poobrains.app.cron
def scrape_linkscores():
    """Cron job: refresh the score of every ScoredLink older than 7 days.

    Iterates all ScoredLinks behind a progress bar and re-saves those whose
    ``updated`` timestamp lies more than 7 days in the past (saving a
    ScoredLink triggers a re-scrape). Prints how many links were re-saved.
    """
    max_age = datetime.timedelta(days=7)
    stale_before = datetime.datetime.now() - max_age
    refreshed = 0  # number of links that were actually re-saved

    def show_link(item=None):
        # click calls this with the current item (or None) to label the bar
        return item.link if item else ''

    progress = click.progressbar(
        ScoredLink.select(),
        label="Update link scores where necessary",
        item_show_func=show_link
    )

    with progress as links:
        for link in links:
            if stale_before > link.updated:  # update at most once per 7 days
                link.save()  # ScoredLink scores are updated on every .save
                refreshed += 1

    click.secho(f"Updated {refreshed} link scores.", fg='green')
#@poobrains.app.cron # bitrotted inactive research, disable
def scrape_blaulicht():
    """Crawl presseportal.de "blaulicht" search results into Source records.

    For every term in MONITOR_PATTERNS this walks the paginated search
    results, collects the article URLs, then fetches each article and stores
    it as a Source, creating the related ScoredLink, SourceOrganization,
    SourceAuthor and SourceOrganizationAuthor rows when they don't exist yet.
    Articles whose URL already belongs to a Source are skipped.

    Not registered as a cron job (the decorator above is commented out).
    Progress is reported through click.echo; nothing is returned.
    """
    # All created records are owned by the user with id 2.
    owner = poobrains.auth.User.get(poobrains.auth.User.id == 2)

    for pattern in MONITOR_PATTERNS:

        article_urls = []

        html = requests.get('http://www.presseportal.de/blaulicht/suche.htx?q=%s' % pattern, timeout=30).text
        dom = bs4.BeautifulSoup(html, 'lxml')

        click.echo("Beginning crawl of pagination for search pattern '%s'." % pattern)

        # Phase 1: walk the result pages and collect article URLs.
        last_page = False
        while not last_page:

            next_page = dom.find(attrs={'class': 'pagination-next'})
            # NOTE(review): `is None` would be the idiomatic comparison here.
            if next_page == None:
                last_page = True
            else:
                # The pagination element carries its target in a data-url attribute instead of being a real link.
                next_page_url = 'http://www.presseportal.de/blaulicht/%s' % next_page['data-url']

            # Collect the articles of the current page (also done for the last page).
            for article in dom.find_all('article'):
                try:
                    article_urls.append(article.find('h2', attrs={'class': 'news-headline'}).a['href'])
                except Exception as e:
                    click.echo("Article appears to be without headline link, skipping")

            if not last_page:
                click.echo("Next page: %s" % next_page_url)
                dom = bs4.BeautifulSoup(requests.get(next_page_url, timeout=30).text, 'lxml')

        click.echo("URL collection done, found %d articles." % len(article_urls))
        click.echo("Beginning crawl of individual articles.")

        # Phase 2: fetch every collected article and store it.
        for article_url in article_urls:

            url = 'http://www.presseportal.de%s' % article_url

            # Skip articles that already have a Source attached to their link.
            try:
                testlink = ScoredLink.get(ScoredLink.link == url)
                if Source.select().where(Source.link == testlink).count():
                    click.echo("Already know source with link %s, skipping." % url)
                    continue
            except ScoredLink.DoesNotExist:
                pass

            try:
                dom = bs4.BeautifulSoup(requests.get(url, timeout=30).text, 'lxml')
            except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e:
                message = '%s for %s: %s' % (type(e).__name__, url, str(e))
                click.echo(message)
                poobrains.app.logger.error(message)
                continue

            # The publishing organization is taken from the link in the 'story-company' heading.
            try:
                org_dom = dom.find('h2', attrs={'class': 'story-company'}).a
            except Exception as e:
                message = "Couldn't extract source organization for %s" % url
                click.echo(message)
                poobrains.app.logger.error(message)
                continue

            # Get or create the organization (looked up by title).
            try:
                org = SourceOrganization.get(SourceOrganization.title == org_dom.text)
            except SourceOrganization.DoesNotExist:

                org_url = 'http://www.presseportal.de%s' % org_dom['href']

                try:
                    org_link = ScoredLink.get(ScoredLink.link == org_url)
                except ScoredLink.DoesNotExist:
                    org_link = ScoredLink()
                    org_link.link = org_url
                    org_link.save()

                org = SourceOrganization()
                org.name = poobrains.helpers.clean_string(org_dom.text)
                org.title = org_dom.text
                org.link = org_link
                org.owner = owner
                org.save()

            # Get or create an author mirroring the organization (same name, title and link).
            try:
                author = SourceAuthor.get(SourceAuthor.name == org.name)
            except SourceAuthor.DoesNotExist:
                author = SourceAuthor()
                author.name = org.name
                author.title = org.title
                author.link = org.link
                author.owner = owner
                author.save()

            # Get or create the organization/author pairing that Source.author references.
            try:
                orgauthor = SourceOrganizationAuthor.get(SourceOrganizationAuthor.organization == org, SourceOrganizationAuthor.author == author)
            except SourceOrganizationAuthor.DoesNotExist:
                orgauthor = SourceOrganizationAuthor()
                orgauthor.name = '%s-%s' % (org.name, author.name)
                orgauthor.organization = org
                orgauthor.author = author
                orgauthor.owner = owner
                orgauthor.save()

            # Get or create the ScoredLink for the article itself.
            try:
                source_link = ScoredLink.get(ScoredLink.link == url)
            except ScoredLink.DoesNotExist:
                source_link = ScoredLink()
                source_link.link = url
                source_link.save()

            # NOTE(review): dom.find() returns None when the headline is missing,
            # in which case `.text` raises AttributeError and aborts the crawl.
            source_title = dom.find('h1', attrs={'class': 'story-headline'}).text.strip()
            source_name = poobrains.helpers.clean_string(source_title)

            try:
                # Only the existence check matters; the fetched object is not used.
                source = Source.get(Source.name == source_name)
                click.echo("Already have a source named %s. Probably indicates duplicate names. Current URL %s" % (source_name, url))
            except Source.DoesNotExist:

                # NOTE(review): drops into the debugger for every new source when
                # the app runs in debug mode - looks like a debugging leftover.
                if poobrains.app.debug:
                    poobrains.app.debugger.set_trace()

                date_string = dom.find(attrs={'class': 'story-date'}).text.strip().replace(u'\u2013', '-') # \u2013 is a unicode dash

                source = Source()
                source.link = source_link
                source.type = "scrape_blaulicht"
                source.author = orgauthor
                source.title = source_title
                source.name = source_name
                # Escape angle brackets so the scraped text can't inject markup into the markdown field.
                source.description = dom.find(attrs={'class': 'story-text'}).text.strip().replace('<', '&lt;').replace('>', '&gt;')
                source.date = datetime.datetime.strptime(date_string, "%d.%m.%Y - %H:%M") # format string contains a *dash*, not a minus!
                source.owner = owner
                source.save()

                click.echo("Saved source: %s" % url)
# When executed as a script, hand control to the application's command line interface.
if __name__ == '__main__':
    app.cli()