691 lines
17 KiB
Python
691 lines
17 KiB
Python
import string
|
|
import random
|
|
import inspect
|
|
import functools
|
|
import codecs # so we can open a file as utf-8 in order to parse ASV for importing data
|
|
import werkzeug
|
|
import peewee
|
|
import flask
|
|
|
|
from threading import RLock
|
|
from collections import OrderedDict
|
|
|
|
_missing = object()
|
|
|
|
class locked_cached_property(object):
|
|
|
|
def __init__(self, func):
|
|
|
|
self.__name__ = func.__name__
|
|
self.__module__ = func.__module__
|
|
self.__doc__ = func.__doc__
|
|
|
|
self.func = func
|
|
self.lock = RLock()
|
|
|
|
def __get__(self, instance, type=None):
|
|
|
|
if instance is None:
|
|
return self
|
|
|
|
with self.lock:
|
|
value = instance.__dict__.get(self.__name__, _missing)
|
|
if value is _missing:
|
|
value = self.func(instance)
|
|
instance.__dict__[self.__name__] = value
|
|
|
|
return value
|
|
|
|
def __set__(self, instance, value):
|
|
with self.lock:
|
|
instance.__dict__[self.__name__] = value
|
|
|
|
def __delete__(self, instance):
|
|
with self.lock:
|
|
del instance.__dict__[self.__name__]
|
|
|
|
|
|
def function_parameters(function):
|
|
|
|
r = {}
|
|
signature = inspect.signature(function)
|
|
for param in signature.parameters.values():
|
|
|
|
if param.name == 'self':
|
|
continue # skip next loop iteration, we only want parameters that are explicitly passed
|
|
|
|
if param.annotation == param.empty:
|
|
param_type = str
|
|
|
|
else:
|
|
param_type = param.annotation
|
|
|
|
r[param.name] = {
|
|
'type': param_type,
|
|
'default': param.default
|
|
}
|
|
|
|
return r
|
|
|
|
def random_string(length=42):
|
|
|
|
rand = random.SystemRandom() # uses os.urandom, which is supposed to be cryptographically secure
|
|
string = u''
|
|
for i in range(0, length):
|
|
string += chr(rand.randint(33, 126)) # printable ascii chars are chars 33 - 126 #TODO: Should I even bother finding out whether unicode is an option?
|
|
|
|
return string
|
|
|
|
|
|
def random_string_light(length=8):
|
|
|
|
ranges = ((65, 90), (97, 122)) # A-Z, a-z
|
|
rand = random.SystemRandom()
|
|
string = u''
|
|
|
|
for i in range(0, length):
|
|
r = ranges[rand.randint(0, len(ranges)-1)]
|
|
string += chr(rand.randint(r[0], r[1]))
|
|
|
|
return string
|
|
|
|
def truncate_string(string, max_length):
|
|
|
|
""""
|
|
Truncate a given string to the specified maximum length and adds an
|
|
ellipsis ('…') if it was indeed truncated. Works with `math.inf`.
|
|
"""
|
|
|
|
if len(string) > max_length: # false if max_length == math.inf
|
|
return string[:max_length] + '…'
|
|
return string
|
|
|
|
def unindent_string(string, tab_spaces=4):
|
|
|
|
"""
|
|
Parameters:
|
|
* `string`: `str`, the string to be unindented.
|
|
* `tab_spaces`: `int`, how many spaces a tab counts for.
|
|
"""
|
|
|
|
string = string.replace('\t', ' ' * tab_spaces) # FIXME: This replaces *all* tabs, not only leading ones.
|
|
lines = string.split('\n')
|
|
|
|
leading_spaces = []
|
|
for line in lines:
|
|
if len(line.lstrip()) == 0: # empty line
|
|
continue # do not mark down 0, skip to next line
|
|
leading_spaces.append(len(line) - len(line.lstrip()))
|
|
|
|
indent = min(leading_spaces)
|
|
|
|
new_lines = []
|
|
for line in lines:
|
|
new_lines.append(line[indent:])
|
|
|
|
return "\n".join(new_lines)
|
|
|
|
def flatten_nested_multidict(v):
|
|
|
|
flat = []
|
|
|
|
if not isinstance(v, werkzeug.datastructures.MultiDict):
|
|
flat.append(v)
|
|
else:
|
|
for _, value in werkzeug.datastructures.iter_multi_items(v):
|
|
flat += flatten_nested_multidict(value)
|
|
|
|
return flat
|
|
|
|
|
|
def choose_primary(d):
|
|
|
|
for k,v in d.items():
|
|
|
|
if v['primary']:
|
|
return v
|
|
|
|
return d.values()[0]
|
|
|
|
|
|
def clean_string(s):
|
|
|
|
allowed_chars = string.ascii_lowercase + string.digits + '-' + '.' + '_'
|
|
clean = ""
|
|
|
|
#if not isinstance(s, unicode):
|
|
# s = unicode(s.decode('utf-8'))
|
|
|
|
s = s.lower()
|
|
|
|
substitutions = {
|
|
' ': '-',
|
|
':': '--',
|
|
'ä': 'ae',
|
|
'ö': 'oe',
|
|
'ü': 'ue',
|
|
'ß': 'ss'
|
|
}
|
|
|
|
for pattern, substitute in substitutions.items():
|
|
s = s.replace(pattern, substitute)
|
|
|
|
for char in s:
|
|
if char in allowed_chars:
|
|
clean += char
|
|
|
|
return clean
|
|
|
|
|
|
def themed(f):
|
|
|
|
@functools.wraps(f)
|
|
def real(*args, **kwargs):
|
|
|
|
rv = f(*args, **kwargs)
|
|
|
|
if isinstance(rv, tuple):
|
|
content = rv[0]
|
|
status_code = rv[1]
|
|
|
|
else:
|
|
content = rv
|
|
status_code = 200 # TODO: Find out if this is too naive
|
|
|
|
|
|
if isinstance(content, werkzeug.wrappers.Response):
|
|
return rv # pass Responses (i.e. redirects) upwards
|
|
|
|
elif isinstance(content, ThemedPassthrough):
|
|
return rv.themed
|
|
|
|
|
|
if hasattr(content, '_title') and content._title:
|
|
flask.g.title = content._title
|
|
|
|
elif hasattr(content, 'title') and content.title:
|
|
flask.g.title = content.title
|
|
|
|
elif hasattr(content, 'name') and content.name:
|
|
flask.g.title = content.name
|
|
|
|
else:
|
|
flask.g.title = content.__class__.__name__
|
|
flask.g.content = content
|
|
|
|
if hasattr(flask.g, 'user'):
|
|
user = flask.g.user
|
|
else:
|
|
user = None
|
|
|
|
if 'mode' in kwargs:
|
|
mode = kwargs['mode']
|
|
else:
|
|
#mode = content._meta.modes.keys()[0] # TODO: Default mode option in _meta?
|
|
mode = 'full' # will use default value for kwarg in .view
|
|
|
|
templates = [f'main-{content.__class__.__name__.lower()}.jinja', 'main.jinja']
|
|
return flask.render_template(templates, content=content, mode=mode, user=user), status_code
|
|
|
|
return real
|
|
|
|
|
|
def is_secure(f):
|
|
|
|
"""
|
|
decorator. Denies access if an url is accessed without TLS.
|
|
"""
|
|
|
|
@functools.wraps(f)
|
|
def substitute(*args, **kwargs):
|
|
|
|
if flask.request.is_secure:
|
|
return f(*args, **kwargs)
|
|
|
|
else:
|
|
flask.abort(403, "You are trying to do naughty things without protection.")
|
|
|
|
return substitute
|
|
|
|
|
|
def pretty_bytes(bytecount):
|
|
|
|
"""
|
|
Return a human readable representation given a size in bytes.
|
|
"""
|
|
|
|
units = ['Byte', 'Kilobyte', 'Megabyte', 'Gigabyte', 'Terabyte']
|
|
|
|
value = bytecount
|
|
for unit in units:
|
|
if value / 1024.0 < 1:
|
|
break
|
|
|
|
value /= 1024.0
|
|
|
|
return f"{value:.2f} {unit}"
|
|
|
|
|
|
def levenshtein_distance(s1, s2):
|
|
|
|
"""
|
|
This code is basically copy-pasted from James Turks "jellyfish" module,
|
|
which is BSD-licensed and can thus be used in GPLd software. Thanks, mate!
|
|
"""
|
|
|
|
if s1 == s2:
|
|
return 0
|
|
rows = len(s1) + 1
|
|
cols = len(s2) + 1
|
|
|
|
if not s1:
|
|
return cols - 1
|
|
if not s2:
|
|
return rows - 1
|
|
|
|
prev = None
|
|
cur = range(cols)
|
|
for r in range(1, rows):
|
|
prev, cur = cur, [r] + [0] * (cols - 1)
|
|
for c in range(1, cols):
|
|
deletion = prev[c] + 1
|
|
insertion = cur[c - 1] + 1
|
|
edit = prev[c - 1] + (0 if s1[r - 1] == s2[c - 1] else 1)
|
|
cur[c] = min(edit, deletion, insertion)
|
|
|
|
return cur[-1]
|
|
|
|
|
|
class ThemedPassthrough(object):
|
|
|
|
themed = None
|
|
|
|
def __init__(self, themed):
|
|
self.themed = themed
|
|
|
|
|
|
class ClassOrInstanceBound(type): # probably the worst name I ever picked, but hey it's descriptive! ¯\_(ツ)_/¯
|
|
|
|
"""
|
|
For use in meta-classes. Leads to class `__init__`s having
|
|
the "owning" class or object injected at the front of their
|
|
when being accessed through one. Basically the cheapest form
|
|
of dependency injection. Not sure I want to keep it.
|
|
"""
|
|
|
|
def __get__(self, instance, owner):
|
|
|
|
if not instance is None:
|
|
return functools.partial(self, instance)
|
|
return functools.partial(self, owner)
|
|
# TODO: return functools.partial(self, instance or owner)
|
|
|
|
|
|
class FakeMetaOptions(object):
|
|
|
|
primary_key = None # This is a very ugly hack, to make this play nice with peewee Metaclass' __new__
|
|
abstract = None
|
|
handle_fields = None
|
|
modes = None
|
|
permission_class = None
|
|
schema = None # needed by peewee.ModelBase.__new__
|
|
database = None # needed py peewee.ModelBase.__new__ at least from 3.15.0 on
|
|
_additional_keys = None # needed by peewee.ModelBase.__new__
|
|
|
|
def __init__(self):
|
|
|
|
super(FakeMetaOptions, self).__init__()
|
|
self.abstract = False
|
|
self._additional_keys = set([])
|
|
|
|
|
|
class MetaCompatibility(type):
|
|
|
|
"""
|
|
Make a non-Model class compatible with peewees 'class Meta' pattern.
|
|
This is a hack.
|
|
"""
|
|
|
|
def __new__(cls, name, bases, attrs):
|
|
|
|
recognized_options = ['abstract', 'modes', 'permission_class', 'handle_fields', 'clone_props'] # FIXME: Make this shit generic, like peewee ModelOptions
|
|
|
|
cls = super(MetaCompatibility, cls).__new__(cls, name, bases, attrs)
|
|
|
|
defaults = {}
|
|
if hasattr(cls, '_meta'):
|
|
for option_name in recognized_options:
|
|
if hasattr(cls._meta, option_name):
|
|
defaults[option_name] = getattr(cls._meta, option_name)
|
|
|
|
defaults['abstract'] = False
|
|
|
|
if not issubclass(cls, peewee.Model): # Maybe suboptimal, but can't get poobrains.storage from here, I think
|
|
|
|
cls._meta = FakeMetaOptions()
|
|
|
|
#if hasattr(cls, 'Meta'):
|
|
if 'Meta' in attrs:
|
|
for option_name in recognized_options:
|
|
if hasattr(attrs['Meta'], option_name):
|
|
setattr(cls._meta, option_name, getattr(attrs['Meta'], option_name))
|
|
elif option_name in defaults:
|
|
setattr(cls._meta, option_name, defaults[option_name])
|
|
|
|
delattr(cls, 'Meta')
|
|
|
|
else:
|
|
for option_name, default in defaults.items():
|
|
setattr(cls._meta, option_name, default)
|
|
|
|
else:
|
|
cls._meta._additional_keys = cls._meta._additional_keys - set(['abstract']) # This makes the "abstract" property non-inheritable. FIXME: too hacky
|
|
|
|
if not hasattr(cls._meta, 'abstract'):
|
|
cls._meta.abstract = False
|
|
|
|
if not hasattr(cls._meta, 'handle_fields'):
|
|
cls._meta.handle_fields = [field.name for field in cls._meta.get_primary_keys()]
|
|
|
|
return cls
|
|
|
|
|
|
class ChildAware(object, metaclass=MetaCompatibility):
|
|
|
|
@classmethod
|
|
def class_tree(cls, abstract=False):
|
|
|
|
tree = []
|
|
|
|
for child in cls.__subclasses__():
|
|
|
|
tree.append((child, child.class_tree()))
|
|
|
|
return tuple(tree)
|
|
|
|
@classmethod
|
|
def class_children(cls, abstract=False):
|
|
|
|
reported_children = set()
|
|
children = cls.__subclasses__()
|
|
|
|
for child in children:
|
|
|
|
if abstract or not hasattr(child._meta, 'abstract') or not child._meta.abstract:
|
|
reported_children.add(child)
|
|
|
|
reported_children = reported_children.union(child.class_children())
|
|
|
|
return reported_children
|
|
|
|
|
|
@classmethod
|
|
def class_children_keyed(cls, lower=False):
|
|
|
|
children_keyed = OrderedDict()
|
|
|
|
for child in cls.class_children():
|
|
key = child.__name__.lower() if lower else child.__name__
|
|
children_keyed[key] = child
|
|
|
|
return children_keyed
|
|
|
|
|
|
@classmethod
|
|
def ancestors(cls, _level=0):
|
|
|
|
"""
|
|
Get the ancestors of this class, ordered by how far up the hierarchy they are.
|
|
"""
|
|
|
|
tiered = OrderedDict()
|
|
tiered[_level] = []
|
|
|
|
for base in cls.__bases__:
|
|
|
|
if base is ChildAware:
|
|
break
|
|
|
|
tiered[_level].append(base)
|
|
if hasattr(base, 'ancestors'):
|
|
for lvl, ancestors in base.ancestors(_level+1).items():
|
|
|
|
if not lvl in tiered:
|
|
tiered[lvl] = []
|
|
tiered[lvl] += ancestors
|
|
|
|
if _level > 0:
|
|
return tiered
|
|
|
|
r = []
|
|
for ancestors in tiered.values():
|
|
r += ancestors
|
|
|
|
return r
|
|
|
|
|
|
class TrueDict(OrderedDict):
|
|
|
|
def __setitem__(self, key, value):
|
|
|
|
if value == True and True in self.values() and self[name] != True:
|
|
raise ValueError('Only one item may be True.')
|
|
|
|
return super(TrueDict, self).__setitem__(key, value)
|
|
|
|
|
|
def choose(self):
|
|
|
|
if True in self.values():
|
|
for choice, primary in self.items():
|
|
if primary == True:
|
|
break
|
|
else:
|
|
choice = self.keys()[0]
|
|
|
|
return choice
|
|
|
|
|
|
class CustomOrderedDict(dict):
|
|
|
|
order = None
|
|
|
|
def __init__(self, *args, **kw):
|
|
self.order = []
|
|
super(CustomOrderedDict, self).__init__(*args, **kw)
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
r = '{'
|
|
|
|
for k, v in self.items():
|
|
r += f"{repr(k)}: {repr(v)}"
|
|
|
|
r += '}'
|
|
return r
|
|
|
|
|
|
def __setitem__(self, key, value):
|
|
|
|
super(CustomOrderedDict, self).__setitem__(key, value)
|
|
if key not in self.keys(): # add key to order only if needed, leave position unchanged otherwise
|
|
self.order.append(key)
|
|
|
|
|
|
def __delitem__(self, key):
|
|
super(CustomOrderedDict, self).__delitem__(key)
|
|
self.order.remove(key)
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
for key in self.keys():
|
|
yield key
|
|
|
|
|
|
def reorder_item(self, name, newidx):
|
|
|
|
if newidx >= len(self.order) or newidx < 0:
|
|
raise IndexError(f"Index {newidx} out of range for list of length {len(l)}")
|
|
|
|
oldidx = self.order.index(name)
|
|
new_order = []
|
|
|
|
i = 0
|
|
for other_name in self.order:
|
|
|
|
if i == newidx:
|
|
new_order.append(name)
|
|
i += 1
|
|
|
|
if other_name != name:
|
|
new_order.append(other_name)
|
|
i += 1
|
|
|
|
if i == newidx:
|
|
new_order.append(name)
|
|
i += 1
|
|
|
|
self.order = new_order
|
|
|
|
|
|
def items(self):
|
|
|
|
for key in self.keys():
|
|
yield key, self[key]
|
|
|
|
|
|
def values(self):
|
|
|
|
for key in self.keys():
|
|
yield self[key]
|
|
|
|
|
|
def clear(self):
|
|
super(CustomOrderedDict, self).clear()
|
|
self.order = []
|
|
|
|
|
|
def keys(self):
|
|
return self.order
|
|
|
|
|
|
class ASVReader(object):
|
|
|
|
filepath = None
|
|
|
|
def __init__(self, filepath):
|
|
|
|
super(ASVReader, self).__init__()
|
|
self.filepath = filepath
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
return ASVIterator(self)
|
|
|
|
|
|
class ASVIterator(object):
|
|
|
|
asv = None
|
|
fd = None
|
|
|
|
def __init__(self, asv):
|
|
|
|
self.asv = asv
|
|
self.fd = codecs.open(self.asv.filepath, 'r', encoding='utf-8')
|
|
self.keys = self.next_list()
|
|
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
|
|
def __del__(self):
|
|
self.fd.close()
|
|
|
|
|
|
def next_list(self):
|
|
|
|
""" Get the next record of the file as list """
|
|
|
|
record = []
|
|
current_token = u''
|
|
|
|
while True:
|
|
|
|
char = self.fd.read(1) # one unicode char, no matter how many bytes, compliments of the codecs module
|
|
|
|
if len(char) == 0:
|
|
raise StopIteration('ASV File was fully read.')
|
|
|
|
elif char == chr(0x1F): # unit separator, means the current column was fully read
|
|
record.append(current_token)
|
|
current_token = u''
|
|
|
|
elif char == chr(0x1E): # record separator, means we have reached the end of the line (or rather record)
|
|
record.append(current_token)
|
|
return record
|
|
|
|
else:
|
|
current_token += char
|
|
|
|
|
|
def __next__(self):
|
|
|
|
return OrderedDict(zip(self.keys, self.next_list()))
|
|
|
|
|
|
class ASVWriter(object):
|
|
|
|
fd = None
|
|
|
|
unit_separator = chr(0x1F)
|
|
record_terminator = chr(0x1E)
|
|
|
|
|
|
def __init__(self, filepath):
|
|
self.fd = codecs.open(filepath, 'a', encoding='utf-8')
|
|
|
|
def write_record(self, record):
|
|
self.fd.write(f"{self.unit_separator.join(record)}{self.record_terminator}")
|
|
|
|
def __del__(self):
|
|
self.fd.close()
|
|
|
|
|
|
class TypedList(list):
|
|
|
|
def __init__(self, cls, iterable=None):
|
|
|
|
super(TypedList, self).__init__()
|
|
|
|
self.type = cls
|
|
|
|
if iterable is not None:
|
|
for item in iterable:
|
|
self.append(item)
|
|
|
|
|
|
def append(self, value):
|
|
|
|
if not isinstance(value, self.type):
|
|
raise TypeError(f"Wrong type, this TypedList only allows {self.type.__name__} but got {type(value).__name__} {repr(value)}.")
|
|
|
|
super(TypedList, self).append(value)
|
|
|
|
|
|
def extend(self, iterable):
|
|
|
|
for value in iterable:
|
|
self.append(value)
|
|
|
|
|
|
def insert(self, index, value):
|
|
|
|
if not isinstance(value, self.type):
|
|
raise TypeError(f"Wrong type, this TypedList only allows {self.type.__name__} but got {type(value).__name__} {repr(value)}.")
|
|
|
|
super(TypedList, self).insert(index, value)
|