Skip to content

Asyncio:AsyncPythonSubprocess

Python의 asyncio 패키지를 사용하여 Subprocess를 기동하는 클래스 구현.

async_subprocess.py Code

# -*- coding: utf-8 -*-

import os
from asyncio import (
    Task,
    create_subprocess_exec,
    create_subprocess_shell,
    create_task,
    gather,
)
from asyncio.streams import StreamReader, StreamWriter
from asyncio.subprocess import PIPE, Process
from enum import Enum
from functools import reduce
from io import BytesIO
from os import environ
from typing import Callable, Dict, Mapping, Optional, Tuple

import psutil


def get_os_envs_dict() -> Dict[str, str]:
    """
    Return a snapshot copy of the current process environment.

    The result is a plain ``dict`` detached from ``os.environ``, so mutating
    it does not modify the environment of the running process.
    """
    # ``os.environ`` already maps str -> str, so a plain copy is sufficient.
    return dict(environ)


# Signature of the stdout/stderr callbacks: each call receives one chunk of
# bytes read from the child's stream and returns nothing.
ReaderCallable = Callable[[bytes], None]


class SubprocessMethod(Enum):
    """Selects which asyncio factory ``AsyncSubprocess`` uses to spawn the child."""

    # Spawn with ``asyncio.create_subprocess_exec``.
    Exec = 0
    # Spawn with ``asyncio.create_subprocess_shell``.
    Shell = 1


# class SubprocessStatus(Enum):
#     STATUS_RUNNING
#     STATUS_SLEEPING
#     STATUS_DISK_SLEEP
#     STATUS_STOPPED
#     STATUS_TRACING_STOP
#     STATUS_ZOMBIE
#     STATUS_DEAD
#     STATUS_WAKE_KILL
#     STATUS_WAKING
#     STATUS_PARKED(Linux)
#     STATUS_IDLE(Linux, macOS, FreeBSD)
#     STATUS_LOCKED(FreeBSD)
#     STATUS_WAITING(FreeBSD)
#     STATUS_SUSPENDED(NetBSD)


class AsyncSubprocess:
    """
    Spawn a child process with asyncio and stream its output to callbacks.

    stdout/stderr are piped only when the matching callback is supplied and
    stdin is piped only when ``writable`` is true; otherwise ``None`` is
    passed to asyncio for that stream.
    """

    def __init__(
        self,
        *commands,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
        cwd: Optional[str] = None,
        env: Optional[Mapping[str, str]] = None,
        writable=False,
        method=SubprocessMethod.Exec,
    ):
        """
        :param commands: The program followed by its arguments.
        :param stdout_callback: Called with each chunk read from stdout.
        :param stderr_callback: Called with each chunk read from stderr.
        :param cwd: Working directory; the current directory when falsy.
        :param env: Environment for the child; a copy of the current process
            environment when ``None``.
        :param writable: When true, stdin is piped and the write helpers work.
        :param method: How the child is spawned (exec or shell).
        """
        self._commands = commands
        self._stdout_callback = stdout_callback
        self._stderr_callback = stderr_callback
        self._writable = writable

        self._process: Optional[Process] = None
        self._stdout_task: Optional[Task] = None
        self._stderr_task: Optional[Task] = None
        self._method = method
        self._cwd = cwd if cwd else os.getcwd()
        self._env = dict(env) if env is not None else get_os_envs_dict()

    def is_started(self) -> bool:
        """Return ``True`` once :meth:`start` has created the process."""
        return self._process is not None

    @staticmethod
    async def _reader(reader: StreamReader, callback: ReaderCallable) -> None:
        """
        Forward every line read from ``reader`` to ``callback`` until EOF.

        Whatever ``readline()`` returns is handed to the callback unfiltered,
        so callbacks should tolerate an empty ``bytes`` value.
        """
        while not reader.at_eof():
            buffer = await reader.readline()
            callback(buffer)

    @property
    def stdin_flag(self) -> Optional[int]:
        """``PIPE`` when the process was requested as writable, else ``None``."""
        return PIPE if self._writable else None

    @property
    def stdout_flag(self) -> Optional[int]:
        """``PIPE`` when a stdout callback is registered, else ``None``."""
        return PIPE if self._stdout_callback else None

    @property
    def stderr_flag(self) -> Optional[int]:
        """``PIPE`` when a stderr callback is registered, else ``None``."""
        return PIPE if self._stderr_callback else None

    async def _create_subprocess_exec(self) -> Process:
        """Spawn the child directly: first command is the program, rest are args."""
        return await create_subprocess_exec(
            self._commands[0],
            *self._commands[1:],
            stdin=self.stdin_flag,
            stdout=self.stdout_flag,
            stderr=self.stderr_flag,
            executable=None,
            cwd=self._cwd,
            env=self._env,
        )

    async def _create_subprocess_shell(self) -> Process:
        """
        Spawn the child through the shell.

        Every command element (including the program itself) is stripped,
        shell-quoted and joined with single spaces to form the command line.

        :raises ValueError: If no command was given.
        """
        # Local import: ``shlex`` is not part of this module's import block.
        from shlex import quote

        if not self._commands:
            raise ValueError("Empty commands arguments")

        # ``quote`` escapes embedded quotes/metacharacters correctly, which a
        # bare '...' wrapper does not. The program name (index 0) must be part
        # of the command line, otherwise the shell would run the first
        # argument as the program.
        merged_commands = " ".join(quote(str(c).strip()) for c in self._commands)

        return await create_subprocess_shell(
            merged_commands,
            stdin=self.stdin_flag,
            stdout=self.stdout_flag,
            stderr=self.stderr_flag,
            executable=None,
            cwd=self._cwd,
            env=self._env,
        )

    async def create_subprocess(self) -> Process:
        """
        Spawn the child with the configured method and return the process.

        :raises NotImplementedError: If the method is neither Exec nor Shell.
        """
        if self._method == SubprocessMethod.Exec:
            return await self._create_subprocess_exec()
        elif self._method == SubprocessMethod.Shell:
            return await self._create_subprocess_shell()
        else:
            raise NotImplementedError

    async def start(self) -> None:
        """
        Spawn the process and schedule reader tasks for the piped streams.

        :raises RuntimeError: If the process has already been started.
        """
        if self._process is not None:
            raise RuntimeError("Already started process")

        self._process = await self.create_subprocess()

        # A reader task exists only for streams that were actually piped.
        if self.stdout_flag:
            assert self._process.stdout is not None
            assert self._stdout_callback is not None
            self._stdout_task = create_task(
                self._reader(self._process.stdout, self._stdout_callback)
            )

        if self.stderr_flag:
            assert self._process.stderr is not None
            assert self._stderr_callback is not None
            self._stderr_task = create_task(
                self._reader(self._process.stderr, self._stderr_callback)
            )

    @property
    def process(self) -> Process:
        """
        The underlying asyncio process.

        :raises RuntimeError: If the process has not been started.
        """
        if not self._process:
            raise RuntimeError("Process is not started")
        return self._process

    @property
    def stdin(self) -> StreamWriter:
        """
        The stdin writer of the child.

        :raises RuntimeError: If the process has not been started or was not
            created with ``writable=True``.
        """
        if not self._process:
            raise RuntimeError("Process is not started")
        if not self._writable:
            raise RuntimeError("The writable flag is disabled")
        assert self._process.stdin
        return self._process.stdin

    def write(self, data: bytes) -> None:
        """Write ``data`` to the child's stdin."""
        self.stdin.write(data)

    def writelines(self, data) -> None:
        """Write an iterable of byte chunks to the child's stdin."""
        self.stdin.writelines(data)

    def write_eof(self) -> None:
        """Call ``write_eof()`` on the child's stdin writer."""
        self.stdin.write_eof()

    def can_write_eof(self) -> bool:
        """Return the stdin writer's ``can_write_eof()`` result."""
        return self.stdin.can_write_eof()

    def close_stdin(self) -> None:
        """Close the child's stdin writer."""
        self.stdin.close()

    def is_closing_stdin(self) -> bool:
        """Return the stdin writer's ``is_closing()`` result."""
        return self.stdin.is_closing()

    async def wait_closed_stdin(self) -> None:
        """Await ``wait_closed()`` on the stdin writer."""
        await self.stdin.wait_closed()

    async def drain_stdin(self) -> None:
        """Await ``drain()`` on the stdin writer."""
        await self.stdin.drain()

    async def wait_process(self) -> int:
        """Wait for the child to exit and return its exit code."""
        return await self.process.wait()

    async def wait_callbacks(self) -> None:
        """Wait until the stdout/stderr reader tasks (if any) have finished."""
        pending = [
            task for task in (self._stdout_task, self._stderr_task) if task
        ]
        await gather(*pending)

    async def wait(self) -> int:
        """Wait for process exit, then for the reader tasks; return the exit code."""
        exit_code = await self.wait_process()
        await self.wait_callbacks()
        return exit_code

    def send_signal(self, signal) -> None:
        """Send ``signal`` to the child process."""
        self.process.send_signal(signal)

    def terminate(self) -> None:
        """Call ``terminate()`` on the child process."""
        self.process.terminate()

    def kill(self) -> None:
        """Call ``kill()`` on the child process."""
        self.process.kill()

    def get_pid(self) -> int:
        """Return the pid of the child process."""
        return self.process.pid

    # noinspection PyTypeChecker
    @property
    def status(self) -> str:
        """
        Status string reported by ``psutil`` for the child's pid.

        The value is one of psutil's ``STATUS_*`` constants, e.g.
        ``STATUS_RUNNING``, ``STATUS_SLEEPING``, ``STATUS_DISK_SLEEP``,
        ``STATUS_STOPPED``, ``STATUS_TRACING_STOP``, ``STATUS_ZOMBIE``,
        ``STATUS_DEAD``, ``STATUS_WAKE_KILL``, ``STATUS_WAKING``,
        ``STATUS_PARKED`` (Linux), ``STATUS_IDLE`` (Linux, macOS, FreeBSD),
        ``STATUS_LOCKED`` (FreeBSD), ``STATUS_WAITING`` (FreeBSD) or
        ``STATUS_SUSPENDED`` (NetBSD).
        """
        return psutil.Process(self.process.pid).status()

    def is_running(self) -> bool:
        """
        Return ``True`` only when the status equals ``psutil.STATUS_RUNNING``.

        Any other status (for example sleeping) yields ``False``.
        """
        return self.status == psutil.STATUS_RUNNING


async def start_async_subprocess(
    *commands,
    stdout_callback: Optional[ReaderCallable] = None,
    stderr_callback: Optional[ReaderCallable] = None,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    writable=False,
    method=SubprocessMethod.Exec,
) -> AsyncSubprocess:
    """
    Build an :class:`AsyncSubprocess` from the arguments, start it and return it.

    All keyword arguments are forwarded unchanged to the constructor.
    """
    options = dict(
        stdout_callback=stdout_callback,
        stderr_callback=stderr_callback,
        cwd=cwd,
        env=env,
        writable=writable,
        method=method,
    )
    subprocess = AsyncSubprocess(*commands, **options)
    await subprocess.start()
    return subprocess


async def start_async_subprocess_simply(
    *commands,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    writable=False,
    method=SubprocessMethod.Exec,
) -> Tuple[int, bytes, bytes]:
    """
    Run a subprocess to completion and collect everything it printed.

    :return: ``(exit_code, stdout_bytes, stderr_bytes)``.
    """
    # Each callback invocation appends the received chunk to its buffer.
    collected_out = bytearray()
    collected_err = bytearray()

    running = await start_async_subprocess(
        *commands,
        stdout_callback=collected_out.extend,
        stderr_callback=collected_err.extend,
        cwd=cwd,
        env=env,
        writable=writable,
        method=method,
    )
    return_code = await running.wait()
    return return_code, bytes(collected_out), bytes(collected_err)

위의 클래스를 재활용한 클래스 샘플은 AsyncVirtualEnvironment를 참조.

TestCase

# -*- coding: utf-8 -*-

from collections import deque
from sys import executable, version_info
from unittest import IsolatedAsyncioTestCase, main

from recc.subprocess.async_subprocess import start_async_subprocess


class AsyncSubprocessTestCase(IsolatedAsyncioTestCase):
    """Checks that ``python --version`` can be run through start_async_subprocess."""

    async def test_default(self):
        captured_out = deque()
        captured_err = deque()

        def _collector(sink):
            # Build a callback that stores every non-blank decoded line in ``sink``.
            def _on_data(data: bytes) -> None:
                text = str(data, encoding="utf-8").strip()
                if text:
                    sink.append(text)

            return _on_data

        proc = await start_async_subprocess(
            executable,
            "--version",
            stdout_callback=_collector(captured_out),
            stderr_callback=_collector(captured_err),
        )
        major, minor, micro = version_info[0], version_info[1], version_info[2]
        version_text = f"{major}.{minor}.{micro}"

        # The interpreter must exit cleanly and print exactly one stdout line.
        self.assertEqual(0, await proc.wait())
        self.assertEqual(1, len(captured_out))
        self.assertEqual(f"Python {version_text}", captured_out.popleft())
        self.assertEqual(0, len(captured_err))


# Run the unittest entry point when this file is executed as a script.
if __name__ == "__main__":
    main()

async_python_subprocess.py Code

# -*- coding: utf-8 -*-

import os
import sys
from dataclasses import dataclass
from functools import reduce
from typing import Dict, List, Mapping, Optional, Tuple

from recc.driver.json import global_json_decoder
from recc.subprocess.async_subprocess import (
    AsyncSubprocess,
    ReaderCallable,
    SubprocessMethod,
    start_async_subprocess,
)

# Values accepted by pip's ``--progress-bar`` option that this module knows about.
PROGRESS_BAR_STYLE_OFF = "off"
PROGRESS_BAR_STYLE_ASCII = "ascii"
"""
.. deprecated:: pip 22.1
    Custom progress bar styles are deprecated pip 22.1
    will enforce this behaviour change.
"""

# The progress bar is disabled by default; the ready-made flag below is what
# the pip ``download``/``install`` invocations in this module pass along.
DEFAULT_PROGRESS_BAR_STYLE = PROGRESS_BAR_STYLE_OFF
PROGRESS_BAR_STYLE_FLAG = f"--progress-bar={DEFAULT_PROGRESS_BAR_STYLE}"


class Package:
    """A package name paired with its version string."""

    __slots__ = ("name", "version")

    name: str
    version: str

    def __init__(self, name: str, version: str):
        self.name, self.version = name, version

    def __str__(self) -> str:
        # Rendered in pip requirement form: ``name==version``.
        return "==".join((f"{self.name}", f"{self.version}"))

    def __repr__(self) -> str:
        fields = f"name={self.name},version={self.version}"
        return f"Package<{fields}>"


@dataclass
class PackageInfo:
    """Structured form of the header fields printed by ``pip show``."""

    # Scalar fields: the stripped header values ("" when the header is missing).
    name: str
    version: str
    summary: str
    homepage: str
    author: str
    author_email: str
    license: str
    location: str
    # List fields: comma-separated header values split into non-empty names.
    requires: List[str]
    required_by: List[str]


class AsyncPythonSubprocess:
    """
    Run a specific Python interpreter (and its ``pip`` module) as an
    asynchronous subprocess.
    """

    def __init__(
        self,
        executable: Optional[str] = None,
        pip_timeout: Optional[float] = None,
        env: Optional[Mapping[str, str]] = None,
        method=SubprocessMethod.Exec,
    ):
        """
        :param executable: Interpreter path; ``sys.executable`` when falsy.
        :param pip_timeout: Value for pip's ``--timeout``; omitted when falsy.
        :param env: Environment used by the ``*_simply`` helpers; an empty
            dict when ``None``.
        :param method: Spawn method used by the ``*_simply`` helpers.
        """
        self._executable = executable if executable else sys.executable
        self._pip_timeout = pip_timeout if pip_timeout else 0.0

        self.env = dict(env) if env is not None else dict()
        self.method = method

    @property
    def executable(self) -> str:
        """Path of the interpreter this instance runs."""
        return self._executable

    @property
    def executable_dir(self) -> str:
        """Directory part of the interpreter path."""
        return os.path.dirname(self._executable)

    @classmethod
    def create_system(cls, pip_timeout: Optional[float] = None):
        """Create an instance bound to ``sys.executable``."""
        return cls(sys.executable, pip_timeout)

    async def start_python(
        self,
        *subcommands,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
        cwd: Optional[str] = None,
        env: Optional[Mapping[str, str]] = None,
        writable=False,
        method=SubprocessMethod.Exec,
    ) -> AsyncSubprocess:
        """
        Start the interpreter with ``subcommands`` as its arguments.

        :raises ValueError: If no subcommand is given.
        :return: The started subprocess wrapper.
        """
        if not subcommands:
            # The exception was previously constructed but never raised.
            raise ValueError("Empty subcommands arguments")

        total_commands = [self._executable, *subcommands]
        proc = await start_async_subprocess(
            *total_commands,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
            cwd=cwd,
            env=env,
            writable=writable,
            method=method,
        )
        return proc

    def make_pip_subcommands(self, *subcommands) -> List[str]:
        """
        Build the interpreter arguments for a non-interactive ``pip`` call.

        The ``--timeout`` option is added only when a positive pip timeout was
        configured; ``subcommands`` are appended last.
        """
        result = [
            "-m",
            "pip",
            "--no-color",
            "--no-input",
            "--disable-pip-version-check",
            "--no-python-version-warning",
        ]
        if self._pip_timeout > 0.0:
            result += ["--timeout", str(self._pip_timeout)]
        if subcommands:
            result += subcommands
        return result

    async def start_pip(
        self,
        *subcommands,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
        cwd: Optional[str] = None,
        env: Optional[Mapping[str, str]] = None,
        writable=False,
        method=SubprocessMethod.Exec,
    ) -> AsyncSubprocess:
        """Start ``python -m pip <common flags> <subcommands>``."""
        return await self.start_python(
            *self.make_pip_subcommands(*subcommands),
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
            cwd=cwd,
            env=env,
            writable=writable,
            method=method,
        )

    async def start_python_simply(self, *subcommands) -> Tuple[List[str], List[str]]:
        """
        Run the interpreter to completion and return its output lines.

        Output is decoded as UTF-8; each line is stripped and blank lines are
        dropped. The instance's ``env`` and ``method`` are used.

        :raises RuntimeError: If the exit code is not zero; the message
            contains the exit code and any captured stdout/stderr text.
        :return: ``(stdout_lines, stderr_lines)``.
        """
        stdout_lines: List[str] = list()
        stderr_lines: List[str] = list()

        def _stdout_callback(data: bytes) -> None:
            line = str(data, encoding="utf-8").strip()
            if line:
                stdout_lines.append(line)

        def _stderr_callback(data: bytes) -> None:
            line = str(data, encoding="utf-8").strip()
            if line:
                stderr_lines.append(line)

        proc = await self.start_python(
            *subcommands,
            stdout_callback=_stdout_callback,
            stderr_callback=_stderr_callback,
            cwd=None,
            env=self.env,
            writable=False,
            method=self.method,
        )
        exit_code = await proc.wait()

        if exit_code != 0:
            params_msg = f"code={exit_code}"
            if stdout_lines:
                stdout_text = " ".join(stdout_lines)
                params_msg += f",stdout={stdout_text}"
            if stderr_lines:
                stderr_text = " ".join(stderr_lines)
                params_msg += f",stderr={stderr_text}"
            error_msg = f"python {subcommands[0]} error: {params_msg}"
            raise RuntimeError(error_msg)

        return stdout_lines, stderr_lines

    async def start_pip_simply(self, *subcommands) -> Tuple[List[str], List[str]]:
        """Run a pip command to completion; see :meth:`start_python_simply`."""
        return await self.start_python_simply(*self.make_pip_subcommands(*subcommands))

    async def ensure_pip(self, isolate=True) -> Tuple[List[str], List[str]]:
        """
        Run the ``ensurepip`` module.

        :param isolate:
            If pydevd is connected, the ``python -Im ensurepip`` command does not
            work properly. In this case, it is temporarily resolved by using the
            ``isolate`` flag as ``False``. If possible, use only for debugging and
            testing purposes.
        """

        return await self.start_python_simply(
            "-Im" if isolate else "-m",
            "ensurepip",
            "--upgrade",
            "--default-pip",
        )

    async def recc_version(self) -> str:
        """Return the single line printed by ``python -m recc --version``."""
        stdout_lines, _ = await self.start_python_simply("-m", "recc", "--version")
        assert len(stdout_lines) == 1
        return stdout_lines[0].strip()

    async def version(self) -> str:
        """Return the version text that follows ``Python `` in ``--version`` output."""
        stdout_lines, _ = await self.start_python_simply("--version")
        assert len(stdout_lines) == 1
        prefix, version = stdout_lines[0].split(" ", 1)
        assert prefix == "Python"
        return version

    async def version_tuple(self) -> Tuple[int, int, int]:
        """Return the interpreter version as ``(major, minor, micro)`` integers."""
        versions = [int(i) for i in (await self.version()).split(".")]
        assert len(versions) == 3
        return versions[0], versions[1], versions[2]

    async def download(
        self,
        package: str,
        destination: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip download --dest <destination> <package>``; return the exit code."""
        proc = await self.start_pip(
            "download",
            "--dest",
            destination,
            PROGRESS_BAR_STYLE_FLAG,
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def install(
        self,
        package: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip install <package>``; return the exit code."""
        proc = await self.start_pip(
            "install",
            PROGRESS_BAR_STYLE_FLAG,
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def upgrade(
        self,
        package: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip install --upgrade <package>``; return the exit code."""
        proc = await self.start_pip(
            "install",
            PROGRESS_BAR_STYLE_FLAG,
            "--upgrade",
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def uninstall(
        self,
        package: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip uninstall --yes <package>``; return the exit code."""
        proc = await self.start_pip(
            "uninstall",
            "--yes",
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def list(self) -> List[Package]:
        """Return the installed packages reported by ``pip list --format=json``."""
        stdout_lines, _ = await self.start_pip_simply("list", "--format=json")
        # The JSON document may arrive split over several lines; glue them back.
        json_text = "".join(stdout_lines)
        packages = global_json_decoder(json_text)
        return [Package(p["name"], p["version"]) for p in packages]

    async def show(self, package: str) -> Dict[str, str]:
        """
        Return the ``pip show <package>`` output as a header-name -> value dict.

        Each output line is split at its first ``:``; key and value are stripped.
        """
        stdout_lines, _ = await self.start_pip_simply("show", package)
        result: Dict[str, str] = {}
        for header_line in stdout_lines:
            # The output is in RFC-compliant mail header format.
            items = header_line.split(":", maxsplit=1)
            assert len(items) == 2
            key = items[0].strip()
            val = items[1].strip()
            result[key] = val
        return result

    async def show_as_info(self, package: str) -> PackageInfo:
        """
        Return the ``pip show`` result for ``package`` as a :class:`PackageInfo`.

        Missing headers become empty strings (or empty lists for the
        ``Requires`` / ``Required-by`` fields).
        """
        info = await self.show(package)

        def _split_packages(text: str) -> List[str]:
            # Comma-separated names -> stripped, non-empty entries.
            return [name for name in (part.strip() for part in text.split(",")) if name]

        name = info.get("Name", "").strip()
        version = info.get("Version", "").strip()
        summary = info.get("Summary", "").strip()
        homepage = info.get("Home-page", "").strip()
        author = info.get("Author", "").strip()
        author_email = info.get("Author-email", "").strip()
        license_ = info.get("License", "").strip()  # Shadows built-in name 'license'
        location = info.get("Location", "").strip()
        requires = _split_packages(info.get("Requires", ""))
        required_by = _split_packages(info.get("Required-by", ""))

        return PackageInfo(
            name,
            version,
            summary,
            homepage,
            author,
            author_email,
            license_,
            location,
            requires,
            required_by,
        )

TestCase

# -*- coding: utf-8 -*-

import os
from sys import executable, version_info
from unittest import IsolatedAsyncioTestCase, main

from recc.subprocess.async_python_subprocess import AsyncPythonSubprocess
from recc.util.version import parse_version_numbers


class AsyncPythonSubprocessTestCase(IsolatedAsyncioTestCase):
    """Exercises AsyncPythonSubprocess against the interpreter running the tests."""

    async def test_version_tuple(self):
        reported = await AsyncPythonSubprocess(executable).version_tuple()
        self.assertEqual(version_info[:3], reported)

    async def test_list(self):
        print("test_list cwd: ", os.getcwd())
        installed = await AsyncPythonSubprocess(executable).list()
        self.assertLess(0, len(installed))

    async def test_show(self):
        headers = await AsyncPythonSubprocess(executable).show("numpy")
        self.assertLess(0, len(headers))
        self.assertEqual("numpy", headers["Name"])

    async def test_show_as_info(self):
        info = await AsyncPythonSubprocess(executable).show_as_info("numpy")
        expected_summary = (
            "NumPy is the fundamental package for array computing with Python."
        )

        self.assertEqual("numpy", info.name)
        self.assertLessEqual((1, 22, 2), parse_version_numbers(info.version))
        self.assertEqual(expected_summary, info.summary)
        self.assertEqual("https://www.numpy.org", info.homepage)
        self.assertEqual("Travis E. Oliphant et al.", info.author)
        self.assertEqual("", info.author_email)
        self.assertEqual("BSD", info.license)
        self.assertTrue(os.path.isdir(info.location))
        self.assertEqual(0, len(info.requires))
        self.assertLess(0, len(info.required_by))


# Run the unittest entry point when this file is executed as a script.
if __name__ == "__main__":
    main()

Download pip packages

중복되는 파일 없이 패키지를 다운로드 받고 캐시할 수 있도록 한다:

    @staticmethod
    def _read_hash(path: str, method: str) -> str:
        """
        Return the hexadecimal digest of the file at ``path``.

        :param path: File to hash; opened in binary mode.
        :param method: Hash method name; only ``PIP_HASH_METHOD_SHA256`` is handled.
        :raises ValueError: If ``method`` is not a supported hash method.
        """
        with open(path, "rb") as f:
            # The method is checked after opening so that an unreadable path
            # still fails with the error raised by ``open``.
            if method != PIP_HASH_METHOD_SHA256:
                raise ValueError(f"Unsupported hash method: '{method}'")

            # Hash in fixed-size chunks so large package files are never held
            # in memory as a whole.
            # NOTE(review): assumes ``sha256`` is ``hashlib.sha256`` - confirm.
            digest = sha256()
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
            return digest.hexdigest()

    async def _exists_pip_package(self, domain: str, package: str) -> bool:
        """
        Tell whether every file recorded for ``package`` is present and intact.

        Returns ``False`` when the database has no record for the package,
        when a recorded file is missing from the pip download directory, when
        its hash differs from the recorded one, or when the recorded hash
        method is rejected with ``ValueError``.
        """
        assert self._local_storage
        assert self._database
        assert self._database.is_open()

        records = await self._database.select_pip_by_domain_and_name(domain, package)
        if not records:
            return False

        assert len(records) >= 1
        download_dir = self._local_storage.pip_download

        for record in records:
            candidate = os.path.join(download_dir, record.file)
            if not os.path.isfile(candidate):
                logger.warning(f"File not found: {candidate}")
                return False

            try:
                actual = self._read_hash(candidate, record.hash_method)
                if not (actual == record.hash_value):
                    logger.warning(f"Hash mismatch: {candidate}")
                    return False
            except ValueError:
                return False

        return True

    async def download_pip_packages(
        self,
        domain: str,
        hash_method: str,
        logging_encoding="utf-8",
    ) -> None:
        """
        Download every package in ``RECC_REQUIREMENTS_MAIN`` that is not
        already cached, and record the downloaded files in the database.

        For each missing package the stale database rows are deleted, pip
        downloads into a temporary directory, and every resulting file is
        hashed, moved into the pip download directory and inserted into the
        database. A failure on a single file is logged and skipped.

        :param domain: Domain under which the pip records are stored.
        :param hash_method: Hash method used for the downloaded files.
        :param logging_encoding: Encoding used to decode pip's output for logging.
        :raises RuntimeError: If ``pip download`` exits with a non-zero code.
        """
        assert self._local_storage
        assert self._database
        assert self._database.is_open()

        def _stdout_callback(data: bytes) -> None:
            line = str(data, encoding=logging_encoding).rstrip()
            if line:
                logger.debug(line)

        def _stderr_callback(data: bytes) -> None:
            line = str(data, encoding=logging_encoding).rstrip()
            if line:
                logger.warning(line)

        for package in RECC_REQUIREMENTS_MAIN:
            if await self._exists_pip_package(domain, package):
                logger.debug(f"Exists pip package '{package}'")
                continue

            # Missing or corrupted cache: drop the stale records first.
            await self._database.delete_pip_by_domain_and_name(domain, package)

            with self._local_storage.create_temporary_directory() as tmpdir:
                logger.debug(f"Run pip download '{package}'")
                code = await AsyncPythonSubprocess.create_system().download(
                    package,
                    tmpdir,
                    _stdout_callback,
                    _stderr_callback,
                )

                if code == 0:
                    logger.debug(f"Subprocess is done: pip download '{package}'")
                else:
                    raise RuntimeError(f"Error({code}) pip download '{package}'")

                for filename in os.listdir(tmpdir):
                    filepath = os.path.abspath(os.path.join(tmpdir, filename))
                    assert os.path.isfile(filepath)
                    hash_value = self._read_hash(filepath, hash_method)

                    dest = os.path.join(self._local_storage.pip_download, filename)

                    # The handlers below catch ``Exception`` rather than
                    # ``BaseException`` so that task cancellation and
                    # KeyboardInterrupt are not swallowed by the best-effort
                    # per-file error handling.
                    if os.path.exists(dest):
                        try:
                            os.remove(dest)
                            logger.debug(f"Remove the existing pip file: '{dest}'")
                        except Exception as e:
                            logger.warning(f"Error remove existing pip file: {e}")
                            continue

                    try:
                        move(filepath, dest)
                    except Exception as e:
                        logger.warning(f"Error moving downloaded pip file: {e}")
                        continue

                    try:
                        await self._database.insert_pip(
                            domain, package, filename, hash_method, hash_value
                        )
                    except Exception as e:
                        logger.warning(f"Database insert error: {e}")
                        continue

                logger.debug(f"Done pip download '{package}'")

See also