238 lines
8.1 KiB
Python
238 lines
8.1 KiB
Python
from . import exceptions
|
||
from . import util
|
||
from . import sanitation
|
||
|
||
"""
|
||
## Routing. ##
|
||
|
||
Matching request URLs to registered view callbacks.
|
||
|
||
### Terminology ###
|
||
|
||
* Route: associates an URL rule with a view callback
|
||
* URL rule: a string describing a pattern by which request URLs are matched
|
||
* rule part: the "directories" in the URL path, what's separated with slashes
|
||
* variable part: a dynamic rule part, denoted by variable signifiers, a variable name and optionally a sanitizer token
|
||
* variable signifiers: < and >, enclose and mark a variable part
|
||
* sanitizer token: a shorthand string denoting what input sanitizer to use
|
||
|
||
"""
|
||
|
||
# map sanitizer tokens to sanitizer objects
|
||
sanitizer_token_map = {
|
||
'str': sanitation.STRING,
|
||
'path': sanitation.PATH,
|
||
'int': sanitation.INTEGER,
|
||
'bool': sanitation.BOOL,
|
||
}
|
||
|
||
class Route:
|
||
|
||
"""
|
||
Associates an URL rule with a view callback.
|
||
"""
|
||
|
||
def __init__(self, rule, view):
|
||
|
||
if rule.startswith('/') or rule.endswith('/'):
|
||
raise exceptions.OozeProgrammingError("URL rules MUST NOT start or end with '/', but {rule} does!")
|
||
|
||
self.view = view
|
||
self.rule = rule
|
||
|
||
def __repr__(self):
|
||
return f"<Route '{self.rule}', {self.view}>"
|
||
|
||
class Router:
|
||
|
||
def __init__(self, dispatcher):
|
||
|
||
self.dispatcher = dispatcher
|
||
self.tree = {}
|
||
|
||
def part_is_variable(self, part):
|
||
return part.startswith('<') and part.endswith('>')
|
||
|
||
def part_remove_variable_signifiers(self, part):
|
||
return part[1:-1]
|
||
|
||
def part_token_name(self, part):
|
||
|
||
effective_part = self.part_remove_variable_signifiers(part)
|
||
if ':' in effective_part:
|
||
return effective_part.split(':', 1)
|
||
return 'str', effective_part
|
||
|
||
def get_node_parts(self, node):
|
||
|
||
"""
|
||
Returns the keys of a given node that are considered URL parts.
|
||
This means all keys, except for the ones prefixed by slashes,
|
||
which is what's used for special-purpose information in the
|
||
routing table.
|
||
|
||
As rules are split by '/', rule parts by definition can't contain '/',
|
||
making key collisions between rule parts and special-purpose
|
||
information impossible.
|
||
"""
|
||
|
||
return tuple( node_key for node_key in node.keys() if not node_key.startswith('/') )
|
||
|
||
def add_route(self, rule, func):
|
||
|
||
self.dispatcher.logger.debug(f"Registering route on {self.dispatcher}: '{rule}' to function '{func}'.")
|
||
|
||
rule_parts = rule.split('/')
|
||
tree_node = self.tree
|
||
|
||
for index, rule_part in enumerate(rule_parts):
|
||
|
||
last_part = index == len(rule_parts) - 1
|
||
|
||
if rule_part not in tree_node:
|
||
|
||
if self.part_is_variable(rule_part):
|
||
|
||
sanitizer_token, variable_name = self.part_token_name(rule_part)
|
||
tree_node_parts = self.get_node_parts(tree_node)
|
||
|
||
if len(tree_node_parts):
|
||
|
||
# variable parts collide with *any* other part that
|
||
# isn't a route
|
||
|
||
raise exceptions.OozeProgrammingError(f"Route collision: Variable part '{rule_part}'in URL rule '{rule}' collides with existing parts: {tree_node_parts}")
|
||
|
||
if not sanitizer_token in sanitizer_token_map:
|
||
raise exceptions.OozeProgrammingError(f"Invalid sanitizer token '{sanitizer_token}' in URL rule '{rule}'.")
|
||
|
||
sanitizer = sanitizer_token_map[sanitizer_token]
|
||
|
||
if not last_part and sanitizer.multipart:
|
||
raise exceptions.OozeProgrammingError(f"Multipart variable '{rule_part}' must be last part of URL rule, but '{rule}' has parts after it.")
|
||
|
||
variable_definition = {
|
||
'index' : index,
|
||
'name' : variable_name,
|
||
'sanitizer' : sanitizer,
|
||
}
|
||
|
||
tree_node[rule_part] = { '/variable': variable_definition }
|
||
|
||
else:
|
||
tree_node[rule_part] = {}
|
||
|
||
tree_node = tree_node[rule_part]
|
||
|
||
if '/' in tree_node:
|
||
raise exceptions.OozeProgrammingError(f"Route collision: Can't register {rule} because {tree_node['/']} already exists.")
|
||
|
||
tree_node['/'] = Route(
|
||
rule,
|
||
func
|
||
)
|
||
|
||
#self.dispatcher.logger.debug('Current route tree:')
|
||
#self.dispatcher.logger.debug(util.prettydict(self.tree))
|
||
|
||
def find_route(self, path):
|
||
|
||
"""
|
||
Find a route for a given request path.
|
||
|
||
Parameters:
|
||
* path: string, absolute request path
|
||
|
||
Raises:
|
||
* [exceptions.HTTPException](404) if no matching route is found
|
||
"""
|
||
|
||
view_kwargs = {}
|
||
path = path[len(self.dispatcher.absolute_root):] # drop dispatcher root from path
|
||
path = path.strip('/') # remove trailing slashes on both sides from remainder
|
||
path_remainder = path # used for multipart variables
|
||
path_parts = path.split('/')
|
||
|
||
tree_node = self.tree
|
||
for index, path_part in enumerate(path_parts):
|
||
|
||
last_part = index == len(path_parts) - 1
|
||
|
||
if path_part in tree_node and not self.part_is_variable(path_part):
|
||
|
||
# if a path_part is found verbatim, just use it – EXCEPT
|
||
# if it has the same format as a variable part, in which
|
||
# case it has to be matched as a variable because no static
|
||
# rule path can have that format.
|
||
#
|
||
# Consider the following:
|
||
#
|
||
# @app.route('foo/<str:bar>')
|
||
# def foo(request, bar):
|
||
# …
|
||
#
|
||
# A request to 'http://my.site/foo/<str:bar>' would have us
|
||
# match <str:bar> verbatim, sidestep variable parsing and
|
||
# go up in flames if not for the latter check.
|
||
|
||
tree_node = tree_node[path_part]
|
||
|
||
else:
|
||
|
||
branch_parts = self.get_node_parts(tree_node)
|
||
|
||
if len(branch_parts) == 0:
|
||
# tree_node is a leaf and we can't go deeper
|
||
raise exceptions.HTTPException(404)
|
||
|
||
first_branch_part = branch_parts[0]
|
||
|
||
if self.part_is_variable(branch_parts[0]):
|
||
|
||
# variable parts block any siblings, thus it's guaranteed
|
||
# we only have one and don't need to iterate
|
||
|
||
node = tree_node[first_branch_part]
|
||
|
||
variable_definition = node['/variable']
|
||
sanitizer = variable_definition['sanitizer']
|
||
|
||
if sanitizer.multipart:
|
||
|
||
try:
|
||
variable_value = sanitizer.sanitize(path_remainder)
|
||
except exceptions.SanitationException as e:
|
||
raise exceptions.HTTPException(404)
|
||
|
||
else:
|
||
|
||
try:
|
||
variable_value = sanitizer.sanitize(path_part)
|
||
except exceptions.SanitationException as e:
|
||
raise exceptions.HTTPException(404)
|
||
|
||
tree_node = node
|
||
view_kwargs[variable_definition['name']] = variable_value
|
||
|
||
if sanitizer.multipart:
|
||
# quit loop, otherwise only the first part of a
|
||
# multipart variable could be matched.
|
||
break
|
||
|
||
else:
|
||
|
||
# all child parts of tree_node are static parts.
|
||
# means the *request* path coming from the client
|
||
# contains something looksing like a variable
|
||
# declaration ('<…>') but there are no variable
|
||
# parts to match it against.
|
||
|
||
raise exceptions.HTTPException(404)
|
||
|
||
path_remainder = path_remainder[len(path_part) + 1:] # remove current part and trailing slash
|
||
|
||
try:
|
||
return tree_node['/'], view_kwargs
|
||
except KeyError:
|
||
raise exceptions.HTTPException(404)
|