ooze/tests/test_03_database.py

321 lines
9.3 KiB
Python

import pytest
def test_create_table(config, database):
    """Create a table for a throwaway model and look it up in information_schema.

    The test passes when the catalog reports the new table in the
    ``public`` schema of the configured database.
    """

    class DBTestCreate(database.Model):
        id = database.UUID(
            constraints=[database.PrimaryKey()],
            default=database.Call('gen_random_uuid'),
        )
        a = database.Integer()
        b = database.CharacterVarying()

    # execute() is expected to raise if the CREATE TABLE fails
    DBTestCreate.create_table().execute()

    catalog_lookup = """
SELECT
table_schema
FROM
information_schema.tables
WHERE
table_catalog = %(db_name)s AND
table_name = %(table_name)s"""

    # go around the model layer and ask the catalog directly
    with database.pool.connection() as connection, connection.cursor() as cur:
        cur.execute(
            catalog_lookup,
            {
                'db_name': config['database']['dbname'],
                'table_name': DBTestCreate.__sql_name__(),
            },
        )
        found = cur.fetchone()
        assert found['table_schema'] == 'public'
def test_insert(database):
    """Insert one row through the model API and read it back over a raw cursor."""

    class DBTestInsert(database.Model):
        id = database.UUID(
            constraints=[database.PrimaryKey()],
            default=database.Call('gen_random_uuid'),
        )
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestInsert.create_table().execute()

    insert_stmt = DBTestInsert.insert(DBTestInsert.a, DBTestInsert.b)
    # execute() is expected to raise on failure, so returning normally
    # is taken as evidence that the INSERT went through
    insert_stmt.execute(a=23, b="FNORD")

    with database.pool.connection() as connection, connection.cursor() as cur:
        cur.execute(f"""
SELECT
*
FROM
{DBTestInsert.__sql__()}"""
        )
        stored = cur.fetchone()
        assert stored['a'] == 23
        assert stored['b'] == "FNORD"
def test_select(database):
    """Fetch a previously inserted row with a parameterised WHERE clause."""

    class DBTestSelect(database.Model):
        id = database.UUID(
            constraints=[database.PrimaryKey()],
            default=database.Call('gen_random_uuid'),
        )
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestSelect.create_table().execute()
    DBTestSelect.insert(DBTestSelect.a, DBTestSelect.b).execute(a=23, b="FNORD")

    # the value for parameter 'a' is supplied when the query is executed
    unfiltered = DBTestSelect.select()
    query = unfiltered.where(DBTestSelect.a == database.Parameter('a'))

    with query.execute(a=23) as result:
        record = result.fetchone()
        assert record.a == 23
        assert record.b == "FNORD"
def test_select_where(database):
    """Check that a WHERE clause picks the matching row out of several."""

    class DBTestSelectWhere(database.Model):
        id = database.UUID(
            constraints=[database.PrimaryKey()],
            default=database.Call('gen_random_uuid'),
        )
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestSelectWhere.create_table().execute()

    fixtures = ((23, "FNORD"), (42, "BONGO"))

    # one insert statement, executed once per fixture row
    writer = DBTestSelectWhere.insert(DBTestSelectWhere.a, DBTestSelectWhere.b)
    for seed_a, seed_b in fixtures:
        writer.execute(a=seed_a, b=seed_b)

    # each lookup by 'a' must come back with its own 'b'
    for wanted_a, wanted_b in fixtures:
        query = DBTestSelectWhere.select().where(DBTestSelectWhere.a == database.Parameter('a'))
        with query.execute(a=wanted_a) as result:
            record = result.fetchone()
            assert record.a == wanted_a
            assert record.b == wanted_b
def test_update(database):
    """Run an UPDATE without a WHERE clause and check every row was rewritten."""

    class DBTestUpdate(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestUpdate.create_table().execute()

    # insert test data
    DBTestUpdate.insert(DBTestUpdate.a, DBTestUpdate.b).execute(a=23, b="FNORD")
    DBTestUpdate.insert(DBTestUpdate.a, DBTestUpdate.b).execute(a=42, b="BONGO")

    # do the update
    DBTestUpdate.update(DBTestUpdate.a, DBTestUpdate.b).execute(a=69, b="KAZOO")

    # get updated values for verification
    rows = DBTestUpdate.select().execute().fetchall()

    # without this guard the loop below would pass vacuously if the
    # SELECT came back empty
    assert rows, "expected the inserted rows to be returned"
    for row in rows:
        assert row.a == 69
        assert row.b == "KAZOO"
def test_update_where(database):
    """Update only the row matched by a WHERE clause and leave the other alone."""

    class DBTestUpdateWhere(database.Model):
        id = database.UUID(
            constraints=[database.PrimaryKey()],
            default=database.Call('gen_random_uuid'),
        )
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestUpdateWhere.create_table().execute()

    # seed two distinguishable rows
    for seed_a, seed_b in ((23, "FNORD"), (42, "BONGO")):
        DBTestUpdateWhere.insert(DBTestUpdateWhere.a, DBTestUpdateWhere.b).execute(a=seed_a, b=seed_b)

    # rewrite only the row whose a is 23
    targeted = DBTestUpdateWhere.update(DBTestUpdateWhere.a, DBTestUpdateWhere.b).where(DBTestUpdateWhere.a == 23)
    targeted.execute(a=69, b="KAZOO")

    # one row must now carry the new values, the other must be unchanged
    for wanted_a, wanted_b in ((69, "KAZOO"), (42, "BONGO")):
        lookup = DBTestUpdateWhere.select().where(DBTestUpdateWhere.a == database.Parameter('a'))
        found = lookup.execute(a=wanted_a).fetchone()
        assert found.a == wanted_a
        assert found.b == wanted_b
def test_delete(database):
    """Run a DELETE without a WHERE clause and check the table is empty afterwards."""

    class DBTestDelete(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestDelete.create_table().execute()

    # insert test data
    DBTestDelete.insert(DBTestDelete.a, DBTestDelete.b).execute(a=23, b="FNORD")

    # do the actual deletion
    DBTestDelete.delete().execute()

    # nothing left to fetch; identity check because None is a singleton
    # and `is` cannot be intercepted by an overloaded __eq__
    row = DBTestDelete.select().execute().fetchone()
    assert row is None
def test_delete_where(database):
    """Delete only the row matched by a WHERE clause and keep the other one."""

    class DBTestDeleteWhere(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestDeleteWhere.create_table().execute()

    # insert test data
    DBTestDeleteWhere.insert(DBTestDeleteWhere.a, DBTestDeleteWhere.b).execute(a=23, b="FNORD")
    DBTestDeleteWhere.insert(DBTestDeleteWhere.a, DBTestDeleteWhere.b).execute(a=42, b="BONGO")

    # do the actual deletion
    DBTestDeleteWhere.delete().where(DBTestDeleteWhere.a == 23).execute()

    # the deleted row must no longer be found: the test expects
    # fetchone() to hand back None here, not to raise
    row = DBTestDeleteWhere.select().where(DBTestDeleteWhere.a == 23).execute().fetchone()
    assert row is None

    # the row that did not match the WHERE clause must still be there
    row = DBTestDeleteWhere.select().execute().fetchone()
    assert row is not None, "the non-matching row was deleted as well"
    assert row.a == 42
    assert row.b == "BONGO"
def test_save(database):
    """Save a new model instance and check its stored flag and generated id."""

    class DBTestSave(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestSave.create_table().execute()

    instance = DBTestSave()
    instance.a = 23
    instance.b = "FNORD"

    # identity checks pin the flag to the bool singletons instead of
    # anything that merely compares equal to them
    # NOTE(review): assumes __stored__ is a plain bool; confirm against Model
    assert instance.__stored__ is False
    instance.save()
    assert instance.__stored__ is True

    # id is automatically filled because .save uses
    # RETURNING and writes data back to object
    assert instance.id is not None
def test_load(database):
    """Save an instance, load it again by id and compare every field."""

    class DBTestLoad(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestLoad.create_table().execute()

    instance = DBTestLoad()
    instance.a = 23
    instance.b = "FNORD"
    instance.save()

    loaded = DBTestLoad.load(id=instance.id)

    # a freshly loaded object must already be flagged as stored
    # NOTE(review): assumes __stored__ is a plain bool; confirm against Model
    assert loaded.__stored__ is True

    # the loaded copy must carry the same values as the saved original
    assert instance.id == loaded.id
    assert instance.a == loaded.a
    assert instance.b == loaded.b
def test_delete_instance(database):
    """Save and then delete an instance, checking the stored flag flips back."""

    class DBTestDeleteInstance(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestDeleteInstance.create_table().execute()

    instance = DBTestDeleteInstance()
    instance.a = 23
    instance.b = "FNORD"
    instance.save()

    # identity checks pin the flag to the bool singletons
    # NOTE(review): assumes __stored__ is a plain bool; confirm against Model
    assert instance.__stored__ is True
    instance.delete()
    assert instance.__stored__ is False
def test_join(database):
    """Join two related models and check the columns of both sides of the row."""

    class DBTestJoinA(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        foo = database.Integer()
        bar = database.CharacterVarying()

    class DBTestJoinB(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        # every B row must point at an A row
        a_id = database.UUID(constraints=[database.NotNull(), database.ForeignKey(DBTestJoinA.id)])
        foo = database.Integer()
        bar = database.CharacterVarying()

    # A first, because B's foreign key refers to it
    DBTestJoinA.create_table().execute()
    DBTestJoinB.create_table().execute()

    instance_a = DBTestJoinA()
    instance_a.foo = 23
    instance_a.bar = "FNORD"
    instance_a.save()
    # the id is needed below as the foreign key value for B
    assert instance_a.id is not None

    instance_b = DBTestJoinB()
    instance_b.a_id = instance_a.id
    instance_b.foo = 42
    instance_b.bar = "BONGO"
    instance_b.save()

    statement = DBTestJoinA.select().join(DBTestJoinB, on=DBTestJoinA.id == DBTestJoinB.a_id)
    cursor = statement.execute()
    row = cursor.fetchone()
    assert row is not None, "join returned no rows"

    # columns of the selected model sit directly on the row ...
    assert row.foo == 23
    assert row.bar == "FNORD"
    # ... columns of the joined model are reached via row.joined.<model name>
    assert row.joined.DBTestJoinB.foo == 42
    assert row.joined.DBTestJoinB.bar == "BONGO"