Asyncpg
A fast PostgreSQL Database Client Library for Python/asyncio.
Basic Usage
import asyncio
import asyncpg
async def run():
    """Fetch every row of ``mytable`` and return the result.

    Opens one connection with the demo credentials, runs the query, and
    always closes the connection afterwards.

    Returns:
        Whatever ``conn.fetch`` yields for ``SELECT * FROM mytable``.
    """
    conn = await asyncpg.connect(user='user', password='password',
                                 database='database', host='127.0.0.1')
    try:
        return await conn.fetch('''SELECT * FROM mytable''')
    finally:
        # Close even when the query raises, so the connection is not leaked.
        await conn.close()
# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop; calling get_event_loop() with no running loop is deprecated.
asyncio.run(run())
Create database if not exists
import asyncio
import asyncpg
def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier, escaping quotes."""
    return '"' + name.replace('"', '""') + '"'


async def connect_create_if_not_exists(user, database):
    """Connect to *database* as *user*, creating the database if missing.

    Args:
        user: Role used for the connection; also becomes the owner of a
            newly created database.
        database: Name of the database to connect to.

    Returns:
        An open asyncpg connection to *database*.

    Raises:
        Any error from connecting as ``postgres`` to ``template1`` or
        from the ``CREATE DATABASE`` statement is propagated.
    """
    try:
        return await asyncpg.connect(user=user, database=database)
    except asyncpg.InvalidCatalogNameError:
        # Database does not exist; fall through and create it.
        pass

    sys_conn = await asyncpg.connect(
        database='template1',
        user='postgres'
    )
    try:
        # Identifiers cannot be sent as query arguments, so they are
        # quoted by hand with embedded double quotes doubled.
        await sys_conn.execute(
            f'CREATE DATABASE {_quote_identifier(database)} '
            f'OWNER {_quote_identifier(user)}'
        )
    finally:
        # Release the administrative connection even if creation fails.
        await sys_conn.close()

    # Connect to the newly created database.
    return await asyncpg.connect(user=user, database=database)
# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop; calling get_event_loop() with no running loop is deprecated.
asyncio.run(
    connect_create_if_not_exists(user='elvis', database='new-database')
)
Custom Type Conversions
automatic JSON conversion
The example below shows how to configure asyncpg to encode and decode JSON values using the json module.
import asyncio
import asyncpg
import json
async def main():
    """Round-trip a dict through PostgreSQL's ``json`` type.

    Registers ``json.dumps`` / ``json.loads`` as the codec for the
    ``json`` type in the ``pg_catalog`` schema, then passes a dict as a
    query argument and reads the value back. The connection is closed
    whether or not the query succeeds.
    """
    conn = await asyncpg.connect()
    try:
        json_codec = dict(
            schema='pg_catalog',
            encoder=json.dumps,
            decoder=json.loads,
        )
        await conn.set_type_codec('json', **json_codec)

        payload = {'foo': 'bar', 'spam': 1}
        echoed = await conn.fetchval('SELECT $1::json', payload)
    finally:
        await conn.close()
# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop; calling get_event_loop() with no running loop is deprecated.
asyncio.run(main())
JSON Binary (JSONB) 타입의 경우, asyncpg에선 결국 Text 파싱을 한다.[1] 그래서 Binary 포맷을 사용해선 안 된다. 자세한 내용은 Error when trying to set JSONB as a custom type codec #140 을 참조. 다음 예시는 recc에 사용된 코드이다.
from recc.node.driver.json import global_json_encoder, global_json_decoder
# "template1" is PostgreSQL's stock template database; presumably this is the
# database to connect to when the target database has to be created
# (TODO confirm against the creation code).
_DEFAULT_TEMPLATE_DATABASE = "template1"
async def _init_connection(conn: Connection):
    """Register the project's JSON encoder/decoder for ``jsonb`` on *conn*.

    The codec is registered with ``format="text"``, so values are
    exchanged as JSON text rather than in the binary wire format.
    """
    jsonb_codec = dict(
        schema="pg_catalog",
        encoder=global_json_encoder,
        decoder=global_json_decoder,
        format="text",
    )
    await conn.set_type_codec("jsonb", **jsonb_codec)
async def connect_and_create_if_not_exists(
    host: Optional[str] = None,
    port: Optional[int] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    command_timeout: Optional[float] = None,
) -> Pool:
    """Create a connection pool, creating *database* first if it is missing.

    Every pooled connection is initialised with ``_init_connection``.

    Args:
        host: Server host, or ``None`` for the driver default.
        port: Server port, or ``None`` for the driver default.
        user: Role to connect as; also made owner of a created database.
        password: Password for *user*.
        database: Database to connect to (and to create when missing).
        command_timeout: Timeout passed through to ``create_pool``.

    Returns:
        The pool connected to *database*.

    Raises:
        InvalidCatalogNameError: The database is missing and *database*
            is ``None``, so there is no explicit name to create.
        Any error from the administrative connection or from
        ``CREATE DATABASE`` (e.g. *user* lacks the privilege) propagates.
    """
    # Function-scope import: only the fallback path needs a one-off
    # connection in addition to the pool.
    from asyncpg import connect

    async def _open_pool() -> Pool:
        return await create_pool(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            command_timeout=command_timeout,
            init=_init_connection,
        )

    try:
        return await _open_pool()
    except InvalidCatalogNameError:
        if database is None:
            # Nothing to create by name; surface the original error.
            raise
        # Database does not exist; fall through and create it.

    sys_conn = await connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=_DEFAULT_TEMPLATE_DATABASE,
    )
    try:
        # Identifiers cannot be sent as query arguments, so they are
        # quoted by hand with embedded double quotes doubled.
        quoted_database = database.replace('"', '""')
        statement = f'CREATE DATABASE "{quoted_database}"'
        if user is not None:
            quoted_user = user.replace('"', '""')
            statement += f' OWNER "{quoted_user}"'
        await sys_conn.execute(statement)
    finally:
        # Release the administrative connection even if creation fails.
        await sys_conn.close()

    # Previously this path fell off the end and returned None.
    return await _open_pool()
automatic conversion of PostGIS types
The example below shows how to configure asyncpg to encode and decode the PostGIS geometry type. It works for any Python object that conforms to the geo interface specification and relies on Shapely, although any library that supports reading and writing the WKB format will work.
import asyncio
import asyncpg
import shapely.geometry
import shapely.wkb
from shapely.geometry.base import BaseGeometry
async def main():
    """Round-trip a Shapely point through the PostGIS ``geometry`` type.

    Registers a binary codec that converts between geo-interface objects
    and WKB, sends a point as a query argument, and prints the fetched
    row. The connection is closed whether or not the query succeeds.
    """
    def encode_geometry(geometry):
        """Serialize a geo-interface object to WKB.

        Raises:
            TypeError: *geometry* has no ``__geo_interface__`` attribute.
        """
        if not hasattr(geometry, '__geo_interface__'):
            raise TypeError('{g} does not conform to '
                            'the geo interface'.format(g=geometry))
        # shape() replaces asShape(), which was deprecated in Shapely 1.8
        # and removed in Shapely 2.0.
        shape = shapely.geometry.shape(geometry)
        return shapely.wkb.dumps(shape)

    def decode_geometry(wkb):
        """Deserialize WKB into a Shapely geometry."""
        return shapely.wkb.loads(wkb)

    conn = await asyncpg.connect()
    try:
        await conn.set_type_codec(
            'geometry',  # also works for 'geography'
            encoder=encode_geometry,
            decoder=decode_geometry,
            format='binary',
        )

        data = shapely.geometry.Point(-73.985661, 40.748447)
        res = await conn.fetchrow(
            '''SELECT 'Empire State Building' AS name,
                      $1::geometry AS coordinates
            ''',
            data)

        print(res)

    finally:
        await conn.close()
# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop; calling get_event_loop() with no running loop is deprecated.
asyncio.run(main())
Troubleshooting
Cannot insert multiple commands into prepared statement
다음과 같이, 인자를 포함한 여러 명령을 하나의 쿼리로 보내 보자.
import asyncpg
import asyncio
async def run():
    """Demonstrate the failure: several commands sent with a query argument.

    The ``execute`` call is expected to raise; the connection is still
    closed before the error propagates.
    """
    con = await asyncpg.connect(user="coldmind", database="test")
    try:
        # Two commands plus an argument: this is the combination that
        # triggers the error this section is about.
        await con.execute('SELECT ($1); SELECT 2;', 1)
    finally:
        await con.close()
# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop; calling get_event_loop() with no running loop is deprecated.
asyncio.run(run())
그럼 다음과 같은 에러가 출력된다: asyncpg.exceptions.PostgresSyntaxError: cannot insert multiple commands into a prepared statement
이슈의 답변은 다음과 같다.
매개변수는 "Prepared statements" 에서만 지원되며 "Prepared statements" 당 하나의 명령만 있을 수 있습니다. 이것은 PostgreSQL 프로토콜 제한 사항입니다. asyncpg 자체는 쿼리 수정을 시도하지 않습니다. 우리는 어느 시점에서 그것을 구현할 수 있습니다 (#9 참조).
따라서 전체 문장을 완성시킨 후, 인자 없이 한 번에 실행하면 된다.
import asyncpg
import asyncio
async def run():
    """Run several commands in one ``execute`` call, without arguments.

    The connection is closed whether or not the statement succeeds.
    """
    con = await asyncpg.connect(user="coldmind", database="test")
    try:
        # No query arguments are passed, so both commands go in one call.
        await con.execute('SELECT 1; SELECT 2;')
    finally:
        await con.close()
# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop; calling get_event_loop() with no running loop is deprecated.
asyncio.run(run())
See also
Favorite site
References
-
확인 필요 ↩