import pytest


def test_create_table(config, database):
    """Create a model's table and confirm it is listed in information_schema."""
    class DBTestCreate(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    # execute should raise an exception on failure
    DBTestCreate.create_table().execute()

    # manual introspection to verify results
    with database.pool.connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT table_schema
                FROM information_schema.tables
                WHERE table_catalog = %(db_name)s
                AND table_name = %(table_name)s""",
                {
                    'db_name': config['database']['dbname'],
                    'table_name': DBTestCreate.__sql_name__(),
                },
            )
            row = cursor.fetchone()
            # Fail with a clear assertion (not a TypeError on subscripting
            # None) when the table was not created.
            assert row is not None
            assert row['table_schema'] == 'public'


def test_insert(database):
    """Insert one row through the model and read it back with a raw cursor."""
    class DBTestInsert(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestInsert.create_table().execute()
    statement = DBTestInsert.insert(DBTestInsert.a, DBTestInsert.b)

    # execute should raise an exception on failure, so absence
    # of an exception is treated as proof of success.
    statement.execute(a=23, b="FNORD")

    with database.pool.connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(f"""
                SELECT *
                FROM {DBTestInsert.__sql__()}"""
            )
            row = cursor.fetchone()
            # An empty table must show up as an assertion failure.
            assert row is not None
            assert row['a'] == 23
            assert row['b'] == "FNORD"


def test_select(database):
    """Select a previously inserted row using a parameterized WHERE clause."""
    class DBTestSelect(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestSelect.create_table().execute()
    DBTestSelect.insert(DBTestSelect.a, DBTestSelect.b).execute(a=23, b="FNORD")

    statement = DBTestSelect.select().where(DBTestSelect.a == database.Parameter('a'))
    with statement.execute(a=23) as cursor:
        row = cursor.fetchone()
        assert row.a == 23
        assert row.b == "FNORD"


def test_select_where(database):
    """Check that the WHERE parameter picks the matching row out of two."""
    class DBTestSelectWhere(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestSelectWhere.create_table().execute()
    insert = DBTestSelectWhere.insert(DBTestSelectWhere.a, DBTestSelectWhere.b)
    insert.execute(a=23, b="FNORD")
    insert.execute(a=42, b="BONGO")

    with DBTestSelectWhere.select().where(DBTestSelectWhere.a == database.Parameter('a')).execute(a=23) as cursor:
        row = cursor.fetchone()
        assert row.a == 23
        assert row.b == "FNORD"

    with DBTestSelectWhere.select().where(DBTestSelectWhere.a == database.Parameter('a')).execute(a=42) as cursor:
        row = cursor.fetchone()
        assert row.a == 42
        assert row.b == "BONGO"


def test_update(database):
    """Update without a WHERE clause and check every row carries the new values."""
    class DBTestUpdate(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestUpdate.create_table().execute()

    # insert test data
    DBTestUpdate.insert(DBTestUpdate.a, DBTestUpdate.b).execute(a=23, b="FNORD")
    DBTestUpdate.insert(DBTestUpdate.a, DBTestUpdate.b).execute(a=42, b="BONGO")

    # do the update
    DBTestUpdate.update(DBTestUpdate.a, DBTestUpdate.b).execute(a=69, b="KAZOO")

    # get updated values for verification
    rows = DBTestUpdate.select().execute().fetchall()
    # Guard against a vacuous pass: with no rows the loop below would
    # never execute a single assertion.
    assert rows
    for row in rows:
        assert row.a == 69
        assert row.b == "KAZOO"


def test_update_where(database):
    """Update only the row matching the WHERE clause; the other row is untouched."""
    class DBTestUpdateWhere(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestUpdateWhere.create_table().execute()

    # insert test data
    DBTestUpdateWhere.insert(DBTestUpdateWhere.a, DBTestUpdateWhere.b).execute(a=23, b="FNORD")
    DBTestUpdateWhere.insert(DBTestUpdateWhere.a, DBTestUpdateWhere.b).execute(a=42, b="BONGO")

    # do the update
    DBTestUpdateWhere.update(DBTestUpdateWhere.a, DBTestUpdateWhere.b).where(DBTestUpdateWhere.a == 23).execute(a=69, b="KAZOO")

    # get updated values for verification
    row = DBTestUpdateWhere.select().where(DBTestUpdateWhere.a == database.Parameter('a')).execute(a=69).fetchone()
    assert row.a == 69
    assert row.b == "KAZOO"

    # the row that did not match the WHERE clause keeps its original values
    row = DBTestUpdateWhere.select().where(DBTestUpdateWhere.a == database.Parameter('a')).execute(a=42).fetchone()
    assert row.a == 42
    assert row.b == "BONGO"


def test_delete(database):
    """Delete without a WHERE clause and check the table is empty afterwards."""
    class DBTestDelete(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestDelete.create_table().execute()

    # insert test data
    DBTestDelete.insert(DBTestDelete.a, DBTestDelete.b).execute(a=23, b="FNORD")

    # do the actual deletion
    DBTestDelete.delete().execute()

    row = DBTestDelete.select().execute().fetchone()
    assert row is None


def test_delete_where(database):
    """Delete only the row matching the WHERE clause; the other row survives."""
    class DBTestDeleteWhere(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestDeleteWhere.create_table().execute()

    # insert test data
    DBTestDeleteWhere.insert(DBTestDeleteWhere.a, DBTestDeleteWhere.b).execute(a=23, b="FNORD")
    DBTestDeleteWhere.insert(DBTestDeleteWhere.a, DBTestDeleteWhere.b).execute(a=42, b="BONGO")

    # do the actual deletion
    DBTestDeleteWhere.delete().where(DBTestDeleteWhere.a == 23).execute()

    # the deleted row must no longer be found
    row = DBTestDeleteWhere.select().where(DBTestDeleteWhere.a == 23).execute().fetchone()
    assert row is None

    # the remaining row is the one that did not match the WHERE clause
    row = DBTestDeleteWhere.select().execute().fetchone()
    assert row.a == 42
    assert row.b == "BONGO"


def test_save(database):
    """Save a new instance and check it is flagged as stored and receives an id."""
    class DBTestSave(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestSave.create_table().execute()

    instance = DBTestSave()
    instance.a = 23
    instance.b = "FNORD"
    assert not instance.__stored__

    instance.save()
    assert instance.__stored__

    # id is automatically filled because .save uses
    # RETURNING and writes data back to object
    assert instance.id is not None


def test_load(database):
    """Save an instance, load it back by id and compare all fields."""
    class DBTestLoad(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestLoad.create_table().execute()

    instance = DBTestLoad()
    instance.a = 23
    instance.b = "FNORD"
    instance.save()

    loaded = DBTestLoad.load(id=instance.id)
    assert loaded.__stored__
    assert instance.id == loaded.id
    assert instance.a == loaded.a
    assert instance.b == loaded.b


def test_delete_instance(database):
    """Delete a saved instance and check it is no longer flagged as stored."""
    class DBTestDeleteInstance(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a = database.Integer()
        b = database.CharacterVarying()

    DBTestDeleteInstance.create_table().execute()

    instance = DBTestDeleteInstance()
    instance.a = 23
    instance.b = "FNORD"
    instance.save()
    assert instance.__stored__

    instance.delete()
    assert not instance.__stored__


def test_join(database):
    """Join two models over a foreign key and read both sides from one row."""
    class DBTestJoinA(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        foo = database.Integer()
        bar = database.CharacterVarying()

    class DBTestJoinB(database.Model):
        id = database.UUID(constraints=[database.PrimaryKey()], default=database.Call('gen_random_uuid'))
        a_id = database.UUID(constraints=[database.NotNull(), database.ForeignKey(DBTestJoinA.id)])
        foo = database.Integer()
        bar = database.CharacterVarying()

    # A must exist before B because B's foreign key references A.id
    DBTestJoinA.create_table().execute()
    DBTestJoinB.create_table().execute()

    instance_a = DBTestJoinA()
    instance_a.foo = 23
    instance_a.bar = "FNORD"
    instance_a.save()
    assert instance_a.id is not None

    instance_b = DBTestJoinB()
    instance_b.a_id = instance_a.id
    instance_b.foo = 42
    instance_b.bar = "BONGO"
    instance_b.save()

    statement = DBTestJoinA.select().join(DBTestJoinB, on=DBTestJoinA.id == DBTestJoinB.a_id)
    cursor = statement.execute()
    row = cursor.fetchone()

    # columns of the selecting model are exposed directly on the row,
    # columns of the joined model under row.joined.<ModelName>
    assert row.foo == 23
    assert row.bar == "FNORD"
    assert row.joined.DBTestJoinB.foo == 42
    assert row.joined.DBTestJoinB.bar == "BONGO"