Asyncio
Python 비동기 IO.
Categories
- Python:EventLoop
- grpc#asyncio
- Asyncio:AsyncPythonSubprocess - Python의 asyncio패키지를 사용하여 Subprocess를 기동하는 클래스 구현.
- Asyncio:TaskManager - Python의 asyncio패키지를 사용하여 Task 관리자 구현.
Exceptions
- exception asyncio.CancelledError
- 작업이 취소되었습니다.
- 이 예외는 asyncio 태스크가 취소될 때 사용자 정의 작업을 수행하기 위해 잡을 수 있습니다. 거의 모든 상황에서 예외를 다시 일으켜야 합니다.
WARNING |
버전 3.8에서 변경: |
네이티브 코루틴 만들기
먼저 asyncio를 사용하려면 다음과 같이 async def
로 네이티브 코루틴을 만듭니다. 파이썬에서는 제너레이터 기반의 코루틴과 구분하기 위해 async def
로 만든 코루틴은 네이티브 코루틴이라고 합니다. async def
키워드는 파이썬 3.5
이상부터 사용 가능.
Hello, World Example
import asyncio
async def hello(): # async def로 네이티브 코루틴을 만듦
print('Hello, world!')
loop = asyncio.get_event_loop() # 이벤트 루프를 얻음
loop.run_until_complete(hello()) # hello가 끝날 때까지 기다림
loop.close() # 이벤트 루프를 닫음
await 로 네이티브 코루틴 실행하기
이번에는 await
로 네이티브 코루틴을 실행하는 방법입니다. 다음과 같이 await
뒤에 코루틴 객체, 퓨처 객체, 태스크 객체를 지정하면 해당 객체가 끝날 때까지 기다린 뒤 결과를 반환합니다. await
는 단어 뜻 그대로 특정 객체가 끝날 때까지 기다립니다.
await
키워드는 파이썬 3.5
이상부터 사용 가능, 3.4
에서는 yield from
을 사용
-
변수 = await 코루틴객체
-
변수 = await 퓨처객체
-
변수 = await 태스크객체
여기서 주의할 점이 있는데 await
는 네이티브 코루틴 안에서만 사용할 수 있습니다. 그럼 두 수를 더하는 네이티브 코루틴을 만들고 코루틴에서 1초 대기한 뒤 결과를 반환해보겠습니다.
import asyncio
async def add(a, b):
print('add: {0} + {1}'.format(a, b))
await asyncio.sleep(1.0) # 1초 대기. asyncio.sleep도 네이티브 코루틴
return a + b # 두 수를 더한 결과 반환
async def print_add(a, b):
result = await add(a, b) # await로 다른 네이티브 코루틴 실행하고 반환값을 변수에 저장
print('print_add: {0} + {1} = {2}'.format(a, b, result))
loop = asyncio.get_event_loop() # 이벤트 루프를 얻음
loop.run_until_complete(print_add(1, 2)) # print_add가 끝날 때까지 이벤트 루프를 실행
loop.close() # 이벤트 루프를 닫음
Future vs Task
- 퓨처 (asyncio.Future)
- 미래에 할 일을 표현하는 클래스인데 할 일을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
- 태스크 (asyncio.Task)
-
asyncio.Future
의 파생 클래스이며asyncio.Future
의 기능과 실행할 코루틴의 객체를 포함하고 있습니다. - 태스크는 코루틴의 실행을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
Synchronization Primitives
- Synchronization Primitives — Python 3.10.5 documentation
- Lock
- Event - WaitForSingleObject 와 비슷한 역할 (다만
clear
함수로 플래그를 리셋 해야 한다) - Condition - std::condition variable 와 비슷한 역할
- Semaphore
- BoundedSemaphore
동시에 태스크 실행하기
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
를 사용하면 된다.
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
퓨처
완료 콜백
add_done_callback(callback, *, context=None)
를 사용하면 된다.
Future가 완료(done)될 때 실행할 콜백을 추가합니다. functools.partial을 사용하여 매개 변수를 callback에 전달할 수 있습니다, 예를 들어:
# Call 'print("Future:", fut)' when "fut" is done.
fut.add_done_callback(
functools.partial(print, "Future:"))
태스크
태스크 만들기
asyncio.create_task(coro, *, name=None)
를 사용한다:
async def coro():
...
# In Python 3.7+
task = asyncio.create_task(coro())
...
# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...
태스크 취소
async def cancel_me():
print('cancel_me(): before sleep')
try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
# Create a "cancel_me" Task
task = asyncio.create_task(cancel_me())
# Wait for 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now
async with
그럼 1초 뒤에 덧셈 결과를 반환하는 클래스를 만들어보겠습니다.
import asyncio
class AsyncAdd:
def __init__(self, a, b):
self.a = a
self.b = b
async def __aenter__(self):
await asyncio.sleep(1.0)
return self.a + self.b # __aenter__에서 값을 반환하면 as에 지정한 변수에 들어감
async def __aexit__(self, exc_type, exc_value, traceback):
pass
async def main():
async with AsyncAdd(1, 2) as result: # async with에 클래스의 인스턴스 지정
print(result) # 3
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
async for
import asyncio
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current < self.stop:
await asyncio.sleep(1.0)
r = self.current
self.current += 1
return r
else:
raise StopAsyncIteration
async def main():
async for i in AsyncCounter(3): # for 앞에 async를 붙임
print(i, end=' ')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
run_in_executor
import asyncio
import concurrent.futures
def blocking_io():
# File operations (such as logging) can block the
# event loop: run them in a thread pool.
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
## Options:
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
if __name__ == '__main__':
asyncio.run(main())
전체 테스크 반환
import asyncio
import sys
def all_tasks(loop: asyncio.AbstractEventLoop):
if sys.version_info >= (3, 7):
return getattr(asyncio, "all_tasks")(loop) # nocov
else:
tasks = asyncio.Task.all_tasks(loop) # type: ignore[attr-defined]
return {t for t in list(tasks) if not t.done()}
await 에 우선 순위를 부여하는 방법 (PriorityQueue 를 사용한 방법)
await 를 사용하여 직접적으로 제어하는 방법은 없으므로 우선순위 큐를 사용한다.
import asyncio
async def high_priority_task():
print("High priority task started")
await asyncio.sleep(1)
print("High priority task finished")
async def low_priority_task():
print("Low priority task started")
await asyncio.sleep(2)
print("Low priority task finished")
async def main():
# 우선순위 큐 생성
queue = asyncio.PriorityQueue()
# 작업을 우선순위와 함께 큐에 추가
await queue.put((0, high_priority_task()))
await queue.put((1, low_priority_task()))
# 큐에서 작업을 가져와 실행
while not queue.empty():
_, task = await queue.get()
await task
asyncio.run(main())
asyncio.wait
- asyncio.FIRST_COMPLETED - 퓨처가 하나라도 끝나거나 취소될 때 함수가 반환됩니다.
- asyncio.FIRST_EXCEPTION - 예외가 발생되어 퓨처가 완료되면 반환. 예외가 없을 경우 ALL_COMPLETED 와 동일.
- asyncio.ALL_COMPLETED - 모든 퓨처가 끝나거나 취소되면 함수가 반환됩니다.
asyncio.as_completed
완료 순서로 반환.
asyncio.to_thread
별도의 스레드에서 func 함수를 비동기적으로 실행합니다.
다음 코드를:
다음과 같이 변경:
async 함수가 아닌 함수에서 테스크 목록 취소
import asyncio
import sys
def cancel_tasks(loop: asyncio.AbstractEventLoop, *tasks) -> None:
if not tasks:
return
for task in tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*tasks, loop=loop, return_exceptions=True))
for task in tasks:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "Unhandled exception during asyncio.run() shutdown",
"exception": task.exception(),
"task": task,
}
)
Subprocesses
import asyncio
async def run(cmd):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
print(f'[stdout]\n{stdout.decode()}')
if stderr:
print(f'[stderr]\n{stderr.decode()}')
asyncio.run(run('ls /zzz'))
shlex를 사용한 cmd
적용.
Timeout 구현 (asyncio.wait_for)
asyncio.wait_for
를 사용하면 된다:
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# Expected output:
#
# timeout!
TCP/IP 소켓 오픈:
fut = asyncio.open_connection( self.host, self.port, loop=self.loop )
try:
r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
pass
Cancel 가능한 Sleep Task 구현
# -*- coding: utf-8 -*-
from asyncio import CancelledError, Task, ensure_future
from asyncio import sleep as asyncio_sleep
from typing import Any, Dict
class Sleeper:
def __init__(self):
self.tasks: Dict[Any, Task] = dict()
async def sleep(self, key: Any, seconds: float, result=None, *, loop=None) -> Any:
coro = asyncio_sleep(seconds, result=result, loop=loop)
task = ensure_future(coro)
self.tasks[key] = task
try:
return await task
except CancelledError:
return result
finally:
del self.tasks[key]
def cancel(self, key: Any) -> None:
self.tasks[key].cancel()
def cancel_all(self) -> None:
for task in self.tasks.values():
task.cancel()
파이썬 3.4 이하에서 asyncio 사용하기
async def
와 await
는 파이썬 3.5
에서 추가되었습니다. 따라서 3.5
미만 버전에서는 사용할 수 없습니다. 파이썬 3.4
에서는 다음과 같이 @asyncio.coroutine
데코레이터로 네이티브 코루틴을 만듭니다.
파이썬 3.4
에서는 await
가 아닌 yield from
을 사용합니다.
-
변수 = yield from 코루틴객체
-
변수 = yield from 퓨처객체
-
변수 = yield from 태스크객체
파이썬 3.3
에서 asyncio
는 pip install asyncio
로 asyncio를 설치한 뒤 @asyncio.coroutine
데코레이터와 yield from
을 사용하면 됩니다. 단, 3.3
미만 버전에서는 asyncio
를 지원하지 않습니다.
Example
import asyncio
import aiohttp
urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']
async def call_url(url):
print('Starting {}'.format(url))
response = await aiohttp.get(url)
data = await response.text()
print('{}: {} bytes: {}'.format(url, len(data), data))
return data
futures = [call_url(url) for url in urls]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))
TCP Echo Server
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
TCP Echo Client
A TCP echo client using the loop.create_connection()
method, sends data, and waits until the connection is closed:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
ThreadPoolExecutor vs asyncio
Python:concurrent.futures#ThreadPoolExecutor vs asyncio 항목 참조.
Python Concurrency Mind Maps
|
Troubleshooting
pipe closed by peer or os.write(pipe, data) raised exception
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
...
drain
을 사용하자.
See also
- Python
- Python:asyncio
- Python:subprocess
- Python:multiprocessing
- Python:threading
- trio
- aiohttp
- aiortc
- libuv
Favorite site
Tutorials
- 파이썬과 비동기 프로그래밍 #1, 비동기 프로그래밍이란
- [추천] 파이썬과 비동기 프로그래밍 #2, 파이썬에서 비동기 프로그래밍 시작하기
- 파이썬과 비동기 프로그래밍 #3, 파이썬에서 비동기 프로그래밍 활용하기