xmpp-site/main.py

662 lines
24 KiB
Python

import copy
import os
import re
import json
import requests
import tarfile
import shutil
import scss
import markdown
import bs4
import werkzeug # mainly for isinstance checks
import click
import flask
from flask import flash, abort, redirect
app = flask.Flask(__name__)
app.config.from_object('defaults')
app.config.from_object('config')
if 'RESOURCE_DIR' not in app.config:
app.config['RESOURCE_DIR'] = os.path.join(os.getcwd(), 'resources')
if 'ARTICLE_DIR' not in app.config:
app.config['ARTICLE_DIR'] = os.path.join(os.getcwd(), 'articles')
scss_compiler = scss.Compiler()
markdown_compiler = markdown.Markdown(extensions=['tables', 'footnotes'])
class ExposedException(Exception):
def __init__(self, message, status=500):
self.message = message
self.status = status
def headline_address(text):
text = text.lower()
text = text.replace(' ', '-')
# purge everything that's not 'a' to 'z' or '-' with fire
text = re.sub(r'[^a-z0-9-]', '', text)
return text
def replace_thingamabob(value, invite_info, os, client, installer):
# FIXME: Brain up a less shitty name for this.
# Data preprocessing for invite_main
client_info = app.config['CLIENT_INFO'][client]
installer_info = app.config['INSTALLER_INFO'][os][installer]
if '{invite.uri}' in value:
value = value.replace('{invite.uri}', invite_info['uri'])
if '{invite.register_url}' in value:
value = value.replace('{invite.register_url}', flask.url_for('.invite_register', token=invite_info['token'], client=client))
if '{client.name}' in value:
value = value.replace('{client.name}', client_info['title'])
if installer_info['url_support']:
if os in client_info['installer_magic']:
if '{client.url_magic}' in value:
value = value.replace('{client.url_magic}', client_info['installer_magic'][os][installer]['url'])
if '{client.url_install}' in value:
value = value.replace('{client.url_install}', client_info['installer_manual'][os][installer]['url'])
if installer_info['pkgname_support']:
if '{client.pkgname}' in value:
value = value.replace('{client.pkgname}', client_info['installer_manual'][os][installer]['pkgname'])
if 'guide' in installer_info:
if '{installer.guide}' in value:
value = value.replace('{installer.guide}', installer_info['guide'])
if '{installer.step_install}' in value:
if client_info['installer_manual'][os][installer]['gratis']:
step = installer_info['step_install']['gratis']
else:
step = installer_info['step_install']['paid']
value = value.replace('{installer.step_install}', step)
if '{register}' in value:
if 'invites' in client_info['features']:
text = app.config['REGISTER_INFO']['client']
else:
text = app.config['REGISTER_INFO']['web']
value = value.replace('{register}', replace_thingamabob(text, invite_info, os, client, installer))
return value
def page(f):
template_candidates = [
f"{f.__name__}.jinja",
"main.jinja",
]
def wrapper(*args, **kwargs):
env = {
'claim': app.config['CLAIM'],
'support_address': app.config['SUPPORT_ADDRESS'],
'menu': menu('main'),
'messages': flask.get_flashed_messages(with_categories=True),
}
try:
r = f(*args, **kwargs)
except ExposedException as e:
app.logger.error(f"Caught error: {e.message}")
return flask.render_template('error.jinja', error=e, **env), e.status
except Exception as e:
app.logger.error(f"Caught unplanned error:")
app.log_exception(e)
return flask.render_template('error.jinja', error=ExposedException('Something went wrong. :('), **env), 500
if isinstance(r, werkzeug.wrappers.Response):
return r # directly pass constructed Responses (like redirects) through
if isinstance(r, dict):
env.update(r)
else:
env['content']: r
try:
rendered = flask.render_template(template_candidates, **env)
except Exception as e:
app.logger.error(f"Caught templating error.")
app.log_exception(e)
return flask.render_template('error.jinja', error=ExposedException('Something went wrong because of a broken template. :('), **env)
return rendered
wrapper.__name__ = f.__name__ # i sure hope this doesn't fuck shit up
return wrapper
def ensure_token(token):
try:
invite_info = get_invite_info(token)
except ValueError:
raise ExposedException("Couldn't get info for this token - are you sure it's valid?", 410)
except RuntimeError:
raise ExposedException('Something seems to have gone wrong on our side.<img src="/resources/sorry.gif" alt="Sorry…" title="Sorry…" />')
if invite_info['type'] != 'register':
raise ExposedException(f"""You have some kind of invite for this service, but
this site doesn't handle this specific invite type: {invite_info['type']}.
Feel free to contact {app.config['SUPPORT_ADDRESS']}.""", 204)
return invite_info
def best_os_guess(request):
agent = request.headers["User-Agent"]
if "Android" in agent:
return "android"
elif "iPhone" in agent:
return "ios"
elif "Windows" in agent:
return "windows"
elif "Mac OS" in agent:
return "macos"
elif "Linux" in agent:
return "linux"
elif "FreeBSD" in agent:
return "freebsd"
elif "OpenBSD" in agent:
return "openbsd"
elif "NetBSD" in agent:
return "netbsd"
return "other"
def get_invite_info(token):
try:
r = requests.get(f"{app.config['API_BASE_URL']}/invite/{token}")
except requests.exceptions.ConnectionError:
raise RuntimeError("Unable to establish connection to XMPP server!")
if r.status_code == 200:
return json.loads(r.text)
elif r.status_code >= 400 and r.status_code <= 499:
raise ValueError("Invalid data!")
elif r.status_code >= 500 and r.status_code <= 599:
raise RuntimeError("Server-side error indicated!")
else:
raise RuntimeError(f"Non-specific error: {r.text}")
def register_with_invite(username, password, token):
data = {
'username': username,
'password': password,
'token': token,
}
payload = json.dumps(data)
r = requests.post(f"{app.config['API_BASE_URL']}/register", payload, headers={'Content-Type': 'application/json'})
if r.status_code == 200:
return
elif r.status_code == 409:
raise ValueError("A user with this name already exists!")
elif r.status_code >= 400 and r.status_code <= 499:
raise ValueError("Invalid data!")
elif r.status_code >= 500 and r.status_code <= 599:
raise RuntimeError("Server-side error indicated!")
else:
raise RuntimeError(f"Non-specific error: {r.text}")
def any_in(value, candidates):
return any(
map(
lambda x: x.lower() in value.lower(),
candidates
)
)
# ,- words that are considered a match
# |
# | ,- end of string or nonword
# | | char guarantees matched
# | | words stand on their own
# ↓ ↓
match_good = re.compile('^(good|yes|positive|true)($|\W)', re.IGNORECASE)
match_bad = re.compile('^(bad|no|negative|false)($|\W)', re.IGNORECASE)
match_meh = re.compile('^(meh|partial|unverified|mild)($|\W)', re.IGNORECASE)
def article_load(name):
article_path = os.path.join(app.config['ARTICLE_DIR'], name + '.md')
if os.path.exists(article_path):
with open(article_path, 'r', encoding='UTF-8') as fd:
text = fd.read()
text = markdown_compiler.convert(text)
markdown_compiler.reset()
soup = bs4.BeautifulSoup(text, 'html.parser')
# Extract title and subtitle for specialized presentation
header = False
title = None
subtitle = None
scan_offset = 0
for element in soup.children:
if isinstance(element, bs4.Tag):
if title is None and element.name == 'h1':
title = element.text
element.decompose()
scan_offset += 1
continue # skip to next loop iteration
elif title != None and subtitle is None and element.name == 'h2':
subtitle = element.text
element.decompose()
scan_offset += 1
break # terminate loop
else:
# terminate loop, if first tag isn't h1 we don't have to
# scan the rest of the document
break
# Split rest of content into sections based on h2 elements
first_scanned_element = soup.contents[scan_offset]
if type(first_scanned_element) == bs4.element.Tag and first_scanned_element.name == 'section':
header = str(first_scanned_element) # if the text (after title and subtitle) begins with a <section>, treat it as dedicated header
first_scanned_element.decompose()
section = soup.new_tag('section')
section['class'] = 'text'
soup.contents[scan_offset].insert_before(section)
for element in soup.contents[scan_offset:]:
if type(element) == bs4.element.Tag:
if element != section:
if element.name in ['h2', 'h3', 'h4', 'h5', 'h6']:
name = headline_address(element.text)
link = soup.new_tag('a')
link['name'] = name
link['href'] = f'#{name}'
link.append(element.text)
element.clear()
element.append(link)
if element.name == 'h2' and len(section):
section = soup.new_tag('section')
section['class'] = 'text'
element.insert_before(section)
elif element.name == 'section':
# let existing sections stand on their own, start new section afterwards
section = soup.new_tag('section')
section['class'] = 'text'
element.insert_after(section)
continue # skip rest of the loop, do not add this section
section.append(element.extract())
for td in soup.find_all('td'):
if len(td) and isinstance(td.contents[0], bs4.NavigableString):
subject = td.contents[0].string
else:
subject = td.text
if match_bad.match(subject):
td['class'] = 'bad'
elif match_good.match(subject):
td['class'] = 'good'
elif match_meh.match(subject):
td['class'] = 'meh'
for a in soup.find_all('a'):
if a['href'].startswith('http://'):
a['href'] = 'https://' + a['href'][7:]
if a['href'].startswith('https://'):
a['target'] = '_blank'
a['rel'] = 'noreferrer'
text = str(soup)
return {
'title': title,
'subtitle': subtitle,
'header': header,
'text': text
}
abort(404, "No such content. *sad server noises*")
def menu(name=None):
links = [
{
'url': '/',
'caption': 'Home',
},
{
'url': flask.url_for('.article', name='chatcontrol'),
'caption': 'ChatControl',
},
{
'url': flask.url_for('.article', name='x-as-in-freedom'),
'caption': '<em>X</em> as in <em>freedom</em>',
},
{
'url': flask.url_for('.article', name='foss-socialism'),
'caption': 'Why FOSS is Socialism',
},
#{
# 'url': flask.url_for('.article', name='adversaries'),
# 'caption': 'Adversaries'
#},
{
'url': flask.url_for('.clients'),
'caption': 'Supported clients',
},
{
'url': flask.url_for('.client'),
'caption': 'Web client',
},
]
for link in links:
if flask.request.path == link['url']:
link['active'] = True # propagates to links[idx]['active']
else:
link['active'] = False # dito
return flask.render_template('menu.jinja', links=links, name=name)
@app.cli.command()
@click.option('--version', default='9.1.1')
def install_conversejs(version):
directory = 'resources/converse'
directory_dist = f'{directory}/dist'
version_name = f'converse.js-{version}'
file_name = f'{version_name}.tgz'
file_path = f'{directory}/{file_name}'
archive_url = f'https://github.com/conversejs/converse.js/releases/download/v{version}/{file_name}'
if not os.path.exists(directory):
click.secho(f"Created directory {directory}.", fg='green')
os.mkdir(directory)
if not os.path.isdir(directory):
click.secho(f"Not a directory: {directory}", fg='red')
click.secho(f"Downloading archive '{file_name}'")
response = requests.get(archive_url)
if response.status_code == 200:
click.secho("Success!", fg='green')
click.secho("Saving…")
with open(file_path, 'wb') as file:
file.write(response.content)
click.secho("Extracting archive…")
member_whitelist = []
with tarfile.open(file_path) as tar:
for member in tar.getmembers():
if member.name.startswith('package/dist'):
member_whitelist.append(member)
tar.extractall(directory_dist, members=member_whitelist)
for extracted_name in os.listdir(f'{directory_dist}/package/dist'):
shutil.move(
f'{directory_dist}/package/dist/{extracted_name}',
f'{directory_dist}/{extracted_name}'
)
shutil.rmtree(f'{directory_dist}/package')
@app.route('/resources/<path:name>')
def serve_resource(name):
resource_path = os.path.join(app.config['RESOURCE_DIR'], name)
if os.path.exists(resource_path):
extension = name.split('.')[-1]
if extension == 'scss':
return flask.Response(scss_compiler.compile(resource_path), mimetype='text/css')
return flask.send_file(resource_path)
else:
return flask.Response("No such resource.", status=404)
@app.route('/')
@page
def home():
return article_load('home')
@app.route('/article/<string:name>')
@page
def article(name):
return article_load(name)
@app.route('/clients/')
@page
def clients():
client_info = {}
for client_name, info in app.config['CLIENT_INFO'].items():
#current_client_info = {}
current_client_info = info
if os.path.exists(os.path.join(app.config['RESOURCE_DIR'], 'third-party-logos', f'{client_name}.svg')):
current_client_info['logo'] = f'/resources/third-party-logos/{client_name}.svg'
client_info[client_name] = current_client_info
os_info = {}
for os_name, info in app.config['OS_INFO'].items():
current_os_info = info
if os.path.exists(os.path.join(app.config['RESOURCE_DIR'], 'third-party-logos', f'{os_name}.svg')):
current_os_info['logo'] = f'/resources/third-party-logos/{os_name}.svg'
os_info[os_name] = current_os_info
return {
'client_info': client_info,
'feature_info': app.config['FEATURE_INFO'],
'installer_info': app.config['INSTALLER_INFO'],
'os_info': os_info
}
@app.route('/invite/<string:token>/')
@page
def invite_main(token):
invite_info = ensure_token(token)
welcome_msg = f'''You have been invited to register on {app.config['SERVICE_NAME']}<br /> <span class="claim">{app.config['CLAIM']}</span>'''
probable_os = best_os_guess(flask.request)
# redirect to #fragment of detected OS
# ?r acts as marker that we already redirected, to avoid looping
if not 'r' in flask.request.args:
return redirect(f'?r#{probable_os}')
processed_info = {}
for os_name, os_info in app.config['OS_INFO'].items():
current_os_info = {
'title': os_info['title'],
'client_recommend': os_info['client_recommend'],
'client_other': [],
'clients': {},
}
for client_name, client_info in app.config['CLIENT_INFO'].items():
if os_name in client_info['os']:
if client_name != current_os_info['client_recommend']:
current_os_info['client_other'].append(client_name)
current_client_info = {
'title': client_info['title'],
'description': client_info['description'],
'features': client_info['features'],
'project_url': client_info['project_url'],
'os': client_info['os'],
'installer_recommend': client_info['installer_recommend'][os_name],
}
if os.path.exists(os.path.join(app.config['RESOURCE_DIR'], 'third-party-logos', f'{client_name}.svg')):
current_client_info['logo'] = f'/resources/third-party-logos/{client_name}.svg'
if 'help_url' in client_info:
current_client_info['help_url'] = client_info['help_url']
if os_name in client_info['installer_magic']:
current_client_info['installer_magic'] = client_info['installer_magic'][os_name]
for magic_installer in current_client_info['installer_magic']:
# string replacement to inject invite redirect in magic installer URLs
current_client_info['installer_magic'][magic_installer]['url'] = current_client_info['installer_magic'][magic_installer]['url'].replace('{invite.uri}', flask.helpers.url_quote(invite_info['uri'])) # this line is a monster
current_client_info['installer_manual'] = client_info['installer_manual'][os_name]
# collect non-recommended installers for this OS
# we assume 'installer_manual' to hold *all* installers for this OS
current_client_info['installer_other'] = set(current_client_info['installer_manual'].keys())
current_client_info['installer_other'].remove(current_client_info['installer_recommend'])
if os_name in client_info['installer_magic'] and current_client_info['installer_recommend'] in current_client_info['installer_magic']:
current_client_info['action_recommend'] = 'installer_magic'
else:
current_client_info['action_recommend'] = 'installer_manual'
current_client_info['installers'] = {}
for installer_name, installer_info in app.config['INSTALLER_INFO'][os_name].items():
if installer_name in client_info['installer_manual'][os_name]:
current_installer_info = {
'title': installer_info['title'],
'gratis': current_client_info['installer_manual'][installer_name]['gratis'],
'url_support': installer_info['url_support'],
'magic_support': installer_info['magic_support'],
'pkgname_support': installer_info['pkgname_support'],
'guide_package': installer_info['guide_package'],
}
# list of keys over whose values we want to run string replacement
keys_replace = ['guide_package']
if installer_info['magic_support']:
current_installer_info['guide_magic'] = installer_info['guide_magic']
keys_replace.append('guide_magic')
if 'guide' in installer_info: # pre-installed installers don't have 'guide'
current_installer_info['guide'] = installer_info['guide']
# prepend 'guide'; has to come first so guide with
# replacements is available in 'guide_package'.
keys_replace = ['guide'] + keys_replace
for k in keys_replace:
value = replace_thingamabob(current_installer_info[k], invite_info, os_name, client_name, installer_name)
current_installer_info[k] = value
current_client_info['installers'][installer_name] = current_installer_info
current_os_info['clients'][client_name] = current_client_info
processed_info[os_name] = current_os_info
return {
'probable_os': probable_os,
'welcome_msg': welcome_msg,
'info': processed_info,
'feature_info': app.config['FEATURE_INFO'],
}
@app.route('/invite/<string:token>/register/', methods=['GET', 'POST'])
@page
def invite_register(token):
invite_info = ensure_token(token)
fakepost = 'fakepost' in flask.request.args
if flask.request.method == 'POST' or fakepost:
if ('username' in flask.request.form and 'password' in flask.request.form) or fakepost:
if fakepost:
username = "FNORD"
password = "FNORD"
else:
username = flask.request.form['username']
password = flask.request.form['password']
try:
if not fakepost:
register_with_invite(username, password, token)
except ValueError as e:
return e.args[0]
except RuntimeError as e:
return { 'message': "Sorry, that didn't work. Please try again." }
else:
data = {
'help': app.config['HELP_INFO'],
'message': """Congratulations, your account
is ready for use!""",
'username': username,
'password': password,
'jid': f"{ username }@{ app.config['XMPP_DOMAIN']}",
}
if 'client' in flask.request.args:
data['client'] = flask.request.args['client']
data['client_info'] = app.config['CLIENT_INFO'][data['client']]
flask.session['jid'] = data['jid']
flask.session['password'] = data['password']
return data
else:
return { 'message': "Malformatted data." }
else:
return {
'help': app.config['HELP_INFO'],
'message': "Almost there! Just tell us what your new identity should be.",
'domain': app.config['XMPP_DOMAIN'],
'form': True,
}
@app.route('/client')
@page
def client():
return {
'bosh_url': app.config.get('BOSH_URL'),
'websocket_url': app.config.get('WEBSOCKET_URL'),
'jid': flask.session['jid'] if 'jid' in flask.session else None,
'password': flask.session['password'] if 'password' in flask.session else None,
}
if __name__ == '__main__':
app.config['DEBUG'] = True
app.run('0.0.0.0')