2023-08-13 22:44:37 +00:00
|
|
|
import pytest
|
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
def test_create_table(config, database):
|
2023-08-13 22:44:37 +00:00
|
|
|
|
|
|
|
class DBTestCreate(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
# execute should raise an exception on failure
|
|
|
|
DBTestCreate.create_table().execute()
|
|
|
|
|
|
|
|
# manual introspection to verify results
|
|
|
|
with database.pool.connection() as conn:
|
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
with conn.cursor() as cursor:
|
|
|
|
|
|
|
|
cursor.execute("""
|
|
|
|
SELECT
|
|
|
|
table_schema
|
|
|
|
FROM
|
|
|
|
information_schema.tables
|
|
|
|
WHERE
|
|
|
|
table_catalog = %(db_name)s AND
|
|
|
|
table_name = %(table_name)s""",
|
|
|
|
{
|
|
|
|
'db_name': config['database']['dbname'],
|
|
|
|
'table_name': DBTestCreate.__sql_name__(),
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
row = cursor.fetchone()
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
assert row['table_schema'] == 'public'
|
|
|
|
|
|
|
|
def test_insert(database):
|
|
|
|
|
|
|
|
class DBTestInsert(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestInsert.create_table().execute()
|
|
|
|
|
|
|
|
statement = DBTestInsert.insert(DBTestInsert.a, DBTestInsert.b)
|
|
|
|
|
2023-08-13 22:44:37 +00:00
|
|
|
# execute should raise an exception on failure, so absence
|
|
|
|
# of an exception is treated as proof of success.
|
2023-08-14 19:44:29 +00:00
|
|
|
statement.execute(a=23, b="FNORD")
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
with database.pool.connection() as conn:
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
with conn.cursor() as cursor:
|
2023-08-14 19:44:29 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
cursor.execute(f"""
|
|
|
|
SELECT
|
|
|
|
*
|
|
|
|
FROM
|
|
|
|
{DBTestInsert.__sql__()}"""
|
|
|
|
)
|
|
|
|
|
|
|
|
row = cursor.fetchone()
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
assert row['a'] == 23
|
|
|
|
assert row['b'] == "FNORD"
|
|
|
|
|
|
|
|
def test_select(database):
|
|
|
|
|
|
|
|
class DBTestSelect(database.Model):
|
2023-08-13 22:44:37 +00:00
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
DBTestSelect.create_table().execute()
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
DBTestSelect.insert(DBTestSelect.a, DBTestSelect.b).execute(a=23, b="FNORD")
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
statement = DBTestSelect.select().where(DBTestSelect.a == database.Parameter('a'))
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
with statement.execute(a=23) as cursor:
|
|
|
|
row = cursor.fetchone()
|
|
|
|
|
|
|
|
assert row.a == 23
|
|
|
|
assert row.b == "FNORD"
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
def test_select_where(database):
|
|
|
|
|
|
|
|
class DBTestSelectWhere(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestSelectWhere.create_table().execute()
|
|
|
|
|
|
|
|
insert = DBTestSelectWhere.insert(DBTestSelectWhere.a, DBTestSelectWhere.b)
|
|
|
|
insert.execute(a=23, b="FNORD")
|
|
|
|
insert.execute(a=42, b="BONGO")
|
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
with DBTestSelectWhere.select().where(DBTestSelectWhere.a == database.Parameter('a')).execute(a=23) as cursor:
|
|
|
|
row = cursor.fetchone()
|
2023-08-14 19:44:29 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
assert row.a == 23
|
|
|
|
assert row.b == "FNORD"
|
2023-08-14 19:44:29 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
with DBTestSelectWhere.select().where(DBTestSelectWhere.a == database.Parameter('a')).execute(a=42) as cursor:
|
|
|
|
row = cursor.fetchone()
|
2023-08-14 19:44:29 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
assert row.a == 42
|
|
|
|
assert row.b == "BONGO"
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
def test_update(database):
|
|
|
|
|
|
|
|
class DBTestUpdate(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestUpdate.create_table().execute()
|
|
|
|
|
|
|
|
# insert test data
|
|
|
|
DBTestUpdate.insert(DBTestUpdate.a, DBTestUpdate.b).execute(a=23, b="FNORD")
|
|
|
|
DBTestUpdate.insert(DBTestUpdate.a, DBTestUpdate.b).execute(a=42, b="BONGO")
|
|
|
|
|
|
|
|
# do the update
|
|
|
|
DBTestUpdate.update(DBTestUpdate.a, DBTestUpdate.b).execute(a=69, b="KAZOO")
|
|
|
|
|
|
|
|
# get updated values for verification
|
|
|
|
rows = DBTestUpdate.select().execute().fetchall()
|
|
|
|
|
|
|
|
for row in rows:
|
2023-11-07 05:14:54 +00:00
|
|
|
assert row.a == 69
|
|
|
|
assert row.b == "KAZOO"
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
def test_update_where(database):
|
|
|
|
|
|
|
|
class DBTestUpdateWhere(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestUpdateWhere.create_table().execute()
|
|
|
|
|
|
|
|
# insert test data
|
|
|
|
DBTestUpdateWhere.insert(DBTestUpdateWhere.a, DBTestUpdateWhere.b).execute(a=23, b="FNORD")
|
|
|
|
DBTestUpdateWhere.insert(DBTestUpdateWhere.a, DBTestUpdateWhere.b).execute(a=42, b="BONGO")
|
|
|
|
|
|
|
|
# do the update
|
|
|
|
DBTestUpdateWhere.update(DBTestUpdateWhere.a, DBTestUpdateWhere.b).where(DBTestUpdateWhere.a == 23).execute(a=69, b="KAZOO")
|
|
|
|
|
|
|
|
# get updated values for verification
|
|
|
|
row = DBTestUpdateWhere.select().where(DBTestUpdateWhere.a == database.Parameter('a')).execute(a=69).fetchone()
|
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
assert row.a == 69
|
|
|
|
assert row.b == "KAZOO"
|
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
row = DBTestUpdateWhere.select().where(DBTestUpdateWhere.a == database.Parameter('a')).execute(a=42).fetchone()
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
assert row.a == 42
|
|
|
|
assert row.b == "BONGO"
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
def test_delete(database):
|
|
|
|
|
|
|
|
class DBTestDelete(database.Model):
|
2023-08-13 22:44:37 +00:00
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
DBTestDelete.create_table().execute()
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
# insert test data
|
|
|
|
DBTestDelete.insert(DBTestDelete.a, DBTestDelete.b).execute(a=23, b="FNORD")
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
# do the actual deletion
|
|
|
|
DBTestDelete.delete().execute()
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
row = DBTestDelete.select().execute().fetchone()
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
assert row == None
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
def test_delete_where(database):
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
class DBTestDeleteWhere(database.Model):
|
2023-08-13 22:44:37 +00:00
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
DBTestDeleteWhere.create_table().execute()
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
# insert test data
|
|
|
|
DBTestDeleteWhere.insert(DBTestDeleteWhere.a, DBTestDeleteWhere.b).execute(a=23, b="FNORD")
|
|
|
|
DBTestDeleteWhere.insert(DBTestDeleteWhere.a, DBTestDeleteWhere.b).execute(a=42, b="BONGO")
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
# do the actual deletion
|
|
|
|
DBTestDeleteWhere.delete().where(DBTestDeleteWhere.a == 23).execute()
|
|
|
|
|
|
|
|
# this should throw an exception
|
|
|
|
row = DBTestDeleteWhere.select().where(DBTestDeleteWhere.a == 23).execute().fetchone()
|
|
|
|
|
|
|
|
assert row == None
|
|
|
|
|
|
|
|
row = DBTestDeleteWhere.select().execute().fetchone()
|
|
|
|
|
2023-11-07 05:14:54 +00:00
|
|
|
assert row.a == 42
|
|
|
|
assert row.b == "BONGO"
|
2023-08-14 19:44:29 +00:00
|
|
|
|
|
|
|
def test_save(database):
|
|
|
|
|
|
|
|
class DBTestSave(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestSave.create_table().execute()
|
|
|
|
|
|
|
|
instance = DBTestSave()
|
2023-08-13 22:44:37 +00:00
|
|
|
instance.a = 23
|
|
|
|
instance.b = "FNORD"
|
|
|
|
|
2023-08-29 04:05:31 +00:00
|
|
|
assert instance.__stored__ == False
|
2023-08-13 22:44:37 +00:00
|
|
|
instance.save()
|
2023-08-29 04:05:31 +00:00
|
|
|
assert instance.__stored__ == True
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
# id is automatically filled because .save uses
|
|
|
|
# RETURNING and writes data back to object
|
|
|
|
assert instance.id != None
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
def test_load(database):
|
2023-08-13 22:44:37 +00:00
|
|
|
|
2023-08-14 19:44:29 +00:00
|
|
|
class DBTestLoad(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestLoad.create_table().execute()
|
|
|
|
|
|
|
|
instance = DBTestLoad()
|
|
|
|
instance.a = 23
|
|
|
|
instance.b = "FNORD"
|
|
|
|
|
|
|
|
instance.save()
|
|
|
|
|
|
|
|
loaded = DBTestLoad.load(id=instance.id)
|
|
|
|
|
2023-08-29 04:05:31 +00:00
|
|
|
assert loaded.__stored__ == True
|
2023-08-14 19:44:29 +00:00
|
|
|
assert instance.id == loaded.id
|
|
|
|
assert instance.a == loaded.a
|
|
|
|
assert instance.b == loaded.b
|
2023-08-29 04:05:31 +00:00
|
|
|
|
|
|
|
def test_delete_instance(database):
|
|
|
|
|
|
|
|
class DBTestDeleteInstance(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a = database.Integer()
|
|
|
|
b = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestDeleteInstance.create_table().execute()
|
|
|
|
|
|
|
|
instance = DBTestDeleteInstance()
|
|
|
|
instance.a = 23
|
|
|
|
instance.b = "FNORD"
|
|
|
|
|
|
|
|
instance.save()
|
|
|
|
assert instance.__stored__ == True
|
|
|
|
|
|
|
|
instance.delete()
|
|
|
|
assert instance.__stored__ == False
|
2023-11-12 10:54:01 +00:00
|
|
|
|
|
|
|
def test_join(database):
|
|
|
|
|
|
|
|
class DBTestJoinA(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
foo = database.Integer()
|
|
|
|
bar = database.CharacterVarying()
|
|
|
|
|
|
|
|
class DBTestJoinB(database.Model):
|
|
|
|
|
|
|
|
id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
|
|
|
|
a_id = database.UUID(constraints=[database.NotNull(), database.ForeignKey(DBTestJoinA.id)])
|
|
|
|
foo = database.Integer()
|
|
|
|
bar = database.CharacterVarying()
|
|
|
|
|
|
|
|
DBTestJoinA.create_table().execute()
|
|
|
|
DBTestJoinB.create_table().execute()
|
|
|
|
|
|
|
|
instance_a = DBTestJoinA()
|
|
|
|
instance_a.foo = 23
|
|
|
|
instance_a.bar = "FNORD"
|
|
|
|
|
|
|
|
instance_a.save()
|
|
|
|
|
|
|
|
assert not instance_a.id is None
|
|
|
|
|
|
|
|
instance_b = DBTestJoinB()
|
|
|
|
instance_b.a_id = instance_a.id
|
|
|
|
instance_b.foo = 42
|
|
|
|
instance_b.bar = "BONGO"
|
|
|
|
|
|
|
|
instance_b.save()
|
|
|
|
|
|
|
|
statement = DBTestJoinA.select().join(DBTestJoinB, on=DBTestJoinA.id == DBTestJoinB.a_id)
|
|
|
|
cursor = statement.execute()
|
|
|
|
|
|
|
|
row = cursor.fetchone()
|
|
|
|
|
|
|
|
assert row.foo == 23
|
|
|
|
assert row.bar == "FNORD"
|
|
|
|
assert row.joined.DBTestJoinB.foo == 42
|
|
|
|
assert row.joined.DBTestJoinB.bar == "BONGO"
|