ooze/ooze/protocol.py

376 lines
10 KiB
Python

from http import HTTPStatus
import types
import string
from . import util
from . import sanitation
from . import database # needed for database.NotFound exception
from . import session
#class CookieCollection(http.cookies.SimpleCookie):
# pass
def valid_header_value(value):
#return value.isascii() and value.isprintable()
if isinstance(value, str):
allowed_chars = string.ascii_letters + string.digits + '.,-+*/@'
return util.allowed_chars_only(value, allowed_chars)
return False
class CookieOptions:
http_only = True
secure = True
max_age = None
domain = None
path = '/'
samesite = 'Strict'
__valid_options = [
'secure',
'http_only',
'samesite',
'max_age',
'domain',
'path',
]
def __init__(self, **options):
for key, value in options.items():
if not key in self.__valid_options:
raise KeyError(f"Not a valid cookie option: '{key}'")
setattr(self, key, value)
def __setattribute__(self, name, value):
if name == 'samesite':
if not value in ('Strict', 'Lax', 'None'):
raise ValueError("Invalid value for cookie SameSite policy: {value} - must be one of 'Strict', 'Lax', 'None'.")
elif name == 'max_age':
value = sanitation.INTEGER.sanitize(value)
super().__setattribute__(name, value)
def __str__(self):
rv = ''
if self.secure is True:
rv += "; Secure"
if self.http_only is True:
rv += "; HttpOnly"
if not self.samesite is None:
rv += f"; SameSite={self.samesite}"
if not self.max_age is None:
rv += f"; Max-Age={self.max_age}"
if not self.domain is None:
rv += f"; Domain={self.domain}"
if not self.path is None:
rv += f"; Path={self.path}"
return rv
def as_dict(self):
return { k: getattr(self, k) for k in self.__valid_options }
class Cookie:
def __init__(self, name, value=None, **options):
if isinstance(value, str):
if not valid_header_value(value):
raise ValueError(f"Not a valid cookie value '{value}'. Can only hold ASCII and NO linebreaks (\\r, \\n).")
self.name = name
self.value = value
self.options = CookieOptions(**options)
def __setattr__(self, name, value):
if name == 'name' and not valid_header_value(name):
raise ValueError(f"Not a valid cookie name '{name}'. Can only hold ASCII and NO linebreaks (\\r, \\n).")
if name == 'value' and not valid_header_value(value):
raise ValueError(f"Not a valid cookie value '{value}'. Can only hold ASCII and NO linebreaks (\\r, \\n).")
super().__setattr__(name, value)
@classmethod
def from_string(cls, cookie_string):
name, value = cookie_string.split('=', 1)
return cls(name, value)
def header(self):
return f"{self.name}={self.value}{self.options}"
class CookieCollection:
"""
Holds all cookies for a request or response.
Setting and getting cookie values is done via dictionary interface:
cookies = CookieCollection()
cookies['foo'] = 'bar'
print(cookies['foo']) # prints 'bar'
Each cookie is held as `Cookie` object, with the dictionary interface
exposing the raw stored value. Direct access to `Cookie` objects is
done via attribute access; This is for example needed to set options
specific to that one cookie:
cookies = CookieCollection()
cookies['foo'] = 'bar'
cookies.foo.max_age = 300 # leads to cookie 'foo' expiring within 5 minutes (300s)
"""
def __init__(self):
self.cookies = {}
self.options = CookieOptions()
self.__deleted__ = [] # keeps track of what keys to invalidate
def __getattr__(self, name):
if name in self.cookies:
return self.cookies[name]
raise KeyError(f"Unknown cookie: '{name}'")
def __setattr__(self, name, value):
if isinstance(value, Cookie):
self.cookies[name] = value
else:
super().__setattr__(name, value)
def __getitem__(self, name):
return self.cookies[name].value
def __setitem__(self, name, value):
try:
self.__getattribute__(name)
raise KeyError(f"Forbidden cookie name: '{name}' - collides with existing object attribute.")
except AttributeError:
pass # allow only names not clashing with existing attributes
if isinstance(value, Cookie):
cookie = value
elif name in self.cookies:
self.cookies[name].value = value
else:
cookie = Cookie(name, value, **self.options.as_dict())
self.cookies[name] = cookie
def __delitem__(self, name):
try:
del self.cookies[name]
except KeyError:
# response cookies don't necessarily contain
# all cookies sent with the request
# ignore this case silently so we can
# still invalidate cookies not explicitly
# set in responses
pass
self.__deleted__.append(name)
def __iter__(self):
return self.cookies.__iter__()
def keys(self):
return self.cookies.keys()
def items(self):
return self.cookies.items()
@classmethod
def from_string(cls, cookies_string):
instance = cls()
for cookie_string in cookies_string.split(';'):
cookie_string = cookie_string.strip() # remove whitespace
try:
cookie = Cookie.from_string(cookie_string)
except ValueError as e: # invalid cookie string
pass # TODO: log this!
else:
instance.cookies[cookie.name] = cookie
return instance
def headers_wsgi(self):
headers = []
# set current cookies
for name, cookie in self.cookies.items():
headers.append((
'Set-Cookie',
cookie.header()
))
# mark deleted ones for invalidation by user agent
for name in self.__deleted__:
headers.append((
'Set-Cookie',
f'{name}=invalidated; Max-Age=0'
))
return headers
class Request:
def __init__(self, dispatcher, environ):
self.dispatcher = dispatcher
self.wsgi_options = {}
self.headers = {}
self.cookies = CookieCollection() # filled by process_headers
self.session = None # TODO
self.g = util.StorageObject() # like flask.g but without singleton hassle
# predefined variables from the wsgi. namespace
self.wsgi_version = environ['wsgi.version']
self.url_scheme = environ['wsgi.url_scheme']
self.input = environ['wsgi.input']
# Special environ variables as defined in PEP3333
self.method = environ['REQUEST_METHOD']
self.script_name = environ.get('SCRIPT_NAME', '')
self.path = environ.get('PATH_INFO', '/')
self.query_string = environ.get('QUERY_STRING')
self.content_type = environ.get('CONTENT_TYPE')
self.content_length = environ.get('CONTENT_LENGTH')
self.server_name = environ['SERVER_NAME']
self.server_port = environ['SERVER_PORT']
self.protocol = environ['SERVER_PROTOCOL']
# Other useful environ variables defined by CGI 1.1
# but not required by WSGI
self.remote_addr = environ.get('REMOTE_ADDR')
# extract WSGI options and HTTP headers
for key, value in environ.items():
if key.startswith('wsgi.'):
self.wsgi_options[key[5:]] = value
elif key.startswith('HTTP_'):
self.headers[key[5:]] = value
self.process_headers()
self.session_invalid = False
if 'session_id' in self.cookies:
try:
self.session = session.Session.load(id=self.cookies['session_id'])
except database.NotFound: # reference to deleted/invalid session in cookies
self.session_invalid = True
self.session = session.Session() # create new empty session
else:
self.session = session.Session()
def process_headers(self):
if 'RANGE' in self.headers:
self.ranges = self.parse_ranges(self.headers['RANGE'])
else:
self.ranges = False
if 'COOKIE' in self.headers:
self.cookies = CookieCollection.from_string(self.headers['COOKIE'])
def parse_ranges(self, header):
if not header.startswith('bytes='):
return False
ranges = ()
range_strings = header.replace(' ', '').split(',')
for range_string in range_strings:
if not '-' in range_string:
return False
start_string, end_string = range_string.split('-', 1)
try:
start = sanitation.INTEGER.sanitize(start_string)
end = sanitation.INTEGER.sanitize(end_string)
except exceptions.SanitationException as e:
return False
else:
ranges += (start, end)
return ranges
class Response:
def __init__(self, request):
self.request = request
self.headers = {
'Content-Type': 'text/html; charset=utf-8',
'Accept-Ranges': 'bytes',
}
self.cookies = CookieCollection()
self.status = HTTPStatus.OK
self.body = None
self.session = request.session
if self.session.modified:
self.session.save()
if not 'session_id' in request.cookies or request.cookies['session_id'] != str(self.session.id):
self.cookies['session_id'] = str(self.session.id)
elif request.session_invalid:
del self.cookies['session_id'] # mark session_id cookie for invalidation
def headers_wsgi(self):
"""
Returns the headers of this response in the list-of-tuple
format WSGI expects.
"""
rv = []
for key, value in self.headers.items():
rv.append((key, value))
rv.extend(self.cookies.headers_wsgi())
return rv
def body_wsgi(self):
"""
Returns the body of this response in the bytes format WSGI expects.
"""
if isinstance(self.body, types.GeneratorType):
return self.body
return [self.body.encode('UTF-8')]