You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
Create a 'test' PostgreSQL database for running this snippet (and make sure the connection settings in the snippet point at it):
createdb -E utf-8 test
import asyncio

import peewee
import peewee_async

# Nothing special, just define model and database.
# The connection values below are placeholders; replace them with the
# settings of the PostgreSQL database you actually want to use.
database = peewee_async.PooledPostgresqlDatabase(
    database='db_name',
    user='user',
    host='127.0.0.1',
    port='5432',
    password='password',
)
class TestModel(peewee_async.AioModel):
    """Example model with a single text column, bound to ``database``."""

    text = peewee.CharField()

    class Meta:
        # Bind the model to the module-level database object.
        database = database


# Look, sync code is working!
# The positional True is presumably peewee's `safe` flag (skip creation if
# the table already exists) -- confirm against the peewee version in use.
TestModel.create_table(True)
# Insert one row through the synchronous API, then close the sync connection.
TestModel.create(text="Yo, I can do it sync!")
database.close()

# No need for sync anymore!
# From here on only the async (aio_*) API is used.
database.set_allow_sync(False)
async def handler():
    """Insert one row asynchronously, then print the text of every row."""
    await TestModel.aio_create(text="Not bad. Watch this, I'm async!")
    all_objects = await TestModel.select().aio_execute()
    for obj in all_objects:
        print(obj.text)
# asyncio.run() creates a fresh event loop, runs handler() to completion and
# closes the loop afterwards. It replaces the get_event_loop() /
# run_until_complete() / close() sequence, which relies on
# asyncio.get_event_loop() implicitly creating a loop -- deprecated behaviour
# that fails on the newest Python releases.
asyncio.run(handler())
# Clean up, can do it sync again:
# allow_sync() is used as a context manager around the synchronous
# drop_table() call, since sync queries were switched off earlier.
with database.allow_sync():
    TestModel.drop_table(True)
# Expected output:
# Yo, I can do it sync!
# Not bad. Watch this, I'm async!