from http import HTTPStatus import types import string from . import util from . import sanitation from . import database # needed for database.NotFound exception from . import session #class CookieCollection(http.cookies.SimpleCookie): # pass def valid_header_value(value): #return value.isascii() and value.isprintable() if isinstance(value, str): allowed_chars = string.ascii_letters + string.digits + '.,-+*/@' return util.allowed_chars_only(value, allowed_chars) return False class CookieOptions: http_only = True secure = True max_age = None domain = None path = '/' samesite = 'Strict' __valid_options = [ 'secure', 'http_only', 'samesite', 'max_age', 'domain', 'path', ] def __init__(self, **options): for key, value in options.items(): if not key in self.__valid_options: raise KeyError(f"Not a valid cookie option: '{key}'") setattr(self, key, value) def __setattribute__(self, name, value): if name == 'samesite': if not value in ('Strict', 'Lax', 'None'): raise ValueError("Invalid value for cookie SameSite policy: {value} - must be one of 'Strict', 'Lax', 'None'.") elif name == 'max_age': value = sanitation.INTEGER.sanitize(value) super().__setattribute__(name, value) def __str__(self): rv = '' if self.secure is True: rv += "; Secure" if self.http_only is True: rv += "; HttpOnly" if not self.samesite is None: rv += f"; SameSite={self.samesite}" if not self.max_age is None: rv += f"; Max-Age={self.max_age}" if not self.domain is None: rv += f"; Domain={self.domain}" if not self.path is None: rv += f"; Path={self.path}" return rv def as_dict(self): return { k: getattr(self, k) for k in self.__valid_options } class Cookie: def __init__(self, name, value=None, **options): if isinstance(value, str): if not valid_header_value(value): raise ValueError(f"Not a valid cookie value '{value}'. Can only hold ASCII and NO linebreaks (\\r, \\n).") self.name = name self.value = value self.options = CookieOptions(**options) def __setattr__(self, name, value): if name == 'name' and not valid_header_value(name): raise ValueError(f"Not a valid cookie name '{name}'. Can only hold ASCII and NO linebreaks (\\r, \\n).") if name == 'value' and not valid_header_value(value): raise ValueError(f"Not a valid cookie value '{value}'. Can only hold ASCII and NO linebreaks (\\r, \\n).") super().__setattr__(name, value) @classmethod def from_string(cls, cookie_string): name, value = cookie_string.split('=', 1) return cls(name, value) def header(self): return f"{self.name}={self.value}{self.options}" class CookieCollection: """ Holds all cookies for a request or response. Setting and getting cookie values is done via dictionary interface: cookies = CookieCollection() cookies['foo'] = 'bar' print(cookies['foo']) # prints 'bar' Each cookie is held as `Cookie` object, with the dictionary interface exposing the raw stored value. Direct access to `Cookie` objects is done via attribute access; This is for example needed to set options specific to that one cookie: cookies = CookieCollection() cookies['foo'] = 'bar' cookies.foo.max_age = 300 # leads to cookie 'foo' expiring within 5 minutes (300s) """ def __init__(self): self.cookies = {} self.options = CookieOptions() self.__deleted__ = [] # keeps track of what keys to invalidate def __getattr__(self, name): if name in self.cookies: return self.cookies[name] raise KeyError(f"Unknown cookie: '{name}'") def __setattr__(self, name, value): if isinstance(value, Cookie): self.cookies[name] = value else: super().__setattr__(name, value) def __getitem__(self, name): return self.cookies[name].value def __setitem__(self, name, value): try: self.__getattribute__(name) raise KeyError(f"Forbidden cookie name: '{name}' - collides with existing object attribute.") except AttributeError: pass # allow only names not clashing with existing attributes if isinstance(value, Cookie): cookie = value elif name in self.cookies: self.cookies[name].value = value else: cookie = Cookie(name, value, **self.options.as_dict()) self.cookies[name] = cookie def __delitem__(self, name): try: del self.cookies[name] except KeyError: # response cookies don't necessarily contain # all cookies sent with the request # ignore this case silently so we can # still invalidate cookies not explicitly # set in responses pass self.__deleted__.append(name) def __iter__(self): return self.cookies.__iter__() def keys(self): return self.cookies.keys() def items(self): return self.cookies.items() @classmethod def from_string(cls, cookies_string): instance = cls() for cookie_string in cookies_string.split(';'): cookie_string = cookie_string.strip() # remove whitespace try: cookie = Cookie.from_string(cookie_string) except ValueError as e: # invalid cookie string pass # TODO: log this! else: instance.cookies[cookie.name] = cookie return instance def headers_wsgi(self): headers = [] # set current cookies for name, cookie in self.cookies.items(): headers.append(( 'Set-Cookie', cookie.header() )) # mark deleted ones for invalidation by user agent for name in self.__deleted__: headers.append(( 'Set-Cookie', f'{name}=invalidated; Max-Age=0' )) return headers class Request: def __init__(self, dispatcher, environ): self.dispatcher = dispatcher self.wsgi_options = {} self.headers = {} self.cookies = CookieCollection() # filled by process_headers self.session = None # TODO self.g = util.StorageObject() # like flask.g but without singleton hassle # predefined variables from the wsgi. namespace self.wsgi_version = environ['wsgi.version'] self.url_scheme = environ['wsgi.url_scheme'] self.input = environ['wsgi.input'] # Special environ variables as defined in PEP3333 self.method = environ['REQUEST_METHOD'] self.script_name = environ.get('SCRIPT_NAME', '') self.path = environ.get('PATH_INFO', '/') self.query_string = environ.get('QUERY_STRING') self.content_type = environ.get('CONTENT_TYPE') self.content_length = environ.get('CONTENT_LENGTH') self.server_name = environ['SERVER_NAME'] self.server_port = environ['SERVER_PORT'] self.protocol = environ['SERVER_PROTOCOL'] # Other useful environ variables defined by CGI 1.1 # but not required by WSGI self.remote_addr = environ.get('REMOTE_ADDR') # extract WSGI options and HTTP headers for key, value in environ.items(): if key.startswith('wsgi.'): self.wsgi_options[key[5:]] = value elif key.startswith('HTTP_'): self.headers[key[5:]] = value self.process_headers() self.session_invalid = False if 'session_id' in self.cookies: try: self.session = session.Session.load(id=self.cookies['session_id']) except database.NotFound: # reference to deleted/invalid session in cookies self.session_invalid = True self.session = session.Session() # create new empty session else: self.session = session.Session() def process_headers(self): if 'RANGE' in self.headers: self.ranges = self.parse_ranges(self.headers['RANGE']) else: self.ranges = False if 'COOKIE' in self.headers: self.cookies = CookieCollection.from_string(self.headers['COOKIE']) def parse_ranges(self, header): if not header.startswith('bytes='): return False ranges = () range_strings = header.replace(' ', '').split(',') for range_string in range_strings: if not '-' in range_string: return False start_string, end_string = range_string.split('-', 1) try: start = sanitation.INTEGER.sanitize(start_string) end = sanitation.INTEGER.sanitize(end_string) except exceptions.SanitationException as e: return False else: ranges += (start, end) return ranges class Response: def __init__(self, request): self.request = request self.headers = { 'Content-Type': 'text/html; charset=utf-8', 'Accept-Ranges': 'bytes', } self.cookies = CookieCollection() self.status = HTTPStatus.OK self.body = None self.session = request.session if self.session.modified: self.session.save() if not 'session_id' in request.cookies or request.cookies['session_id'] != str(self.session.id): self.cookies['session_id'] = str(self.session.id) elif request.session_invalid: del self.cookies['session_id'] # mark session_id cookie for invalidation def headers_wsgi(self): """ Returns the headers of this response in the list-of-tuple format WSGI expects. """ rv = [] for key, value in self.headers.items(): rv.append((key, value)) rv.extend(self.cookies.headers_wsgi()) return rv def body_wsgi(self): """ Returns the body of this response in the bytes format WSGI expects. """ if isinstance(self.body, types.GeneratorType): return self.body return [self.body.encode('UTF-8')]