376 lines
10 KiB
Python
376 lines
10 KiB
Python
import logging
import string
import types
from http import HTTPStatus

from . import database # needed for database.NotFound exception
from . import sanitation
from . import session
from . import util
|
|
|
|
#class CookieCollection(http.cookies.SimpleCookie):
|
|
# pass
|
|
|
|
def valid_header_value(value):

    """
    Tell whether `value` may be placed verbatim into a cookie header.

    Anything that is not a `str` is rejected outright. Strings are
    handed to `util.allowed_chars_only` together with a whitelist made
    of the ASCII letters, the digits and the characters `.,-+*/@`.
    """

    if not isinstance(value, str):
        return False

    whitelist = string.ascii_letters + string.digits + '.,-+*/@'
    return util.allowed_chars_only(value, whitelist)
|
|
|
|
class CookieOptions:

    """
    The attribute part of a `Set-Cookie` header (Secure, HttpOnly, ...).

    Defaults are class attributes; keyword arguments passed to the
    constructor override them per instance. `str(options)` renders the
    attributes as they are appended after `name=value`, each one
    prefixed with `"; "`.

    Assignments to `samesite` and `max_age` are validated, both in the
    constructor and on later attribute assignment. `None` is accepted
    for either and means "leave this attribute out of the header".
    """

    http_only = True
    secure = True
    max_age = None
    domain = None
    path = '/'
    samesite = 'Strict'

    __valid_options = (
        'secure',
        'http_only',
        'samesite',
        'max_age',
        'domain',
        'path',
    )

    __valid_samesite = ('Strict', 'Lax', 'None')

    def __init__(self, **options):

        """
        Override the class-level defaults with the given options.

        Raises KeyError for an option name that is not supported and
        ValueError for an invalid `samesite` value.
        """

        for key, value in options.items():
            if key not in self.__valid_options:
                raise KeyError(f"Not a valid cookie option: '{key}'")

            setattr(self, key, value)

    def __setattr__(self, name, value):

        """
        Validate `samesite` and `max_age` before storing them.

        This must be `__setattr__`: Python never calls a method named
        `__setattribute__`, so validation placed there is dead code.
        """

        if name == 'samesite':
            # None is allowed: __str__ then omits the SameSite attribute.
            if value is not None and value not in self.__valid_samesite:
                raise ValueError(f"Invalid value for cookie SameSite policy: {value} - must be one of 'Strict', 'Lax', 'None'.")

        elif name == 'max_age':
            # as_dict() hands the default None back to the constructor
            # (see CookieCollection), so None must bypass the sanitizer.
            if value is not None:
                # presumably validates/coerces to an integer - helper lives in the sanitation module
                value = sanitation.INTEGER.sanitize(value)

        super().__setattr__(name, value)

    def __str__(self):

        """
        Render the options as a `Set-Cookie` attribute string.

        Flags are emitted only when exactly `True`; valued attributes
        are emitted unless they are `None`.
        """

        parts = []

        if self.secure is True:
            parts.append("Secure")

        if self.http_only is True:
            parts.append("HttpOnly")

        valued = (
            ("SameSite", self.samesite),
            ("Max-Age", self.max_age),
            ("Domain", self.domain),
            ("Path", self.path),
        )
        for label, setting in valued:
            if setting is not None:
                parts.append(f"{label}={setting}")

        return "".join(f"; {part}" for part in parts)

    def as_dict(self):

        """
        Return all supported options with their current values as a dict.
        """

        return {key: getattr(self, key) for key in self.__valid_options}
|
|
|
|
|
|
class Cookie:

    """
    A single HTTP cookie: a name, a value and its `CookieOptions`.

    `name` and `value` are validated on every assignment so that
    neither can smuggle separators or line breaks into a header.
    Note that the default `value=None` does not pass validation
    (`valid_header_value` only accepts strings), so a value has to be
    supplied in practice.
    """

    # RFC 6265 defines a cookie name as an HTTP token. The value
    # whitelist used by valid_header_value() has no underscore and
    # would therefore reject common names such as 'session_id'.
    __name_chars = frozenset(
        string.ascii_letters + string.digits + "!#$%&'*+-.^_`|~"
    )

    def __init__(self, name, value=None, **options):

        """
        Create a cookie; keyword arguments are passed to `CookieOptions`.

        Raises ValueError for an invalid name or value and whatever
        `CookieOptions` raises for invalid options.
        """

        # Both assignments go through __setattr__, which validates them.
        self.name = name
        self.value = value
        self.options = CookieOptions(**options)

    def __setattr__(self, name, value):

        """
        Validate `name` and `value` assignments, store everything else as is.
        """

        if name == 'name':
            # Check the assigned value, not the attribute name itself.
            is_token = (
                isinstance(value, str)
                and value != ''
                and self.__name_chars.issuperset(value)
            )
            if not is_token:
                raise ValueError(f"Not a valid cookie name '{value}'. Can only hold ASCII token characters and NO whitespace, separators or linebreaks (\\r, \\n).")

        elif name == 'value' and not valid_header_value(value):
            raise ValueError(f"Not a valid cookie value '{value}'. Can only hold ASCII and NO linebreaks (\\r, \\n).")

        super().__setattr__(name, value)

    @classmethod
    def from_string(cls, cookie_string):

        """
        Build a cookie from a single `name=value` pair.

        Raises ValueError when the string has no `=` or when the name
        or value does not pass validation.
        """

        name, separator, value = cookie_string.partition('=')
        if not separator:
            raise ValueError(f"Not a valid cookie string '{cookie_string}'. Expected 'name=value'.")

        return cls(name, value)

    def header(self):

        """
        Return the `Set-Cookie` header value for this cookie.
        """

        return f"{self.name}={self.value}{self.options}"
|
|
|
|
class _UnknownCookieError(AttributeError, KeyError):

    """
    Raised on attribute access to a cookie that is not in the collection.

    It is an AttributeError so that `hasattr()`, `getattr()` with a
    default and the copy protocol behave normally, and a KeyError so
    that callers already catching KeyError keep working.
    """


class CookieCollection:

    """
    Holds all cookies for a request or response.

    Setting and getting cookie values is done via dictionary interface:
        cookies = CookieCollection()
        cookies['foo'] = 'bar'
        print(cookies['foo']) # prints 'bar'

    Each cookie is held as `Cookie` object, with the dictionary interface
    exposing the raw stored value. Direct access to `Cookie` objects is
    done via attribute access; this is for example needed to set options
    specific to that one cookie:
        cookies = CookieCollection()
        cookies['foo'] = 'bar'
        cookies.foo.options.max_age = 300 # leads to cookie 'foo' expiring within 5 minutes (300s)

    Deleting a key (`del cookies['foo']`) drops the cookie from the
    collection and makes `headers_wsgi()` emit a header that tells the
    user agent to discard it.
    """

    def __init__(self):

        self.cookies = {}
        self.options = CookieOptions() # defaults for cookies created via cookies[name] = value
        self.__deleted__ = [] # keeps track of what keys to invalidate

    def __getattr__(self, name):

        """
        Return the `Cookie` object stored under `name`.

        Only called when normal attribute lookup fails. Raises an error
        that is both AttributeError and KeyError for unknown names.
        """

        # Read the dict from __dict__ directly: `self.cookies` would call
        # __getattr__ again (and recurse forever) while it is not set yet,
        # e.g. on an instance created without __init__ by copy/pickle.
        cookies = self.__dict__.get('cookies')
        if cookies is not None and name in cookies:
            return cookies[name]

        raise _UnknownCookieError(f"Unknown cookie: '{name}'")

    def __setattr__(self, name, value):

        """
        Store `Cookie` objects in the collection, everything else as attribute.
        """

        if isinstance(value, Cookie):
            self.cookies[name] = value
        else:
            super().__setattr__(name, value)

    def __contains__(self, name):

        return name in self.cookies

    def __getitem__(self, name):

        return self.cookies[name].value

    def __setitem__(self, name, value):

        """
        Set the cookie `name` to `value`.

        `value` may be a ready-made `Cookie` (stored as is) or a raw
        value. A raw value updates an existing cookie in place, keeping
        its options, or creates a new cookie with the collection's
        default options.

        Raises KeyError when `name` collides with a real attribute of
        this object, ValueError when the name or value is invalid.
        """

        try:
            # __getattribute__ called directly does not fall back to
            # __getattr__, so existing cookies do not count as collision.
            self.__getattribute__(name)
        except AttributeError:
            pass # allow only names not clashing with existing attributes
        else:
            raise KeyError(f"Forbidden cookie name: '{name}' - collides with existing object attribute.")

        if isinstance(value, Cookie):
            self.cookies[name] = value
        elif name in self.cookies:
            # Update in place; the stored object stays the same.
            self.cookies[name].value = value
        else:
            self.cookies[name] = Cookie(name, value, **self.options.as_dict())

        # A cookie that is set again must no longer be invalidated,
        # otherwise the invalidation header emitted after it would win.
        if name in self.__deleted__:
            self.__deleted__[:] = [
                deleted for deleted in self.__deleted__ if deleted != name
            ]

    def __delitem__(self, name):

        """
        Remove the cookie `name` and mark it for invalidation.
        """

        # response cookies don't necessarily contain
        # all cookies sent with the request
        # ignore this case silently so we can
        # still invalidate cookies not explicitly
        # set in responses
        self.cookies.pop(name, None)

        if name not in self.__deleted__:
            self.__deleted__.append(name)

    def __iter__(self):

        return iter(self.cookies)

    def keys(self):

        return self.cookies.keys()

    def items(self):

        return self.cookies.items()

    @classmethod
    def from_string(cls, cookies_string):

        """
        Build a collection from the value of a `Cookie` request header.

        Pairs are separated by `;`. Pairs that cannot be parsed or do
        not pass validation are skipped.
        """

        instance = cls()

        for cookie_string in cookies_string.split(';'):

            cookie_string = cookie_string.strip() # remove whitespace
            if not cookie_string:
                continue # e.g. trailing ';'

            try:
                cookie = Cookie.from_string(cookie_string)
            except ValueError: # invalid cookie string
                # Log the name only; the value may be a secret.
                logging.getLogger(__name__).debug(
                    "Ignoring invalid cookie %r",
                    cookie_string.partition('=')[0],
                )
            else:
                instance.cookies[cookie.name] = cookie

        return instance

    def headers_wsgi(self):

        """
        Return one `('Set-Cookie', value)` tuple per cookie.

        Current cookies come first, followed by one invalidation header
        (`Max-Age=0`) for every deleted name.
        """

        # set current cookies
        headers = [
            ('Set-Cookie', cookie.header())
            for cookie in self.cookies.values()
        ]

        # A user agent only replaces a cookie whose Domain and Path match,
        # so send the collection defaults along with the invalidation.
        scope = ''
        if self.options.domain is not None:
            scope += f"; Domain={self.options.domain}"
        if self.options.path is not None:
            scope += f"; Path={self.options.path}"

        # mark deleted ones for invalidation by user agent
        headers.extend(
            ('Set-Cookie', f'{name}=invalidated; Max-Age=0{scope}')
            for name in self.__deleted__
        )

        return headers
|
|
|
|
|
|
class Request:

    """
    An incoming HTTP request, built from the WSGI `environ` dictionary.

    Besides the plain environ values the request carries the parsed
    cookies (`cookies`), the parsed `Range` header (`ranges`) and the
    session belonging to the `session_id` cookie (`session`).
    """

    def __init__(self, dispatcher, environ):

        """
        Read the request data from `environ` and load the session.

        Raises KeyError when `environ` lacks one of the keys read with
        plain indexing below.
        """

        self.dispatcher = dispatcher

        self.wsgi_options = {}
        self.headers = {}
        self.cookies = CookieCollection() # filled by process_headers
        self.session = None # TODO
        self.g = util.StorageObject() # like flask.g but without singleton hassle

        # predefined variables from the wsgi. namespace
        self.wsgi_version = environ['wsgi.version']
        self.url_scheme = environ['wsgi.url_scheme']
        self.input = environ['wsgi.input']

        # Special environ variables as defined in PEP3333
        self.method = environ['REQUEST_METHOD']
        self.script_name = environ.get('SCRIPT_NAME', '')
        self.path = environ.get('PATH_INFO', '/')

        self.query_string = environ.get('QUERY_STRING')
        self.content_type = environ.get('CONTENT_TYPE')
        self.content_length = environ.get('CONTENT_LENGTH')

        self.server_name = environ['SERVER_NAME']
        self.server_port = environ['SERVER_PORT']
        self.protocol = environ['SERVER_PROTOCOL']

        # Other useful environ variables defined by CGI 1.1
        # but not required by WSGI

        self.remote_addr = environ.get('REMOTE_ADDR')

        # extract WSGI options and HTTP headers
        # both prefixes ('wsgi.' and 'HTTP_') are 5 characters long
        for key, value in environ.items():
            if key.startswith('wsgi.'):
                self.wsgi_options[key[5:]] = value
            elif key.startswith('HTTP_'):
                self.headers[key[5:]] = value

        self.process_headers()

        self.session_invalid = False
        if 'session_id' in self.cookies:
            try:
                self.session = session.Session.load(id=self.cookies['session_id'])
            except database.NotFound: # reference to deleted/invalid session in cookies
                self.session_invalid = True
                self.session = session.Session() # create new empty session
        else:
            self.session = session.Session()

    def process_headers(self):

        """
        Evaluate the `Range` and `Cookie` headers, if present.

        Sets `self.ranges` (False without a usable `Range` header) and
        replaces `self.cookies` when cookies were sent.
        """

        if 'RANGE' in self.headers:
            self.ranges = self.parse_ranges(self.headers['RANGE'])
        else:
            self.ranges = False

        if 'COOKIE' in self.headers:
            self.cookies = CookieCollection.from_string(self.headers['COOKIE'])

    def parse_ranges(self, header):

        """
        Parse the value of a `Range` header.

        Returns a tuple of `(start, end)` integer pairs, one per
        requested range, e.g. `((0, 499), (500, 999))` for
        `bytes=0-499, 500-999`.

        Returns False when the header cannot be used: another unit than
        `bytes`, malformed numbers, `end < start`, or an open-ended
        range (`500-` / `-500`), which is not supported here.
        """

        unit_prefix = 'bytes='
        if not header.startswith(unit_prefix):
            return False

        # Cut off the unit before splitting; otherwise the first range
        # would start with 'bytes=' and never parse as a number.
        range_strings = header[len(unit_prefix):].replace(' ', '').split(',')

        digits = set(string.digits)
        ranges = []

        for range_string in range_strings:

            start_string, dash, end_string = range_string.partition('-')
            if not dash:
                return False

            # Plain ASCII digits on both sides; this also rejects the
            # open-ended forms, whose missing side is an empty string.
            if not start_string or not end_string:
                return False
            if not digits.issuperset(start_string + end_string):
                return False

            try:
                start = int(start_string)
                end = int(end_string)
            except ValueError: # e.g. interpreter limit on digit count
                return False

            if end < start:
                return False

            # Append the pair itself; `+= (start, end)` on a tuple would
            # flatten all ranges into one sequence of numbers.
            ranges.append((start, end))

        return tuple(ranges)
|
|
|
|
class Response:

    """
    The response to a `Request`: status, headers, cookies and body.

    `body` may be a string, bytes, a generator yielding the body in
    chunks, or None for an empty body.
    """

    def __init__(self, request):

        """
        Create a response with default headers and status 200.

        A modified session is saved right away; the `session_id` cookie
        is (re)set when the request did not carry the matching id. When
        the session was not modified but the request referenced an
        invalid session, the `session_id` cookie is marked for
        invalidation instead.
        """

        self.request = request
        self.headers = {
            'Content-Type': 'text/html; charset=utf-8',
            'Accept-Ranges': 'bytes',
        }
        self.cookies = CookieCollection()
        self.status = HTTPStatus.OK
        self.body = None

        self.session = request.session
        if self.session.modified:
            self.session.save()
            session_id = str(self.session.id)
            if 'session_id' not in request.cookies or request.cookies['session_id'] != session_id:
                self.cookies['session_id'] = session_id
        elif request.session_invalid:
            del self.cookies['session_id'] # mark session_id cookie for invalidation

    def headers_wsgi(self):

        """
        Returns the headers of this response in the list-of-tuple
        format WSGI expects.
        """

        rv = list(self.headers.items())
        rv.extend(self.cookies.headers_wsgi())

        return rv

    def body_wsgi(self):

        """
        Returns the body of this response in the bytes format WSGI expects.

        Generators are passed through untouched, strings are encoded as
        UTF-8, bytes are used as they are and a body of None yields an
        empty iterable.
        """

        body = self.body

        if isinstance(body, types.GeneratorType):
            return body

        # `body` defaults to None; there is nothing to encode then.
        if body is None:
            return []

        if isinstance(body, (bytes, bytearray)):
            return [bytes(body)]

        return [body.encode('UTF-8')]
|