Asyncio:AsyncPythonSubprocess
Python의 asyncio 패키지를 사용하여 Subprocess를 기동하는 클래스 구현.
async_subprocess.py Code
# -*- coding: utf-8 -*-
import os
from asyncio import (
Task,
create_subprocess_exec,
create_subprocess_shell,
create_task,
gather,
)
from asyncio.streams import StreamReader, StreamWriter
from asyncio.subprocess import PIPE, Process
from enum import Enum
from functools import reduce
from io import BytesIO
from os import environ
from typing import Callable, Dict, Mapping, Optional, Tuple
import psutil
def get_os_envs_dict() -> Dict[str, str]:
    """Return a plain ``dict`` snapshot of the current process environment.

    Later changes to ``os.environ`` do not affect the returned mapping.
    """
    snapshot: Dict[str, str] = dict(environ)
    return snapshot
# Output callback: receives each raw chunk (one ``readline()`` result, line
# terminator included) read from a child process's stdout or stderr pipe.
ReaderCallable = Callable[[bytes], None]


class SubprocessMethod(Enum):
    """Selects how the child process is launched."""

    # Run the program directly (asyncio ``create_subprocess_exec``).
    Exec = 0
    # Run one joined command line through the shell
    # (asyncio ``create_subprocess_shell``).
    Shell = 1
# class SubprocessStatus(Enum):
# STATUS_RUNNING
# STATUS_SLEEPING
# STATUS_DISK_SLEEP
# STATUS_STOPPED
# STATUS_TRACING_STOP
# STATUS_ZOMBIE
# STATUS_DEAD
# STATUS_WAKE_KILL
# STATUS_WAKING
# STATUS_PARKED(Linux)
# STATUS_IDLE(Linux, macOS, FreeBSD)
# STATUS_LOCKED(FreeBSD)
# STATUS_WAITING(FreeBSD)
# STATUS_SUSPENDED(NetBSD)
class AsyncSubprocess:
    """Start a child process with asyncio and stream its output to callbacks.

    stdout/stderr are piped only when the matching callback is given; a
    background task then passes every line read from that pipe to the
    callback. stdin is piped only when ``writable`` is true.
    """

    def __init__(
        self,
        *commands,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
        cwd: Optional[str] = None,
        env: Optional[Mapping[str, str]] = None,
        writable=False,
        method=SubprocessMethod.Exec,
    ):
        """
        :param commands: Program to run, followed by its arguments.
        :param stdout_callback: Receives each line read from stdout.
        :param stderr_callback: Receives each line read from stderr.
        :param cwd: Working directory; defaults to ``os.getcwd()`` evaluated
            at construction time.
        :param env: Child environment; defaults to a snapshot of the current
            process environment taken at construction time.
        :param writable: Pipe stdin so data can be written to the child.
        :param method: Launch directly (``Exec``) or through the shell
            (``Shell``).
        """
        self._commands = commands
        self._stdout_callback = stdout_callback
        self._stderr_callback = stderr_callback
        self._writable = writable
        self._process: Optional[Process] = None
        self._stdout_task: Optional[Task] = None
        self._stderr_task: Optional[Task] = None
        self._method = method
        self._cwd = cwd if cwd else os.getcwd()
        self._env = dict(env) if env is not None else get_os_envs_dict()

    def is_started(self) -> bool:
        """Return True once ``start()`` has created the child process."""
        return self._process is not None

    @staticmethod
    async def _reader(reader: StreamReader, callback: ReaderCallable) -> None:
        """Pass each line read from ``reader`` to ``callback`` until EOF."""
        while not reader.at_eof():
            buffer = await reader.readline()
            # readline() returns b"" once EOF is reached; that is not output,
            # so it is not forwarded to the callback.
            if buffer:
                callback(buffer)

    @property
    def stdin_flag(self) -> Optional[int]:
        """``PIPE`` when stdin is writable, otherwise None (inherit)."""
        return PIPE if self._writable else None

    @property
    def stdout_flag(self) -> Optional[int]:
        """``PIPE`` when a stdout callback is set, otherwise None (inherit)."""
        return PIPE if self._stdout_callback else None

    @property
    def stderr_flag(self) -> Optional[int]:
        """``PIPE`` when a stderr callback is set, otherwise None (inherit)."""
        return PIPE if self._stderr_callback else None

    async def _create_subprocess_exec(self) -> Process:
        """Start the program directly, without a shell."""
        return await create_subprocess_exec(
            self._commands[0],
            *self._commands[1:],
            stdin=self.stdin_flag,
            stdout=self.stdout_flag,
            stderr=self.stderr_flag,
            executable=None,
            cwd=self._cwd,
            env=self._env,
        )

    async def _create_subprocess_shell(self) -> Process:
        """Start all commands as one command line run by the shell.

        Every element, including the program itself, is quoted with
        ``shlex.quote`` so that, for string arguments, the shell receives the
        same argv as the exec method. NOTE(review): this is POSIX-shell
        quoting and is not suitable for Windows ``cmd.exe``.
        """
        import shlex  # local import: only the shell launch path needs it

        command_line = " ".join(shlex.quote(str(c)) for c in self._commands)
        return await create_subprocess_shell(
            command_line,
            stdin=self.stdin_flag,
            stdout=self.stdout_flag,
            stderr=self.stderr_flag,
            executable=None,
            cwd=self._cwd,
            env=self._env,
        )

    async def create_subprocess(self) -> Process:
        """Create the child process with the configured launch method.

        :raises ValueError: If no command was given.
        :raises NotImplementedError: If the launch method is unknown.
        """
        if not self._commands:
            raise ValueError("Empty commands arguments")
        if self._method == SubprocessMethod.Exec:
            return await self._create_subprocess_exec()
        if self._method == SubprocessMethod.Shell:
            return await self._create_subprocess_shell()
        raise NotImplementedError

    async def start(self) -> None:
        """Create the child process and start the output reader tasks.

        :raises RuntimeError: If the process was already started.
        """
        if self._process is not None:
            raise RuntimeError("Already started process")

        self._process = await self.create_subprocess()

        if self.stdout_flag:
            assert self._process.stdout is not None
            assert self._stdout_callback is not None
            self._stdout_task = create_task(
                self._reader(self._process.stdout, self._stdout_callback)
            )

        if self.stderr_flag:
            assert self._process.stderr is not None
            assert self._stderr_callback is not None
            self._stderr_task = create_task(
                self._reader(self._process.stderr, self._stderr_callback)
            )

    @property
    def process(self) -> Process:
        """The underlying asyncio ``Process``.

        :raises RuntimeError: If the process has not been started.
        """
        if self._process is None:
            raise RuntimeError("Process is not started")
        return self._process

    @property
    def stdin(self) -> StreamWriter:
        """The child's stdin stream.

        :raises RuntimeError: If not started, or ``writable`` was False.
        """
        if self._process is None:
            raise RuntimeError("Process is not started")
        if not self._writable:
            raise RuntimeError("The writable flag is disabled")
        # writable implies stdin=PIPE, so asyncio created a writer.
        assert self._process.stdin
        return self._process.stdin

    def write(self, data: bytes) -> None:
        self.stdin.write(data)

    def writelines(self, data) -> None:
        self.stdin.writelines(data)

    def write_eof(self) -> None:
        self.stdin.write_eof()

    def can_write_eof(self) -> bool:
        return self.stdin.can_write_eof()

    def close_stdin(self) -> None:
        self.stdin.close()

    def is_closing_stdin(self) -> bool:
        return self.stdin.is_closing()

    async def wait_closed_stdin(self) -> None:
        await self.stdin.wait_closed()

    async def drain_stdin(self) -> None:
        await self.stdin.drain()

    async def wait_process(self) -> int:
        """Wait for the child to exit and return its exit code."""
        return await self.process.wait()

    async def wait_callbacks(self) -> None:
        """Wait until the stdout/stderr reader tasks (if any) have finished."""
        futures = [t for t in (self._stdout_task, self._stderr_task) if t]
        await gather(*futures)

    async def wait(self) -> int:
        """Wait for exit, then for all output to reach the callbacks.

        :return: The child's exit code.
        """
        exit_code = await self.wait_process()
        await self.wait_callbacks()
        return exit_code

    def send_signal(self, signal) -> None:
        self.process.send_signal(signal)

    def terminate(self) -> None:
        self.process.terminate()

    def kill(self) -> None:
        self.process.kill()

    def get_pid(self) -> int:
        return self.process.pid

    # noinspection PyTypeChecker
    @property
    def status(self) -> str:
        """Return the psutil status string of the child process.

        One of the ``psutil.STATUS_*`` constants, e.g. ``STATUS_RUNNING``,
        ``STATUS_SLEEPING``, ``STATUS_ZOMBIE``, ``STATUS_DEAD``. Some are only
        reported on specific platforms (``STATUS_PARKED`` on Linux,
        ``STATUS_IDLE`` on Linux/macOS/FreeBSD, ``STATUS_LOCKED`` and
        ``STATUS_WAITING`` on FreeBSD, ``STATUS_SUSPENDED`` on NetBSD).
        """
        return psutil.Process(self.process.pid).status()

    def is_running(self) -> bool:
        """True only when psutil reports ``STATUS_RUNNING``.

        A live but sleeping process therefore returns False.
        """
        return self.status == psutil.STATUS_RUNNING
async def start_async_subprocess(
    *commands,
    stdout_callback: Optional[ReaderCallable] = None,
    stderr_callback: Optional[ReaderCallable] = None,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    writable=False,
    method=SubprocessMethod.Exec,
) -> AsyncSubprocess:
    """Construct an ``AsyncSubprocess``, start it, and return it.

    Every argument is forwarded unchanged to the ``AsyncSubprocess``
    constructor.
    """
    options = dict(
        stdout_callback=stdout_callback,
        stderr_callback=stderr_callback,
        cwd=cwd,
        env=env,
        writable=writable,
        method=method,
    )
    launched = AsyncSubprocess(*commands, **options)
    await launched.start()
    return launched
async def start_async_subprocess_simply(
    *commands,
    cwd: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    writable=False,
    method=SubprocessMethod.Exec,
) -> Tuple[int, bytes, bytes]:
    """Run ``commands`` to completion and capture everything it prints.

    :return: ``(exit_code, stdout_bytes, stderr_bytes)``.
    """
    stdout_buffer, stderr_buffer = BytesIO(), BytesIO()

    def _collect_stdout(chunk: bytes) -> None:
        stdout_buffer.write(chunk)

    def _collect_stderr(chunk: bytes) -> None:
        stderr_buffer.write(chunk)

    launched = await start_async_subprocess(
        *commands,
        stdout_callback=_collect_stdout,
        stderr_callback=_collect_stderr,
        cwd=cwd,
        env=env,
        writable=writable,
        method=method,
    )
    # wait() also waits for the reader tasks, so both buffers are complete.
    exit_code = await launched.wait()
    return exit_code, stdout_buffer.getvalue(), stderr_buffer.getvalue()
위의 클래스를 재활용한 클래스 샘플은 AsyncVirtualEnvironment를 참조.
TestCase
# -*- coding: utf-8 -*-
from collections import deque
from sys import executable, version_info
from unittest import IsolatedAsyncioTestCase, main
from recc.subprocess.async_subprocess import start_async_subprocess
class AsyncSubprocessTestCase(IsolatedAsyncioTestCase):
    """Smoke test for ``start_async_subprocess``."""

    async def test_default(self):
        """``python --version`` prints one stdout line and nothing on stderr."""
        stdout_deque: deque = deque()
        stderr_deque: deque = deque()

        def _appender(target: deque):
            # Build a callback that stores non-blank, stripped UTF-8 lines.
            def _append(data: bytes) -> None:
                line = str(data, encoding="utf-8").strip()
                if line:
                    target.append(line)

            return _append

        proc = await start_async_subprocess(
            executable,
            "--version",
            stdout_callback=_appender(stdout_deque),
            stderr_callback=_appender(stderr_deque),
        )
        version_text = ".".join(str(part) for part in version_info[:3])

        self.assertEqual(0, await proc.wait())
        self.assertEqual(1, len(stdout_deque))
        self.assertEqual(f"Python {version_text}", stdout_deque.popleft())
        self.assertEqual(0, len(stderr_deque))


if __name__ == "__main__":
    main()
async_python_subprocess.py Code
# -*- coding: utf-8 -*-
import os
import sys
from dataclasses import dataclass
from functools import reduce
from typing import Dict, List, Mapping, Optional, Tuple
from recc.driver.json import global_json_decoder
from recc.subprocess.async_subprocess import (
AsyncSubprocess,
ReaderCallable,
SubprocessMethod,
start_async_subprocess,
)
PROGRESS_BAR_STYLE_OFF = "off"
PROGRESS_BAR_STYLE_ASCII = "ascii"

# Deprecation notice (pip 22.1): custom progress bar styles are deprecated,
# and pip 22.1 enforces this behaviour change.
# NOTE(review): "ascii" may be rejected by pip >= 22.1; confirm before use.
DEFAULT_PROGRESS_BAR_STYLE = PROGRESS_BAR_STYLE_OFF

# Flag appended to the ``pip download`` / ``pip install`` command lines.
PROGRESS_BAR_STYLE_FLAG = "--progress-bar=" + DEFAULT_PROGRESS_BAR_STYLE
class Package:
    """An installed package's name and version, printed like a pin."""

    __slots__ = ("name", "version")

    name: str
    version: str

    def __init__(self, name: str, version: str):
        self.name, self.version = name, version

    def __str__(self) -> str:
        # Requirements-file style pin, e.g. "numpy==1.22.2".
        return "{}=={}".format(self.name, self.version)

    def __repr__(self) -> str:
        return "Package<name={},version={}>".format(self.name, self.version)
@dataclass
class PackageInfo:
    """Typed view of the fields reported by ``pip show <package>``."""

    name: str  # "Name"
    version: str  # "Version"
    summary: str  # "Summary"
    homepage: str  # "Home-page"
    author: str  # "Author"
    author_email: str  # "Author-email"
    license: str  # "License" (field name mirrors pip; shadows the builtin)
    location: str  # "Location"
    requires: List[str]  # "Requires", split on commas
    required_by: List[str]  # "Required-by", split on commas
class AsyncPythonSubprocess:
    """Run a Python interpreter, and ``python -m pip``, as async subprocesses.

    NOTE(review): the ``*_simply`` helpers pass ``self.env`` (an empty dict
    unless ``env`` was given, which means an empty child environment) and
    ``self.method``. ``download``/``install``/``upgrade``/``uninstall`` pass
    neither, so they run with a copy of the parent environment and the exec
    method. Confirm this asymmetry is intended.
    """

    def __init__(
        self,
        executable: Optional[str] = None,
        pip_timeout: Optional[float] = None,
        env: Optional[Mapping[str, str]] = None,
        method=SubprocessMethod.Exec,
    ):
        """
        :param executable: Interpreter to run; defaults to ``sys.executable``.
        :param pip_timeout: Value for pip's ``--timeout`` option; the option is
            only passed when this is a positive number.
        :param env: Child environment used by the ``*_simply`` helpers.
        :param method: Launch method used by the ``*_simply`` helpers.
        """
        self._executable = executable if executable else sys.executable
        self._pip_timeout = pip_timeout if pip_timeout else 0.0
        self.env = dict(env) if env is not None else dict()
        self.method = method

    @property
    def executable(self) -> str:
        """Path of the interpreter this instance runs."""
        return self._executable

    @property
    def executable_dir(self) -> str:
        """Directory containing the interpreter executable."""
        return os.path.dirname(self._executable)

    @classmethod
    def create_system(cls, pip_timeout: Optional[float] = None):
        """Create an instance for the interpreter running this code."""
        return cls(sys.executable, pip_timeout)

    async def start_python(
        self,
        *subcommands,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
        cwd: Optional[str] = None,
        env: Optional[Mapping[str, str]] = None,
        writable=False,
        method=SubprocessMethod.Exec,
    ) -> AsyncSubprocess:
        """Start ``<executable> *subcommands`` and return the running process.

        :raises ValueError: If no subcommands are given.
        """
        if not subcommands:
            raise ValueError("Empty subcommands arguments")

        total_commands = [self._executable, *subcommands]
        proc = await start_async_subprocess(
            *total_commands,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
            cwd=cwd,
            env=env,
            writable=writable,
            method=method,
        )
        return proc

    def make_pip_subcommands(self, *subcommands) -> List[str]:
        """Build ``-m pip`` arguments with non-interactive global options.

        ``subcommands`` are appended after the global options.
        """
        result = [
            "-m",
            "pip",
            "--no-color",
            "--no-input",
            "--disable-pip-version-check",
            "--no-python-version-warning",
        ]
        if self._pip_timeout > 0.0:
            result += ["--timeout", str(self._pip_timeout)]
        result.extend(subcommands)
        return result

    async def start_pip(
        self,
        *subcommands,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
        cwd: Optional[str] = None,
        env: Optional[Mapping[str, str]] = None,
        writable=False,
        method=SubprocessMethod.Exec,
    ) -> AsyncSubprocess:
        """Start ``python -m pip ... *subcommands``; see ``start_python``."""
        return await self.start_python(
            *self.make_pip_subcommands(*subcommands),
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
            cwd=cwd,
            env=env,
            writable=writable,
            method=method,
        )

    async def start_python_simply(self, *subcommands) -> Tuple[List[str], List[str]]:
        """Run Python with ``subcommands`` to completion and collect its output.

        Output is decoded as UTF-8 line by line; each line is stripped and
        blank lines are dropped.

        :return: ``(stdout_lines, stderr_lines)``.
        :raises ValueError: If no subcommands are given.
        :raises RuntimeError: If the exit code is non-zero; the message
            includes the collected output.
        """
        stdout_lines: List[str] = []
        stderr_lines: List[str] = []

        def _stdout_callback(data: bytes) -> None:
            line = str(data, encoding="utf-8").strip()
            if line:
                stdout_lines.append(line)

        def _stderr_callback(data: bytes) -> None:
            line = str(data, encoding="utf-8").strip()
            if line:
                stderr_lines.append(line)

        proc = await self.start_python(
            *subcommands,
            stdout_callback=_stdout_callback,
            stderr_callback=_stderr_callback,
            cwd=None,
            env=self.env,
            writable=False,
            method=self.method,
        )
        exit_code = await proc.wait()

        if exit_code != 0:
            params_msg = f"code={exit_code}"
            if stdout_lines:
                stdout_text = " ".join(stdout_lines)
                params_msg += f",stdout={stdout_text}"
            if stderr_lines:
                stderr_text = " ".join(stderr_lines)
                params_msg += f",stderr={stderr_text}"
            error_msg = f"python {subcommands[0]} error: {params_msg}"
            raise RuntimeError(error_msg)

        return stdout_lines, stderr_lines

    async def start_pip_simply(self, *subcommands) -> Tuple[List[str], List[str]]:
        """``start_python_simply`` for ``python -m pip ... *subcommands``."""
        return await self.start_python_simply(*self.make_pip_subcommands(*subcommands))

    async def ensure_pip(self, isolate=True) -> Tuple[List[str], List[str]]:
        """
        Run the ``ensurepip`` module (``--upgrade --default-pip``).

        :param isolate:
            If pydevd is connected, the ``python -Im ensurepip`` command does not
            work properly. In this case, it is temporarily resolved by using the
            ``isolate`` flag as ``False``. If possible, use only for debugging and
            testing purposes.
        """
        return await self.start_python_simply(
            "-Im" if isolate else "-m",
            "ensurepip",
            "--upgrade",
            "--default-pip",
        )

    async def recc_version(self) -> str:
        """Return the output of ``python -m recc --version``.

        :raises RuntimeError: If the output is not exactly one line.
        """
        stdout_lines, _ = await self.start_python_simply("-m", "recc", "--version")
        if len(stdout_lines) != 1:
            raise RuntimeError(f"Unexpected `recc --version` output: {stdout_lines}")
        return stdout_lines[0].strip()

    async def version(self) -> str:
        """Return the version reported by ``python --version`` (e.g. ``3.10.4``).

        :raises RuntimeError: If the output is not one ``Python <version>`` line.
        """
        stdout_lines, _ = await self.start_python_simply("--version")
        if len(stdout_lines) != 1:
            raise RuntimeError(f"Unexpected `python --version` output: {stdout_lines}")
        prefix, _, version = stdout_lines[0].partition(" ")
        if prefix != "Python" or not version:
            raise RuntimeError(f"Unexpected `python --version` output: {stdout_lines[0]}")
        return version

    async def version_tuple(self) -> Tuple[int, int, int]:
        """Return ``(major, minor, micro)`` of the interpreter.

        Pre-release suffixes (e.g. ``3.12.0rc1``) are ignored.

        :raises RuntimeError: If no ``X.Y.Z`` prefix can be found.
        """
        import re  # local import: only needed for version parsing

        version = await self.version()
        match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
        if match is None:
            raise RuntimeError(f"Unexpected Python version: {version}")
        return int(match.group(1)), int(match.group(2)), int(match.group(3))

    async def download(
        self,
        package: str,
        destination: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip download --dest <destination> <package>``; return the exit code."""
        proc = await self.start_pip(
            "download",
            "--dest",
            destination,
            PROGRESS_BAR_STYLE_FLAG,
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def install(
        self,
        package: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip install <package>``; return the exit code."""
        proc = await self.start_pip(
            "install",
            PROGRESS_BAR_STYLE_FLAG,
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def upgrade(
        self,
        package: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip install --upgrade <package>``; return the exit code."""
        proc = await self.start_pip(
            "install",
            PROGRESS_BAR_STYLE_FLAG,
            "--upgrade",
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def uninstall(
        self,
        package: str,
        stdout_callback: Optional[ReaderCallable] = None,
        stderr_callback: Optional[ReaderCallable] = None,
    ) -> int:
        """Run ``pip uninstall --yes <package>``; return the exit code."""
        proc = await self.start_pip(
            "uninstall",
            "--yes",
            package,
            stdout_callback=stdout_callback,
            stderr_callback=stderr_callback,
        )
        return await proc.wait()

    async def list(self) -> List[Package]:
        """Return the installed packages reported by ``pip list --format=json``."""
        stdout_lines, _ = await self.start_pip_simply("list", "--format=json")
        # Reassemble the JSON document from the captured (stripped) lines;
        # join() also tolerates empty output.
        json_text = "".join(stdout_lines)
        packages = global_json_decoder(json_text)
        return [Package(p["name"], p["version"]) for p in packages]

    async def show(self, package: str) -> Dict[str, str]:
        """Return the ``pip show <package>`` fields as a ``{key: value}`` dict.

        The output is in RFC-compliant mail header format. A multi-line value
        (e.g. a long ``License`` text) shows up as extra lines; those are
        appended to the current field with ``\\n``. The first occurrence of a
        key wins, so ``Name:``-style lines embedded in such text cannot
        overwrite the real field.
        """
        stdout_lines, _ = await self.start_pip_simply("show", package)
        result: Dict[str, str] = dict()
        current: Optional[str] = None
        for header_line in stdout_lines:
            key, sep, val = header_line.partition(":")
            key = key.strip()
            if sep and key and key not in result:
                result[key] = val.strip()
                current = key
            elif current is not None:
                # Continuation of the previous field's multi-line value.
                result[current] += "\n" + header_line
            # Otherwise: a stray line before the first field; ignore it.
        return result

    async def show_as_info(self, package: str) -> PackageInfo:
        """Return ``pip show <package>`` as a ``PackageInfo``.

        Missing fields become empty strings / empty lists.
        """
        info = await self.show(package)

        def _split_packages(text: str) -> List[str]:
            # "a, b, c" -> ["a", "b", "c"]; empty items are dropped.
            return [item for item in (part.strip() for part in text.split(",")) if item]

        name = info.get("Name", "").strip()
        version = info.get("Version", "").strip()
        summary = info.get("Summary", "").strip()
        homepage = info.get("Home-page", "").strip()
        author = info.get("Author", "").strip()
        author_email = info.get("Author-email", "").strip()
        license_ = info.get("License", "").strip()  # Shadows built-in name 'license'
        location = info.get("Location", "").strip()
        requires = _split_packages(info.get("Requires", ""))
        required_by = _split_packages(info.get("Required-by", ""))

        return PackageInfo(
            name,
            version,
            summary,
            homepage,
            author,
            author_email,
            license_,
            location,
            requires,
            required_by,
        )
TestCase
# -*- coding: utf-8 -*-
import os
from sys import executable, version_info
from unittest import IsolatedAsyncioTestCase, main
from recc.subprocess.async_python_subprocess import AsyncPythonSubprocess
from recc.util.version import parse_version_numbers
class AsyncPythonSubprocessTestCase(IsolatedAsyncioTestCase):
    """Integration tests that run the current interpreter via AsyncPythonSubprocess."""

    def _skip_unless_numpy(self) -> None:
        """Skip the calling test when numpy is not importable here.

        The tests run ``executable`` (this same interpreter), so a local
        ``find_spec`` lookup is used as a proxy for "pip can see numpy".
        NOTE(review): numpy reachable only via PYTHONPATH would pass this check
        but may be invisible to the child, which gets an empty environment by
        default.
        """
        import importlib.util

        if importlib.util.find_spec("numpy") is None:
            self.skipTest("numpy is not installed")

    async def test_version_tuple(self):
        python = AsyncPythonSubprocess(executable)
        version = await python.version_tuple()
        self.assertEqual(version_info[:3], version)

    async def test_list(self):
        print("test_list cwd: ", os.getcwd())
        python = AsyncPythonSubprocess(executable)
        packages = await python.list()
        self.assertLess(0, len(packages))

    async def test_show(self):
        self._skip_unless_numpy()
        python = AsyncPythonSubprocess(executable)
        show = await python.show("numpy")
        self.assertLess(0, len(show))
        self.assertEqual("numpy", show["Name"])

    async def test_show_as_info(self):
        self._skip_unless_numpy()
        python = AsyncPythonSubprocess(executable)
        show = await python.show_as_info("numpy")
        self.assertEqual("numpy", show.name)
        self.assertLessEqual((1, 22, 2), parse_version_numbers(show.version))
        # NOTE(review): the exact metadata below matches a specific numpy
        # release and may differ in others (e.g. homepage, license text).
        summary = "NumPy is the fundamental package for array computing with Python."
        self.assertEqual(summary, show.summary)
        self.assertEqual("https://www.numpy.org", show.homepage)
        self.assertEqual("Travis E. Oliphant et al.", show.author)
        self.assertEqual("", show.author_email)
        self.assertEqual("BSD", show.license)
        self.assertTrue(os.path.isdir(show.location))
        self.assertEqual(0, len(show.requires))
        self.assertLess(0, len(show.required_by))


if __name__ == "__main__":
    main()
Download pip packages
중복되는 파일 없이 패키지를 다운로드하고 캐시할 수 있도록 한다 (아래는 클래스 정의가 생략된 메서드 발췌):
@staticmethod
def _read_hash(path: str, method: str) -> str:
    """Return the hex digest of the file at ``path`` using hash ``method``.

    The file is hashed in fixed-size chunks, so large package files are not
    loaded into memory all at once.

    :raises ValueError: If ``method`` is not ``PIP_HASH_METHOD_SHA256``.
    """
    with open(path, "rb") as f:
        if method != PIP_HASH_METHOD_SHA256:
            raise ValueError(f"Unsupported hash method: '{method}'")
        digest = sha256()
        # 1 MiB reads keep memory bounded regardless of the file size.
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()
async def _exists_pip_package(self, domain: str, package: str) -> bool:
    """Return True if every recorded file for ``package`` in ``domain`` is cached.

    The package counts as present only when the database has at least one
    record for it and each recorded file exists in the pip download directory
    with a matching hash. A missing file, a hash mismatch, or an unsupported
    hash method makes this return False, which leads
    ``download_pip_packages`` to download the package again.
    """
    assert self._local_storage
    assert self._database
    assert self._database.is_open()

    pip_infos = await self._database.select_pip_by_domain_and_name(domain, package)
    if not pip_infos:
        return False

    pip_download_dir = self._local_storage.pip_download
    for pip_info in pip_infos:
        pip_path = os.path.join(pip_download_dir, pip_info.file)
        if not os.path.isfile(pip_path):
            logger.warning(f"File not found: {pip_path}")
            return False

        try:
            hash_value = self._read_hash(pip_path, pip_info.hash_method)
        except ValueError as e:
            # Unsupported hash method: treat as "not cached", but say why.
            logger.warning(f"Cannot verify pip file {pip_path}: {e}")
            return False

        if hash_value != pip_info.hash_value:
            logger.warning(f"Hash mismatch: {pip_path}")
            return False

    return True
async def download_pip_packages(
    self,
    domain: str,
    hash_method: str,
    logging_encoding="utf-8",
) -> None:
    """Download every package in ``RECC_REQUIREMENTS_MAIN`` into the pip cache.

    Packages already cached (per ``_exists_pip_package``) are skipped. For
    the rest:

    1. The package's existing database rows are deleted.
    2. ``pip download`` runs into a temporary directory.
    3. Each downloaded file is moved into the pip download directory and
       recorded with its ``hash_method`` digest.

    Per-file move/insert failures are logged and that file is skipped.

    :param domain: Domain key used for the pip records in the database.
    :param hash_method: Hash method used for, and stored with, each record.
    :param logging_encoding: Encoding used to decode pip output for logging.
    :raises RuntimeError: If ``pip download`` exits with a non-zero code.
    """
    assert self._local_storage
    assert self._database
    assert self._database.is_open()

    def _stdout_callback(data: bytes) -> None:
        line = str(data, encoding=logging_encoding).rstrip()
        if line:
            logger.debug(line)

    def _stderr_callback(data: bytes) -> None:
        line = str(data, encoding=logging_encoding).rstrip()
        if line:
            logger.warning(line)

    for package in RECC_REQUIREMENTS_MAIN:
        if await self._exists_pip_package(domain, package):
            logger.debug(f"Exists pip package '{package}'")
            continue

        # Drop stale/incomplete records before downloading a fresh copy.
        await self._database.delete_pip_by_domain_and_name(domain, package)

        with self._local_storage.create_temporary_directory() as tmpdir:
            logger.debug(f"Run pip download '{package}'")
            code = await AsyncPythonSubprocess.create_system().download(
                package,
                tmpdir,
                _stdout_callback,
                _stderr_callback,
            )
            if code != 0:
                raise RuntimeError(f"Error({code}) pip download '{package}'")
            logger.debug(f"Subprocess is done: pip download '{package}'")

            # Only OSError / Exception are handled below: catching
            # BaseException would also swallow asyncio.CancelledError and
            # KeyboardInterrupt, breaking task cancellation.
            for filename in os.listdir(tmpdir):
                filepath = os.path.abspath(os.path.join(tmpdir, filename))
                assert os.path.isfile(filepath)
                hash_value = self._read_hash(filepath, hash_method)
                dest = os.path.join(self._local_storage.pip_download, filename)

                if os.path.exists(dest):
                    try:
                        os.remove(dest)
                        logger.debug(f"Remove the existing pip file: '{dest}'")
                    except OSError as e:
                        logger.warning(f"Error remove existing pip file: {e}")
                        continue

                try:
                    # NOTE(review): assumes `move` is shutil.move, whose errors
                    # are OSError (shutil.Error subclasses OSError).
                    move(filepath, dest)
                except OSError as e:
                    logger.warning(f"Error moving downloaded pip file: {e}")
                    continue

                try:
                    await self._database.insert_pip(
                        domain, package, filename, hash_method, hash_value
                    )
                except Exception as e:
                    logger.warning(f"Database insert error: {e}")
                    continue

        logger.debug(f"Done pip download '{package}'")