# Source listing metadata (not part of the module): 1296 lines, 42 KiB

""" A webframework for aspiring media terrorists. """
import os
import sys
import types
import collections
import copy
import functools
import pathlib # only needed to pass a pathlib.Path to scss compiler
import datetime
import logging
import json
import bson
import OpenSSL as openssl
import nacl.utils
import nacl.secret
import werkzeug
import click
import flask
import jinja2
from playhouse import db_url
import peewee
import scss # pyScss
# imports needed for scss handle_import copy in SCSSCore
from itertools import product
from scss.source import SourceFile
from pathlib import PurePosixPath
# comfort imports to expose flask functionality directly through poobrains
from flask import request, session, redirect, flash, abort, url_for, g
#from flask.helpers import locked_cached_property
from jinja2 import Markup
# internal imports
from . import helpers
from . import errors
from . import defaults
# Module-level alias for the helper; the equivalent flask.helpers import is
# commented out in the import section above.
locked_cached_property = helpers.locked_cached_property
#db_url.schemes['sqlite'] = db_url.schemes['sqliteext'] # Make sure we get the extensible sqlite database, so we can make regular expressions case-sensitive. see https://github.com/coleifer/peewee/issues/1221
# Route the 'sqlite' URL scheme to the 'sqliteext' factory and always pass
# the foreign_keys pragma.
db_url.schemes['sqlite'] = functools.partial(db_url.schemes['sqliteext'], pragmas={'foreign_keys': 'on'})
import __main__ # to look up project name

# Derive the project name from the script that was launched; fall back to a
# placeholder when __main__ has no file (e.g. an interactive interpreter).
if hasattr(__main__, '__file__'):
    project_name = os.path.splitext(os.path.basename(__main__.__file__))[0] # basically filename of called file - extension
else:
    project_name = "REPL" # We're probably in a REPL, right? <- I think this assumption is wrong.

# The project may ship its own `config` module; without one, an empty module
# object stands in so later hasattr()/dir() lookups keep working.
try:
    import config # imports config relative to main project
except ImportError:
    config = types.ModuleType('config')
def is_renderable(x):
    """ jinja test to check if a value can be rendered """
    # Duck-typed on purpose (no inheritance check) so MarkdownString matches, too.
    return callable(getattr(x, 'render', None))
class FormDataParser(werkzeug.formparser.FormDataParser):
    """
    Form parser that turns flat, dot-separated field names ("a.b.c") into
    nested MultiDicts, for both form values and uploaded files.
    """

    @staticmethod
    def _descend(container, segments):
        """
        Walk `container` along `segments`, creating nested MultiDicts as
        needed, and return the innermost one.

        `container` is edited in place; this side effect is required.
        """
        current = container
        for segment in segments:
            if segment not in current:
                current[segment] = werkzeug.datastructures.MultiDict()
            elif isinstance(current[segment], str): # means we're handling data from a TabbedFieldsets object, which has a value *and* is a container.
                value = current[segment]
                current[segment] = werkzeug.datastructures.MultiDict()
                current[segment]['value'] = value
            current = current[segment] # travel one level further "down" in the dict
        return current

    def parse(self, *args, **kwargs):
        """
        Parse the request body and return `(stream, form, files)` where
        `form` and `files` are nested MultiDicts.

        The control named by the flat form's 'submit' value is additionally
        marked with `True` at its nested position.
        """
        stream, form_flat, files_flat = super(FormDataParser, self).parse(*args, **kwargs)

        flat_data = {
            'form': form_flat,
            'files': files_flat
        }
        processed_data = {
            'form': werkzeug.datastructures.MultiDict(),
            'files': werkzeug.datastructures.MultiDict()
        }

        for subject, data in flat_data.items():
            target = processed_data[subject]

            for key in data.keys():
                # The last segment has no dict but a value associated to it.
                *parents, leaf = key.split('.')
                # setlist handles single values as well as multivalues
                self._descend(target, parents).setlist(leaf, data.getlist(key))

            #if 'submit' in form_flat:
            if subject == 'form' and 'submit' in data:
                *parents, leaf = form_flat['submit'].split('.')
                # at the bottommost level, set the value for the submitting button to True
                self._descend(target, parents)[leaf] = True

        # TODO: Make form ImmutableDict again?
        return (stream, processed_data['form'], processed_data['files'])
class Request(flask.Request):
    """ Request class that parses forms into nested MultiDicts. """

    form_data_parser_class = FormDataParser

    def close(self):
        """ Close every uploaded file, including those in nested MultiDicts. """
        # Look in __dict__ so closing never triggers form parsing.
        files = self.__dict__.get('files')
        if files:
            for f in helpers.flatten_nested_multidict(files):
                f.close()
class Response(flask.Response):
    """ Response class that adds security headers and a cookie samesite policy. """

    def __init__(self, *args, **kwargs):
        """ Build the response, then attach the CSP and (outside debug mode) HSTS headers. """
        super().__init__(*args, **kwargs)

        self.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'none'; style-src 'self' 'unsafe-inline'; font-src 'self'; worker-src 'none'; frame-ancestors 'self'" # CSP to disallow client-side scripting and third-party requests

        # config.HSTS_MAX_AGE wins; 3600 is only the fallback.
        hsts_max_age = getattr(config, 'HSTS_MAX_AGE', 3600)

        if not app.debug:
            self.headers['Strict-Transport-Security'] = f'max-age={hsts_max_age}'

    def set_cookie(self, *args, **kwargs):
        """
        Set a cookie, always forcing `samesite`.

        Uses config.COOKIE_SAMESITE when present and 'strict' otherwise; a
        `samesite` keyword passed by the caller is overridden.
        """
        # always set samesite for cookies, defaulting to 'strict' because security.
        kwargs['samesite'] = getattr(config, 'COOKIE_SAMESITE', 'strict')
        return super().set_cookie(*args, **kwargs)
class DummySession(werkzeug.datastructures.CallbackDict, flask.sessions.SessionMixin):
    """
    Session stand-in that SessionInterface.open_session returns for insecure
    requests and for structurally invalid session cookies.
    """

    new = False
    modified = False
    accessed = False
# Callbacks invoked with every freshly created Session object.
new_session_funcs = set()


def new_session(f):
    """ Decorator registering `f` to be called with each newly created session. """
    new_session_funcs.add(f)
    return f
class DebugDict(dict):
    """
    A dictionary recursively casting dictionary items of itself
    to its own type, placing a breakpoint for every modification.

    NOTE(review): no breakpoint call is visible in the reviewed source;
    only the recursive casting is implemented here.
    """

    def __init__(self, *args, **kwargs):
        # dict.__init__ bypasses __setitem__, so nested dicts in the first
        # positional argument are cast up front. A converted copy is used
        # so the caller's dict is left untouched.
        if args and isinstance(args[0], dict):
            converted = {
                key: self.__class__(value) if isinstance(value, dict) else value
                for key, value in args[0].items()
            }
            args = (converted,) + args[1:]

        super().__init__(*args, **kwargs)

    def __setitem__(self, name, value):
        if isinstance(value, dict):
            value = self.__class__(value)
        super().__setitem__(name, value)
class Session(werkzeug.datastructures.CallbackDict, flask.sessions.SessionMixin):
    """
    Server-side session whose payload is BSON-serialised and encrypted with
    a per-session NaCl SecretBox key.
    """

    permanent = True # doesn't really apply to server-side sessions since we don't get told when a client-side cookie is deleted

    def __init__(self, sid=None, key=None, initial=None):
        """
        Create a new session, or load and decrypt the stored one for `sid`.

        Raises TypeError when `sid` is given without `key`. An unknown
        `sid` is thrown away and replaced by a fresh session.
        """
        if not sid:
            self._start_fresh()
        else:
            if not key:
                raise TypeError("Session initialized with sid but no key!")

            try:
                self.sessiondata = storage.SessionData.load(sid)
                self.key = key
                self.crypto = nacl.secret.SecretBox(self.key)
                # NOTE(review): a wrong key makes decrypt() raise; that is
                # not handled here - confirm this is intended.
                plaintext = self.crypto.decrypt(bytes(self.sessiondata.data))
                for k, v in bson.loads(plaintext).items():
                    self[k] = v
                self.sid = sid

            except storage.SessionData.DoesNotExist: # throw away unknown sids and create new sessiondata
                self._start_fresh()

        def on_update(self):
            self.modified = True
            self.accessed = True

        super(Session, self).__init__(initial, on_update)

    def _start_fresh(self):
        """ Set up empty session data with a new random key and run the @new_session hooks. """
        self.sessiondata = storage.SessionData()
        self.sid = None
        self.key = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)
        self.crypto = nacl.secret.SecretBox(self.key)

        # call all @new_session decorated functions with this session object
        for func in new_session_funcs:
            func(self)

#    def __setitem__(self, name, value):
#        """ cast dict items to DebugDict so we can debug where what data comes from """
#        app.debugger.set_trace()
#        if isinstance(value, dict):
#            value = DebugDict(value)
#        super().__setitem__(name, value)

    # Read as an attribute by SessionInterface.save_session, hence a property.
    @property
    def expires(self):
        return self.sessiondata.expires

    def save(self):
        """ Refresh the expiry, encrypt the current contents and store them. """
        self.sessiondata.expires = datetime.datetime.utcnow() + app.permanent_session_lifetime
        self.sessiondata.data = self.crypto.encrypt(bson.dumps(dict(self)))
        # NOTE(review): the persisting call is not visible in the reviewed
        # source; `save()` is assumed - confirm against storage.SessionData.
        self.sessiondata.save()

        if not self.sid:
            self.sid = self.sessiondata.sid

    def delete_instance(self):
        return self.sessiondata.delete_instance()
class SessionInterface(flask.sessions.SessionInterface):
    """
    Session interface storing `{'sid': ..., 'key': ...}` as JSON in the
    session cookie. Sessions are only used over secure connections.
    """

    def open_session(self, app, request):
        """
        Return the session for `request`.

        Insecure requests and structurally invalid cookies get a
        DummySession. Missing or non-JSON cookies get a new Session.
        """
        cookie = request.cookies.get(app.session_cookie_name)

        if not request.is_secure:
            return DummySession()

        if not cookie:
            return Session()

        try:
            untrusted = json.loads(cookie)
        except json.decoder.JSONDecodeError:
            #data = {'sid': None, 'key': None}
            #raise errors.ExposedError("Fucky session. ( ͡° ͜ʖ ͡°)")
            return Session()

        # The cookie is attacker-controlled: require a dict with string
        # 'sid' and 'key' before touching either value.
        if isinstance(untrusted, dict)\
        and isinstance(untrusted.get('sid'), str)\
        and isinstance(untrusted.get('key'), str):

            try:
                key = bytes.fromhex(untrusted['key'])
            except ValueError: # key is not valid hex
                return DummySession()

            return Session(untrusted['sid'], key=key)

        return DummySession() # a bit evil

    def should_set_cookie(self, app, session):
        return super(SessionInterface, self).should_set_cookie(app, session) and flask.request.is_secure

    def save_session(self, app, session, response):
        """ Persist a modified session and, when appropriate, (re)send its cookie. """
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)

        if session.modified:
            # NOTE(review): the body of this `if` is not visible in the
            # reviewed source; persisting the session is assumed - confirm.
            session.save()

        if self.should_set_cookie(app, session):

            if not session.sid:
                sid = ''
            else:
                sid = session.sid.hex

            httponly = self.get_cookie_httponly(app)
            secure = True
            samesite = self.get_cookie_samesite(app)
            expires = session.expires

            # NOTE(review): only the json.dumps(...) argument of this call
            # is visible; the remaining arguments are reconstructed from
            # the locals computed above - confirm.
            response.set_cookie(
                app.session_cookie_name,
                json.dumps({'sid': sid, 'key': session.key.hex()}),
                expires=expires,
                httponly=httponly,
                domain=domain,
                path=path,
                secure=secure,
                samesite=samesite
            )
# Enable URL parameters like regex("[a-z]+")
class RegexConverter(werkzeug.routing.BaseConverter):
    """ URL converter whose matching pattern is its first converter argument. """

    def __init__(self, url_map, *items):
        super(RegexConverter, self).__init__(url_map)
        self.regex = items[0]
class SCSSCore(scss.extension.core.CoreExtension):
    """ pyScss core extension with a poobrains-specific import search order. """

    # this function is a copy of scss.extension.core.CoreExtension.handle_import
    # with adjusted search_path ordering, meaning this needs to be updated,
    # at least when there's been a security flaw in this fixed upstream
    # but how do I find out?
    def handle_import(self, name, compilation, rule):
        """Implementation of the core Sass import mechanism, which just looks
        for files on disk.

        Returns the SourceFile for the first match, or None when nothing
        matches.
        """
        # TODO this is all not terribly well-specified by Sass.  at worst,
        # it's unclear how far "upwards" we should be allowed to go.  but i'm
        # also a little fuzzy on e.g. how relative imports work from within a
        # file that's not actually in the search path.
        # TODO i think with the new origin semantics, i've made it possible to
        # import relative to the current file even if the current file isn't
        # anywhere in the search path.  is that right?
        path = PurePosixPath(name)

        search_exts = list(compilation.compiler.dynamic_extensions)
        if path.suffix and path.suffix in search_exts:
            basename = path.stem
        else:
            basename = path.name
        relative_to = path.parent

        # Normalise an absolute import *before* building the search path,
        # otherwise the stripped path never reaches the entries below.
        current_file_entry = None
        if relative_to.is_absolute():
            relative_to = PurePosixPath(*relative_to.parts[1:])
        elif rule.source_file.origin:
            # Also search relative to the current file, only if not doing an
            # absolute import
            current_file_entry = (
                rule.source_file.origin,
                rule.source_file.relpath.parent / relative_to,
            )

        # list of (origin, start_from) tuples
        search_path = [
            (origin, relative_to)
            for origin in compilation.compiler.search_path
        ]
        # NOTE(review): the call adding the current-file entry is not
        # visible in the reviewed source; it is placed after the configured
        # search path because that loop precedes it there - confirm.
        if current_file_entry is not None:
            search_path.append(current_file_entry)

        for prefix, suffix in product(('_', ''), search_exts):
            filename = prefix + basename + suffix
            for origin, relative_to in search_path:
                relpath = relative_to / filename
                # Lexically (ignoring symlinks!) eliminate .. from the part
                # of the path that exists within Sass-space.  pathlib
                # deliberately doesn't do this, but os.path does.
                relpath = PurePosixPath(os.path.normpath(str(relpath)))

                if rule.source_file.key == (origin, relpath):
                    # Avoid self-import
                    # TODO is this what ruby does?
                    continue

                path = origin / relpath
                if not path.exists():
                    continue

                # All good!
                # TODO if this file has already been imported, we'll do the
                # source preparation twice.  make it lazy.
                return SourceFile.read(origin, relpath)
class Poobrain(flask.Flask):
    """ The poobrains application class. """

    session_interface = SessionInterface()
    request_class = Request
    response_class = Response
    debugger = None

    site = None
    admin = None
    boxes = None
    resource_extension_whitelist = None
    # HTTP status codes for exceptions that carry no `code` attribute.
    error_codes = {
        peewee.OperationalError: 500,
        peewee.IntegrityError: 400,
        peewee.DoesNotExist: 404
    }
    setup_funcs = None
    cronjobs = None

    _setup_done = False
# Builds the application: forces FLASK_ENV from config.DEBUG, installs the
# click CLI group, merges `defaults` and `config` into self.config, sets up
# logging and the debugger, creates the SCSS compiler and the database
# connection, and creates the 'site' and 'admin' blueprints.
# NOTE(review): this listing has lost its indentation and several lines
# (`else:`, `try:` and some statements); gaps that can be inferred from the
# surrounding code are marked below. Restore them before relying on this.
def __init__(self, *args, **kwargs):
if not 'root_path' in kwargs:
kwargs['root_path'] = str(pathlib.Path('.').absolute()) #TODO: pathlib probably isn't really needed here
if 'DEBUG' in dir(config) and config.DEBUG:
# There's some shitty overriding going on based on FLASK_ENV.
# Set the env var to override the override and enforce what
# the config says.
os.environ['FLASK_ENV'] = 'development'
# NOTE(review): an `else:` appears to be missing here.
os.environ['FLASK_ENV'] = 'production'
super(Poobrain, self).__init__(*args, **kwargs)
self.setup_funcs = []
self.cronjobs = []
@click.group(cls=flask.cli.FlaskGroup, create_app=lambda x=None: self)
@click.option('--database', default=f"sqlite:///{project_name}.db")
def cli(database):
self.db = db_url.connect(database)
self.cli = cli
for name in dir(defaults):
# NOTE(review): `name.isupper` is not called (no parentheses), so it is
# always truthy and lowercase names are copied too; `name.isupper()` is
# presumably intended, as in the config loop below.
if name.isupper and not name in dir(config):
self.config[name] = getattr(defaults, name)
if config:
for name in dir(config):
if name.isupper():
self.config[name] = getattr(config, name)
# NOTE(review): a `try:` appears to be missing here (see `except IOError`).
# NOTE(review): `logging.handlers` is used below but only `logging` is
# imported at the top of the file - confirm the submodule gets imported.
if self.config['LOGFILE']: # log to file, if configured
log_handler = logging.handlers.WatchedFileHandler(self.config['LOGFILE'])
# NOTE(review): the body of this `if`, and the statement attaching
# log_handler to a logger, are not visible.
if self.debug:
except IOError as e:
import grp
user = os.getlogin()
group = grp.getgrgid(os.getgid()).gr_name
sys.exit(f"Somethings' fucky with the log file: {e}. Current user/group is {user}/{group}.")
if self.debug:
# show SQL queries
# NOTE(review): the statements configuring `peeweelog` are not visible.
peeweelog = logging.getLogger('peewee')
# NOTE(review): a `try:` appears to be missing here (see `except ImportError`).
import signal
import pudb
if hasattr(signal, 'SIGINFO'):
print(f"{self.name}: a graphical debugger can be invoked with SIGINFO (^T)")
self.debugger = pudb
except ImportError:
print("pudb not installed, falling back to pdb!")
import signal # shouldn't be needed but feels hacky to leave out
import pdb
# NOTE(review): whatever follows the pdb import (presumably assigning
# self.debugger) is not visible.
self.boxes = {}
self.poobrain_path = os.path.dirname(os.path.realpath(__file__))
self.site_path = os.getcwd()
self.resource_extension_whitelist = ['css', 'scss', 'png', 'svg', 'ttf', 'otf', 'woff', 'js', 'jpg']
self.scss_compiler = scss.Compiler(extensions=(SCSSCore,), root=pathlib.Path('/'), search_path=self.theme_paths)
if 'DATABASE' in self.config:
self.db = db_url.connect(self.config['DATABASE'], autocommit=True, autorollback=True)
# NOTE(review): an `else:` appears to be missing here.
import optparse # Pretty fucking ugly, but at least its in the stdlib. TODO: Can we *somehow* make this work with prompt in cli/__init__.py install command?
parser = optparse.OptionParser()
parser.add_option('--database', default=f"sqlite:///{project_name}.db", dest='database') # NOTE: If you change this, you'll also have to change the --database default in cli/__init__.py or else install will fuck up
(options, _) = parser.parse_args()
self.logger.warning(f"No DATABASE in config, using generated default or --database parameter '{options.database}'. This should only happen before the install command is executed.")
self.db = db_url.connect(options.database)
self.add_url_rule('/theme/<path:resource>', 'serve_theme_resources', self.serve_theme_resources)
# Make sure that each request has a proper database connection
# NOTE(review): the statement(s) this comment refers to are not visible
# (presumably registering request_setup / request_teardown).
# set up site and admin blueprints
self.site = Pooprint('site', 'site')
self.admin = Pooprint('admin', 'admin')
def full_dispatch_request(self):
    """ Run the one-time global setup before the first request is dispatched. """
    if not self._setup_done:
        # run_setup() is what flips _setup_done to True.
        self.run_setup()

    return super().full_dispatch_request()
def main(self):
# NOTE(review): the body of this method is not visible in the reviewed
# source; presumably it invokes the click group stored in self.cli - confirm.
def select_jinja_autoescape(self, filename):
    """ Enable autoescaping for every template, whatever `filename` is. """
    return True # autoescape everything
def setup(self, f):
    """ Decorator registering `f` to be called by run_setup. """
    self.setup_funcs.append(f)
    return f
def run_setup(self):
    """
    Global runtime setup. Calls all @app.setup decorated functions
    and registers site and admin blueprints.

    Called by request_setup on first request.
    """
    for f in self.setup_funcs:
        # NOTE(review): the loop body is not visible in the reviewed
        # source; calling without arguments is assumed - confirm.
        f()

    # NOTE(review): the site registration is not visible in the reviewed
    # source; it is restored from the docstring above - confirm.
    self.register_blueprint(self.site)
    self.register_blueprint(self.admin, url_prefix='/admin/')
    self._setup_done = True
# Read as an attribute elsewhere in this file (`self.theme_paths`,
# `app.theme_paths`), so it has to be a property.
@locked_cached_property
def theme_paths(self):
    """
    Theme directories in lookup order: the configured theme first (project,
    then poobrains), followed by the 'default' theme (project, then poobrains).
    """
    paths = []

    if self.config['THEME'] != 'default':
        paths.append(os.path.join(self.root_path, 'themes', self.config['THEME']))
        paths.append(os.path.join(self.poobrain_path, 'themes', self.config['THEME']))

    paths.append(os.path.join(self.root_path, 'themes', 'default'))
    paths.append(os.path.join(self.poobrain_path, 'themes', 'default'))

    return paths
# View for '/theme/<path:resource>': serves a theme file if its extension is
# whitelisted. '.scss' files found on disk are compiled and sent as text/css;
# other files found on disk are sent as-is.
# NOTE(review): this listing has lost its indentation and several lines
# (`try:`, `else:` and parts of the Response(...) calls); the gaps are
# marked below and need restoring from the original file.
def serve_theme_resources(self, resource):
r = False
extension = resource.split('.')
if len(extension) > 1:
extension = extension[-1]
if extension not in self.resource_extension_whitelist:
abort(404) # extension not allowed
if extension == 'svg':
# NOTE(review): a `try:` and the arguments of the Response(...) calls
# below are missing; `style=` presumably belongs to a template render
# call, given the TemplateNotFound handler.
r = Response(
style=self.scss_compiler.compile_string("@import 'svg';")
except scss.errors.SassImportError:
r = Response(
except jinja2.exceptions.TemplateNotFound:
# NOTE(review): the handler body and an `else:` appear to be missing here.
paths = [os.path.join(path, resource) for path in app.theme_paths]
for current_path in paths:
if os.path.exists(current_path):
if extension == 'scss':
r = Response(self.scss_compiler.compile(current_path), mimetype='text/css')
# NOTE(review): an `else:` appears to be missing here.
r = flask.send_from_directory(os.path.dirname(current_path), os.path.basename(current_path))
if r:
# NOTE(review): `private` and `public` are both set; a condition between
# them may be missing - confirm.
r.cache_control.private = True
r.cache_control.public = True
r.cache_control.max_age = app.config['CACHE_LONG']
return r
def request_setup(self):
    """
    Per-request setup: reset flask.g, make sure the database is connected
    and resolve the current user from the TLS client certificate.

    Raises InternalServerError when SSL_CLIENT_VERIFY is absent from the
    request environment outside debug mode.
    """
    flask.g.boxes = {}
    flask.g.forms = {}

    #self.db.close() # fails first request and thus always on sqlite
    if self.db.is_closed():
        # NOTE(review): the body of this `if` is not visible in the
        # reviewed source; (re)connecting is assumed - confirm.
        self.db.connect()
        #connection = self.db.get_conn()

    flask.g.user = None

    environ = flask.request.environ

    if 'SSL_CLIENT_VERIFY' not in environ:
        if self.debug:
            environ['SSL_CLIENT_VERIFY'] = 'FAILURE'
        else:
            raise werkzeug.exceptions.InternalServerError("httpd configuration problem. SSL_CLIENT_VERIFY not set in request environment.")

    if environ['SSL_CLIENT_VERIFY'] == 'SUCCESS':

        #cert_info = auth.ClientCert.get(auth.ClientCert.subject_name == flask.request.environ['SSL_CLIENT_S_DN'])
        cert = openssl.crypto.load_certificate(openssl.crypto.FILETYPE_PEM, environ['SSL_CLIENT_CERT'])

        try:
            cert_info = auth.ClientCert.get(auth.ClientCert.fingerprint == cert.digest('sha512').replace(b':', b'')) # fuck colons
            flask.g.user = cert_info.user

        except auth.ClientCert.DoesNotExist:
            self.logger.error(f"httpd verified client certificate successfully, but it's not known at this site. CN: {cert.get_subject()}, digest: {cert.digest('sha512')}")

    if flask.g.user is None:
        flask.g.user = auth.User.get(auth.User.id == 1) # loads "anonymous".
def request_teardown(self, exception):
    """ Close the database connection if it is still open; `exception` is ignored. """
    if not self.db.is_closed():
        self.db.close()
def box_setup(self):
    """ Call every registered box function and store its result in flask.g.boxes. """
    for name, f in self.boxes.items():
        flask.g.boxes[name] = f()
def box(self, name):
    """ Decorator registering a function as the box called `name`. """

    def decorator(f):
        self.boxes[name] = f
        return f

    return decorator
def expose(self, rule, mode='full', extra_modes=None, title=None, force_secure=False):
    """
    Class decorator exposing `cls` on the site blueprint under `rule`.

    Storables get a teaser listing at `rule`, a view at `rule/<handle>/`,
    one view per entry in `extra_modes`, and related views for backrefs
    from Administerable models. Other Renderables get views at `rule` and
    `rule/<handle>/`.
    """

    def decorator(cls):

        if issubclass(cls, storage.Storable):

            self.site.add_listing(cls, rule, mode='teaser', title=title, force_secure=force_secure)
            self.site.add_view(cls, os.path.join(rule, '<handle>/'), mode=mode, force_secure=force_secure)

            if extra_modes is not None:
                for extra_mode in extra_modes:
                    self.site.add_view(cls, os.path.join(rule, f"<handle>/{extra_mode}"), mode=extra_mode, force_secure=force_secure)

            for related_model, related_fields in cls._meta.model_backrefs.items(): # Add Models that are associated by ForeignKeyField, like /user/foo/userpermissions

                if len(related_fields) > 1:
                    # Without the f prefix the placeholders were logged verbatim.
                    self.logger.debug(f"!!! Apparent multi-field relation for {related_model.__name__}: {related_fields} !!!")

                if issubclass(related_model, auth.Administerable):
                    self.site.add_related_view(cls, related_fields[0], os.path.join(rule, '<handle>/'))

        elif issubclass(cls, rendering.Renderable):

            self.site.add_view(cls, rule, mode=mode, force_secure=force_secure)
            self.site.add_view(cls, os.path.join(rule, '<handle>/'), mode=mode, force_secure=force_secure) # TODO: Only needed for Renderables that actually use handles

        return cls

    return decorator
# Generates a URL for `cls`/`mode`: first via the blueprint chosen from the
# current request, then, on LookupError, via the other blueprints ('site'
# and 'admin' are tried first). Raises LookupError if none has a route.
# NOTE(review): this listing has lost its indentation and several lines.
# The call that can raise auth.AccessDenied is not visible at all, so this
# method cannot be reconstructed from here; the gaps are marked below.
def get_url(self, cls, mode=None, **url_params):
if flask.request.blueprint is not None:
if flask.request.blueprint == 'admin':
# NOTE(review): a `try:` and the access check it guards are missing here.
except auth.AccessDenied:
blueprint = self.site
# NOTE(review): an `else:` appears to be missing here.
blueprint = self.blueprints[flask.request.blueprint]
# NOTE(review): an `else:` (for `blueprint is not None`) appears to be missing here.
blueprint = self.site
# NOTE(review): a `try:` appears to be missing here.
return blueprint.get_url(cls, mode=mode, **url_params)
except LookupError:
blueprint_names = list(self.blueprints.keys())
blueprint_names.insert(0, 'admin')
blueprint_names.insert(0, 'site')
for bp_name in blueprint_names:
if bp_name != flask.request.blueprint:
if bp_name == 'admin':
# NOTE(review): a `try:`, the access check and the handler body are missing here.
except auth.AccessDenied:
blueprint = self.blueprints[bp_name]
# NOTE(review): a `try:` appears to be missing here.
return blueprint.get_url(cls, mode=mode, **url_params)
except LookupError:
# NOTE(review): the handler body is missing (presumably it moves on to the next blueprint).
raise LookupError(f"Failed generating URL for {cls.__name__}[{url_params.get('handle', None)}]-{mode}. No matching route found.")
def get_related_view_url(self, cls, handle, related_field, add=None):
    """ Delegate related-view URL generation to the blueprint of the current request. """
    blueprint = self.blueprints[flask.request.blueprint]
    return blueprint.get_related_view_url(cls, handle, related_field, add=add)
def cron(self, func):
    """ Decorator registering `func` to be executed by cron_run. """
    self.cronjobs.append(func)
    return func
def cron_run(self):
    """ Run all registered cron jobs in registration order, logging start and end. """
    self.logger.info("Starting cron run.")

    for func in self.cronjobs:
        # NOTE(review): the loop body is not visible in the reviewed
        # source; calling without arguments is assumed - confirm.
        func()

    self.logger.info("Finished cron run.")
# Flask reads `jinja_loader` as an attribute, so the override has to be a
# property as well.
@locked_cached_property
def jinja_loader(self):
    """ Template loader searching the theme directories in lookup order. """
    return jinja2.FileSystemLoader(self.theme_paths)
class Pooprint(flask.Blueprint):
    """
    Blueprint that keeps reverse-lookup tables from (class, mode) or
    (class, foreign key field) to endpoint names, so URLs can be generated
    for views, listings and related views.
    """

    app = None
    db = None

    views = None
    listings = None
    related_views = None
    boxes = None
    poobrain_path = None

    def __init__(self, *args, **kwargs):
        super(Pooprint, self).__init__(*args, **kwargs)

        self.views = collections.OrderedDict() # reverse lookup for URL endpoints of Renderable views by mode
        self.listings = collections.OrderedDict() # reverse lookup for listing endpoints by mode
        self.related_views = collections.OrderedDict() # reverse lookup for endpoints of (fk) related views
        self.related_views_add = collections.OrderedDict() # reverse lookup for endpoints to add new related items
        self.boxes = collections.OrderedDict()
        self.poobrain_path = os.path.dirname(__file__)

    def register(self, app, options):
        """ Register on `app` and keep references to it and its database. """
        super(Pooprint, self).register(app, options)

        self.app = app
        self.db = app.db

    def add_view(self, cls, rule, endpoint=None, view_func=None, mode='full', force_secure=False, **options):
        """
        Route `rule` to `cls.class_view` in the given `mode` and record the
        endpoint for reverse lookup.

        NOTE(review): as in the reviewed source, a passed `view_func` is
        always replaced by the generated one.
        """
        if cls not in self.views:
            self.views[cls] = collections.OrderedDict()

        if mode not in self.views[cls]:
            self.views[cls][mode] = []

        # Why the fuck does HTML not support DELETE!?
        options['methods'] = ['GET', 'POST']
        if mode == 'delete':
            # NOTE(review): the body of this `if` is not visible in the
            # reviewed source; allowing DELETE is assumed - confirm.
            options['methods'].append('DELETE')

        def view_func(**kwargs):
            kwargs['mode'] = mode
            return cls.class_view(**kwargs)

        if force_secure:
            view_func = helpers.is_secure(view_func) # manual decoration, cause I don't know how to do this cleaner

        if endpoint is None:
            endpoint = self.next_endpoint(cls, mode, 'view')

        self.add_url_rule(rule, endpoint, view_func, **options)
        # NOTE(review): this bookkeeping is not visible in the reviewed
        # source; get_view_url reads endpoint names from this list - confirm.
        self.views[cls][mode].append(endpoint)

    def add_related_view(self, cls, related_field, rule, endpoint=None, view_func=None, force_secure=False, **options):
        """
        Register the related view of `cls` for `related_field`, plus its
        offset ('+<int:offset>') and 'add' variants.

        NOTE(review): as in the reviewed source, `view_func`,
        `force_secure` and `options` are not used.
        """
        related_model = related_field.model

        if not endpoint:
            endpoint = self.next_endpoint(cls, related_field, 'related')

        if cls not in self.related_views:
            self.related_views[cls] = collections.OrderedDict()

        if related_field not in self.related_views[cls]:
            url_segment = f"{related_model.__name__.lower()}:{related_field.name.lower()}"
            rule = os.path.join(rule, url_segment, "") # empty string to get trailing slash
            self.related_views[cls][related_field] = []

        if cls not in self.related_views_add:
            self.related_views_add[cls] = collections.OrderedDict()

        if related_field not in self.related_views_add[cls]:
            self.related_views_add[cls][related_field] = []

        def view_func(*args, **kwargs):
            kwargs['related_field'] = related_field
            return cls.related_view(*args, **kwargs)

        def view_func_add(*args, **kwargs):
            kwargs['related_field'] = related_field
            return cls.related_view_add(*args, **kwargs)

        offset_rule = os.path.join(rule, '+<int:offset>')
        offset_endpoint = f"{endpoint}_offset"

        add_rule = os.path.join(rule, 'add')
        add_endpoint = endpoint + '_add'

        self.add_url_rule(rule, endpoint, view_func, methods=['GET', 'POST'])
        self.add_url_rule(offset_rule, offset_endpoint, view_func, methods=['GET', 'POST'])
        self.add_url_rule(add_rule, add_endpoint, view_func_add, methods=['GET', 'POST'])

        # NOTE(review): this bookkeeping is not visible in the reviewed
        # source; get_related_view_url reads endpoint names from these
        # lists - confirm.
        self.related_views[cls][related_field].append(endpoint)
        self.related_views_add[cls][related_field].append(add_endpoint)

    def box_setup(self):
        """ Call every registered box function and store its result in flask.g.boxes. """
        for name, f in self.boxes.items():
            flask.g.boxes[name] = f()

    def box(self, name):
        """ Decorator registering a function as the box called `name`. """

        def decorator(f):
            self.boxes[name] = f
            return f

        return decorator

    def add_listing(self, cls, rule, title=None, mode=None, endpoint=None, view_func=None, action_func=None, force_secure=False, **options):
        """
        Register a listing of `cls` at `rule` plus an offset variant
        ('+<int:offset>'), and record the endpoint for reverse lookup.
        `mode` defaults to 'teaser'.
        """
        if not mode:
            mode = 'teaser'

        if endpoint is None:
            endpoint = self.next_endpoint(cls, mode, 'listing')

        rule = os.path.join(rule, '') # make sure rule has trailing slash

        if cls not in self.listings:
            self.listings[cls] = collections.OrderedDict()

        if mode not in self.listings[cls]:
            self.listings[cls][mode] = []

        if view_func is None:

            def view_func(offset=0):

                if action_func:
                    menu_actions = action_func()
                else:
                    menu_actions = None

                return storage.Listing(cls, offset=offset, title=title, mode=mode, menu_actions=menu_actions)

        if force_secure:
            view_func = helpers.is_secure(view_func) # manual decoration, cause I don't know how to do this cleaner

        offset_rule = rule+'+<int:offset>'
        offset_endpoint = f"{endpoint}_offset"

        self.add_url_rule(rule, endpoint=endpoint, view_func=view_func, **options)
        self.add_url_rule(offset_rule, endpoint=offset_endpoint, view_func=view_func, **options)

        # NOTE(review): this bookkeeping is not visible in the reviewed
        # source; get_listing_url reads endpoint names from this list - confirm.
        self.listings[cls][mode].append(endpoint)

    def listing(self, cls, rule, mode='teaser', title=None, **options):
        """ Decorator variant of add_listing; the function receives the Listing instance. """
        # TODO: Is this even used? Does keeping it make sense?

        def decorator(f):

            def real(offset=0):
                instance = storage.Listing(cls, title=title, offset=offset, mode=mode)
                return f(instance)

            self.add_listing(cls, rule, view_func=real, **options)
            return real

        return decorator

    def choose_endpoint(self, endpoints, **url_params):
        """
        Return the first of `endpoints` whose URL rule accepts all of
        `url_params` and has defaults for every argument not supplied.

        Raises ValueError when no rule fits.
        """
        supplied = set(url_params.keys())

        for rule in self.app.url_map.iter_rules():

            if rule.endpoint in endpoints:

                # A rule without defaults may carry None instead of a dict.
                defaults = rule.defaults or {}

                not_too_many_params = supplied.issubset(rule.arguments)
                missing_params = rule.arguments - supplied
                missing_all_optional = all(param in defaults for param in missing_params)

                #if sorted(rule.arguments) == sorted(url_params.keys()): # means url parameters match perfectly
                #if set(url_params.keys()).issubset(rule.arguments):
                if not_too_many_params and missing_all_optional:
                    return rule.endpoint

        nice_param_list = ', '.join(url_params.keys())
        raise ValueError(f"No fitting url rule found for all params: {nice_param_list}")

    def get_url(self, cls, mode=None, **url_params):
        """
        Return a view URL for non-Models, for mode 'add', or when a handle
        is given with a non-teaser mode; otherwise return a listing URL.
        """
        if not issubclass(cls, storage.Model) or \
        mode == 'add' or \
        'handle' in url_params and (mode is None or not mode.startswith('teaser')):
            return self.get_view_url(cls, mode=mode, **url_params)

        return self.get_listing_url(cls, mode=mode, **url_params)

    def get_view_url(self, cls, mode=None, **url_params):
        """ URL of the view of `cls` in `mode` (default 'full'); raises LookupError if unregistered. """
        if mode is None:
            mode = 'full'

        if cls not in self.views:
            raise LookupError(f"No registered views for class {cls.__name__}.")

        if mode not in self.views[cls]:
            raise LookupError(f"No registered views for class {cls.__name__} with mode {mode}.")

        endpoints = [f"{self.name}.{x}" for x in self.views[cls][mode]]

        if len(endpoints) > 1:
            endpoint = self.choose_endpoint(endpoints, **url_params)
        else:
            endpoint = endpoints[0]

        return flask.url_for(endpoint, **url_params)

    def get_listing_url(self, cls, handle=None, mode=None, offset=0, **url_params):
        """
        URL of the listing of `cls` in `mode` (default 'teaser').

        With a `handle`, the offset is computed from that instance's
        position in the listing's ordering. Raises LookupError if no
        listing is registered.
        """
        if mode is None:
            mode = 'teaser'

        if handle is not None:

            instance = cls.load(handle)

            clauses = []
            for ordering in cls._meta.order_by: # Ordering is a peewee.WrappedNode, its .node property is the field
                if ordering.direction == 'ASC':
                    clauses.append(ordering.node <= getattr(instance, ordering.node.name))
                else: # We'll just assume there can only be ASC and DESC
                    clauses.append(ordering.node >= getattr(instance, ordering.node.name))

            offset = cls.select().where(*clauses).count() - 1

        if cls not in self.listings:
            raise LookupError(f"No registered listings for class {cls.__name__}.")

        if mode not in self.listings[cls]:
            raise LookupError(f"No registered listings for class {cls.__name__} with mode {mode}.")

        endpoints = [f"{self.name}.{x}" for x in self.listings[cls][mode]]
        endpoint = self.choose_endpoint(endpoints)

#        if isinstance(offset, int) and offset > 0:
#            return flask.url_for(endpoint+'_offset', offset=offset)

        kw = copy.copy(url_params)
        if offset > 0:
            kw['offset'] = offset
            endpoint = f"{endpoint}_offset"

        return flask.url_for(endpoint, **kw)

    def get_related_view_url(self, cls, handle, related_field, add=False):
        """ URL of the related view (or, with `add`, the add view); raises LookupError if unregistered. """
        if add:
            lookup = self.related_views_add
        else:
            lookup = self.related_views

        if cls not in lookup:
            raise LookupError(f"No registered related views for class {cls.__name__}.")

        if related_field not in lookup[cls]:
            raise LookupError(f"No registered related views for {cls.__name__}[{handle}]<-{related_field.model.__name__}.{related_field.name}.")

        endpoints = [f"{self.name}.{x}" for x in lookup[cls][related_field]]
        endpoint = self.choose_endpoint(endpoints, **{'handle': handle})

        return flask.url_for(endpoint, handle=handle)

    def next_endpoint(self, cls, mode, context): # TODO: rename mode because it's not an applicable name for 'related' context
        """ Return the first unused auto-generated endpoint name for `cls`/`mode` in `context`. """
        endpoint_base = f"{cls.__name__}_{context}_{mode}"

        try:
            if context == 'view':
                endpoints = self.views[cls][mode]
            elif context == 'listing':
                endpoints = self.listings[cls][mode]
            elif context == 'related':
                # mode is actually a foreign key field
                endpoint_base = f"{cls.__name__}_{context}_{mode.model.__name__}-{mode.name}_autogen"
                endpoints = self.related_views[cls][mode]

        except KeyError: # means no view/listing has been registered yet
            endpoints = []

        i = 1
        while f"{endpoint_base}_{i}" in endpoints:
            i += 1

        return f"{endpoint_base}_{i}"
# The application singleton; the delayed internal imports below may depend on it.
app = Poobrain('poobrains') # TODO: Make app class configurable.
app.jinja_env.tests['renderable'] = is_renderable
app.url_map.converters['regex'] = RegexConverter

# Optional profiling: wrap the WSGI app so profiles are written to 'profiling'.
if app.config['PROFILE']:
    from werkzeug.middleware.profiler import ProfilerMiddleware
    app.wsgi_app = ProfilerMiddleware(app.wsgi_app, profile_dir='profiling')
# delayed internal imports which may depend on app
from . import mailing
from . import rendering
from . import form
from . import storage
from . import md
from . import auth
from . import upload
from . import tagging
from . import commenting
from . import svg
from . import formapp
from . import analysis
from . import search
from . import profile
from . import cli
from . import doc
from . import tools
class ErrorPage(rendering.Renderable):
    """ Renderable error page carrying an HTTP status code and a user-facing message. """

    title = None
    error = None
    code = None
    message = None

    def __init__(self, error, traceback):
        """
        `error` is the handled exception; `traceback` is a formatted
        traceback string or None.
        """
        super(ErrorPage, self).__init__()

        self.error = error
        self.traceback = traceback

        if hasattr(error, 'code'):
            self.code = error.code

        else:
            # default to 500, but use more specific code if a matching exception is found in app.error_codes
            self.code = 500
            for cls, code in app.error_codes.items():
                if isinstance(error, cls):
                    self.code = code

        self.title = f"Ermahgerd, {self.code}!"

        # Only messages that are safe to show are exposed verbatim.
        if isinstance(self.error, errors.ExposedError):
            self.message = str(error)

        elif isinstance(self.error, werkzeug.exceptions.HTTPException):
            self.message = error.description

        elif isinstance(self.error, peewee.DoesNotExist):
            self.message = "The scurry of bouncy squirrels running this site couldn't find what you're looking for."

        elif app.debug:
            self.message = str(error) # verbatim error messages in debug mode
            # raise error

        else:
            self.message = "Weasels on PCP gnawed through our server internals."
def errorpage(error):
    """
    Error handler: log the error and return `(ErrorPage, status code)`.

    With DEBUG configured, a formatted traceback is attached unless the
    status is 403 or 404.
    """
    tb = None

    app.logger.error(f"Error {type(error).__name__} when accessing {flask.request.path}: {error}")

    if app.config['DEBUG']:

        if hasattr(error, 'code'):
            error_code = error.code

        else:
            error_code = 500
            for cls, code in app.error_codes.items():
                if isinstance(error, cls):
                    error_code = code

        if error_code not in (403, 404):
            import traceback
            tb = traceback.format_exc()

    page = ErrorPage(error, traceback=tb)
    return (page, page.code)
def menu_breadcrumb():
    """ Build the breadcrumb menu from the segments of the current request path. """
    m = rendering.Menu('breadcrumb')

    segments = flask.request.path.split('/')

    for i, segment in enumerate(segments):

        if i == 0:
            # The path starts with '/', so the first segment is the site root.
            m.append('/', 'home')

        elif segment != '':

            if ''.join(segments[i+1:]) == '': # means the rest of segments just appears empty strings
                path = flask.request.path # makes sure we don't fuck over any trailing-slash rules
            else:
                path = '/' + os.path.join(*segments[0:i+1]) + '/'

            m.append(path, segment)

    return m
def robots_txt():
    """
    Supply robots.txt for crawlers.

    Allow everything except /doc/ by default, lets you add a custom
    robots.txt to a projects' root directory.
    """
    if os.path.exists(os.path.join(app.root_path, 'robots.txt')):
        response = flask.send_from_directory(app.root_path, 'robots.txt')
    else:
        response = Response("User-agent: *\nDisallow: /doc/")

    response.cache_control.public = True
    response.cache_control.max_age = app.config['CACHE_LONG']

    return response
# Route the common HTTP error codes and the framework/database exceptions
# through the shared error page; the order matches the original calls.
for _handled in (
    400,
    403,
    404,
    errors.ExposedError,
    peewee.OperationalError,
    peewee.IntegrityError,
    peewee.DoesNotExist,
):
    app.register_error_handler(_handled, errorpage)

#if not app.config['DEBUG']:
#    app.register_error_handler(Exception, errorpage) # Catch all in production