Skip to content

Asyncio

Python 비동기 IO.

Categories

Exceptions

exception asyncio.CancelledError
작업이 취소되었습니다.
이 예외는 asyncio 태스크가 취소될 때 사용자 정의 작업을 수행하기 위해 잡을 수 있습니다. 거의 모든 상황에서 예외를 다시 일으켜야 합니다.

WARNING

버전 3.8에서 변경: CancelledError는 이제 BaseException의 서브 클래스입니다.

네이티브 코루틴 만들기

먼저 asyncio를 사용하려면 다음과 같이 async def로 네이티브 코루틴을 만듭니다. 파이썬에서는 제너레이터 기반의 코루틴과 구분하기 위해 async def로 만든 코루틴은 네이티브 코루틴이라고 합니다. async def 키워드는 파이썬 3.5 이상부터 사용 가능.

async def 함수이름():
    코드

Hello, World Example

import asyncio

async def hello():    # async def로 네이티브 코루틴을 만듦
    print('Hello, world!')

loop = asyncio.get_event_loop()     # 이벤트 루프를 얻음
loop.run_until_complete(hello())    # hello가 끝날 때까지 기다림
loop.close()                        # 이벤트 루프를 닫음

await 로 네이티브 코루틴 실행하기

이번에는 await로 네이티브 코루틴을 실행하는 방법입니다. 다음과 같이 await 뒤에 코루틴 객체, 퓨처 객체, 태스크 객체를 지정하면 해당 객체가 끝날 때까지 기다린 뒤 결과를 반환합니다. await는 단어 뜻 그대로 특정 객체가 끝날 때까지 기다립니다.

await 키워드는 파이썬 3.5 이상부터 사용 가능, 3.4에서는 yield from을 사용

  • 변수 = await 코루틴객체
  • 변수 = await 퓨처객체
  • 변수 = await 태스크객체

여기서 주의할 점이 있는데 await는 네이티브 코루틴 안에서만 사용할 수 있습니다. 그럼 두 수를 더하는 네이티브 코루틴을 만들고 코루틴에서 1초 대기한 뒤 결과를 반환해보겠습니다.

import asyncio

async def add(a, b):
    print('add: {0} + {1}'.format(a, b))
    await asyncio.sleep(1.0)    # 1초 대기. asyncio.sleep도 네이티브 코루틴
    return a + b    # 두 수를 더한 결과 반환

async def print_add(a, b):
    result = await add(a, b)    # await로 다른 네이티브 코루틴 실행하고 반환값을 변수에 저장
    print('print_add: {0} + {1} = {2}'.format(a, b, result))

loop = asyncio.get_event_loop()             # 이벤트 루프를 얻음
loop.run_until_complete(print_add(1, 2))    # print_add가 끝날 때까지 이벤트 루프를 실행
loop.close()                                # 이벤트 루프를 닫음

Future vs Task

퓨처 (asyncio.Future)
미래에 할 일을 표현하는 클래스인데 할 일을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.
태스크 (asyncio.Task)
asyncio.Future의 파생 클래스이며 asyncio.Future의 기능과 실행할 코루틴의 객체를 포함하고 있습니다.
태스크는 코루틴의 실행을 취소하거나 상태 확인, 완료 및 결과 설정에 사용합니다.

Synchronization Primitives

동시에 태스크 실행하기

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False) 를 사용하면 된다.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

퓨처

완료 콜백

add_done_callback(callback, *, context=None)를 사용하면 된다.

Future가 완료(done)될 때 실행할 콜백을 추가합니다. functools.partial을 사용하여 매개 변수를 callback에 전달할 수 있습니다, 예를 들어:

# Call 'print("Future:", fut)' when "fut" is done.
fut.add_done_callback(
    functools.partial(print, "Future:"))

태스크

태스크 만들기

asyncio.create_task(coro, *, name=None)를 사용한다:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

태스크 취소

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now

async with

그럼 1초 뒤에 덧셈 결과를 반환하는 클래스를 만들어보겠습니다.

import asyncio

class AsyncAdd:
    def __init__(self, a, b):
        self.a = a
        self.b = b

    async def __aenter__(self):
        await asyncio.sleep(1.0)
        return self.a + self.b    # __aenter__에서 값을 반환하면 as에 지정한 변수에 들어감

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

async def main():
    async with AsyncAdd(1, 2) as result:    # async with에 클래스의 인스턴스 지정
        print(result)    # 3

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

async for

import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.stop:
            await asyncio.sleep(1.0)
            r = self.current
            self.current += 1
            return r
        else:
            raise StopAsyncIteration

async def main():
    async for i in AsyncCounter(3):    # for 앞에 async를 붙임
        print(i, end=' ')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

run_in_executor

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())

전체 테스크 반환

import asyncio
import sys

def all_tasks(loop: asyncio.AbstractEventLoop):
    if sys.version_info >= (3, 7):
        return getattr(asyncio, "all_tasks")(loop)  # nocov
    else:
        tasks = asyncio.Task.all_tasks(loop)  # type: ignore[attr-defined]
        return {t for t in list(tasks) if not t.done()}

await 에 우선 순위를 부여하는 방법 (PriorityQueue 를 사용한 방법)

await 를 사용하여 직접적으로 제어하는 방법은 없으므로 우선순위 큐를 사용한다.

import asyncio

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def main():
    # 우선순위 큐 생성
    queue = asyncio.PriorityQueue()

    # 작업을 우선순위와 함께 큐에 추가
    await queue.put((0, high_priority_task()))
    await queue.put((1, low_priority_task()))

    # 큐에서 작업을 가져와 실행
    while not queue.empty():
        _, task = await queue.get()
        await task

asyncio.run(main())

asyncio.wait

  • asyncio.FIRST_COMPLETED - 퓨처가 하나라도 끝나거나 취소될 때 함수가 반환됩니다.
  • asyncio.FIRST_EXCEPTION - 예외가 발생되어 퓨처가 완료되면 반환. 예외가 없을 경우 ALL_COMPLETED 와 동일.
  • asyncio.ALL_COMPLETED - 모든 퓨처가 끝나거나 취소되면 함수가 반환됩니다.

asyncio.as_completed

완료 순서로 반환.

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

asyncio.to_thread

별도의 스레드에서 func 함수를 비동기적으로 실행합니다.

다음 코드를:

result = requests.get('https://python.org/')

다음과 같이 변경:

result = await asyncio.to_thread(requests.get, 'https://python.org/')

async 함수가 아닌 함수에서 테스크 목록 취소

import asyncio
import sys

def cancel_tasks(loop: asyncio.AbstractEventLoop, *tasks) -> None:
    if not tasks:
        return

    for task in tasks:
        task.cancel()

    loop.run_until_complete(asyncio.gather(*tasks, loop=loop, return_exceptions=True))

    for task in tasks:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler(
                {
                    "message": "Unhandled exception during asyncio.run() shutdown",
                    "exception": task.exception(),
                    "task": task,
                }
            )

Subprocesses

import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(run('ls /zzz'))

shlex를 사용한 cmd 적용.

app['process'] = await asyncio.create_subprocess_exec(
                           shlex.split(cmd), stdout=subprocess.PIPE)

Timeout 구현 (asyncio.wait_for)

asyncio.wait_for를 사용하면 된다:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

TCP/IP 소켓 오픈:

fut = asyncio.open_connection( self.host, self.port, loop=self.loop )
try:
   r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
   pass

Cancel 가능한 Sleep Task 구현

# -*- coding: utf-8 -*-

from asyncio import CancelledError, Task, ensure_future
from asyncio import sleep as asyncio_sleep
from typing import Any, Dict


class Sleeper:
    def __init__(self):
        self.tasks: Dict[Any, Task] = dict()

    async def sleep(self, key: Any, seconds: float, result=None, *, loop=None) -> Any:
        coro = asyncio_sleep(seconds, result=result, loop=loop)
        task = ensure_future(coro)
        self.tasks[key] = task
        try:
            return await task
        except CancelledError:
            return result
        finally:
            del self.tasks[key]

    def cancel(self, key: Any) -> None:
        self.tasks[key].cancel()

    def cancel_all(self) -> None:
        for task in self.tasks.values():
            task.cancel()

파이썬 3.4 이하에서 asyncio 사용하기

async defawait는 파이썬 3.5에서 추가되었습니다. 따라서 3.5 미만 버전에서는 사용할 수 없습니다. 파이썬 3.4에서는 다음과 같이 @asyncio.coroutine 데코레이터로 네이티브 코루틴을 만듭니다.

import asyncio

@asyncio.coroutine
async def 함수이름():
    코드

파이썬 3.4에서는 await가 아닌 yield from을 사용합니다.

  • 변수 = yield from 코루틴객체
  • 변수 = yield from 퓨처객체
  • 변수 = yield from 태스크객체

파이썬 3.3에서 asynciopip install asyncioasyncio를 설치한 뒤 @asyncio.coroutine 데코레이터와 yield from을 사용하면 됩니다. 단, 3.3 미만 버전에서는 asyncio를 지원하지 않습니다.

Example

import asyncio
import aiohttp

urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']

async def call_url(url):
    print('Starting {}'.format(url))
    response = await aiohttp.get(url)
    data = await response.text()
    print('{}: {} bytes: {}'.format(url, len(data), data))
    return data

futures = [call_url(url) for url in urls]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))

TCP Echo Server

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

TCP Echo Client

A TCP echo client using the loop.create_connection() method, sends data, and waits until the connection is closed:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

ThreadPoolExecutor vs asyncio

Python:concurrent.futures#ThreadPoolExecutor vs asyncio 항목 참조.

Python Concurrency Mind Maps

Python_Concurrency_Mind_Maps.jpeg

Troubleshooting

pipe closed by peer or os.write(pipe, data) raised exception

asyncio pipe closed by peer or os.write(pipe, data) raised exception.
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
asyncio pipe closed by peer or os.write(pipe, data) raised exception.
...

drain을 사용하자.

proc.stdin.write(f.read())
await proc.stdin.drain()

See also

Favorite site

Tutorials

Guide

References


  1. Asynchronous_Python_-_mingrammers_note.pdf 

  2. Speed_Up_Your_Python_Program_With_Concurrency_-_Che1s_blog.pdf