Skip to content

Aiodocker

Python Docker API client based on asyncio and aiohttp

Example

# -*- coding: utf-8 -*-

from typing import Optional, List, Union
from tarfile import TarFile
from aiodocker import Docker
from aiodocker.containers import DockerContainer
from recc.pm.async_pm_interface import AsyncProcessManagerInterface
from recc.pm.vars import LOCAL_BASE_URL
from recc.pm.signal import SIGNUM_TO_NAME


class AioDockerProcessManager(AsyncProcessManagerInterface):
    """Process manager that drives Docker through the ``aiodocker`` client.

    The manager owns at most one ``Docker`` client at a time.  Call
    :meth:`open` before any other operation and :meth:`close` afterwards;
    every method that talks to Docker asserts that the client exists.
    """

    def __init__(self, base_url=LOCAL_BASE_URL, **kwargs):
        """Remember the daemon URL; no connection is made here.

        :param base_url: URL handed to ``Docker(...)`` by :meth:`open`.
        :param kwargs: Accepted for signature compatibility; currently unused.
        """
        self._base_url = base_url
        self._docker: Optional[Docker] = None

    def is_open(self) -> bool:
        """Return ``True`` while a client created by :meth:`open` is held."""
        return self._docker is not None

    async def open(self) -> bool:
        """Create the ``Docker`` client for the configured base URL.

        :return: ``True`` once the client object has been stored.
        """
        self._docker = Docker(self._base_url)
        return self._docker is not None

    async def close(self) -> None:
        """Close the client and forget it, so :meth:`is_open` is ``False``."""
        assert self._docker
        await self._docker.close()
        self._docker = None

    async def get_images(self, name: Optional[str] = None) -> List[str]:
        """Return every repository tag of the images reported by the daemon.

        :param name: Forwarded unchanged as the ``before`` argument of
            ``images.list``.
        :return: Flat list of tag strings (for example ``"ubuntu:20.04"``).
        """
        assert self._docker
        # NOTE(review): ``before`` is passed through even when ``name`` is
        # None -- confirm how the installed aiodocker serialises that.
        images = await self._docker.images.list(before=name)
        # aiodocker returns the decoded Engine API JSON: each image is a dict
        # whose "RepoTags" entry may be missing or null for untagged images.
        return [
            str(tag)
            for image in images
            for tag in (image.get("RepoTags") or [])
        ]

    async def pull(self, image: str, **kwargs) -> None:
        """Pull ``image`` from its registry.

        :param image: Image reference, optionally suffixed with ``:tag``.
            References without a tag, with a registry port
            (``host:5000/name``) or with a digest (``name@sha256:...``) are
            passed as the name with no separate tag.
        :param kwargs: Accepted for signature compatibility; currently unused.
        """
        assert self._docker
        name, sep, tag = image.rpartition(":")
        # The text after the last ':' is a tag only when a ':' exists, the
        # suffix is not part of a path (registry port case), and the
        # reference is not a digest.
        if not sep or "/" in tag or "@" in image:
            name, tag = image, ""
        # The pull is a coroutine; without ``await`` it would never run.
        await self._docker.images.pull(name, tag=tag if tag else None)

    async def version(self, key: Optional[str] = None) -> str:
        """Return one field of the daemon's version information.

        :param key: Field to read; defaults to ``"Version"`` when ``None``.
        :raises KeyError: If the requested field is absent.
        """
        assert self._docker
        version = await self._docker.version()
        if key is None:
            return version["Version"]
        return version[key]

    async def create(self, image: str, command: Union[str, List[str]], **kwargs) -> str:
        """Create (but do not start) a container and return its id.

        :param image: Stored under the ``"Image"`` key of the config.
        :param command: Stored under the ``"Cmd"`` key of the config.
        :param kwargs: Extra config entries; they override ``Cmd``/``Image``
            when the same keys are given.
        :return: The ``id`` of the created container.
        """
        assert self._docker
        config = {
            "Cmd": command,
            "Image": image,
        }
        config.update(**kwargs)
        container = await self._docker.containers.create(config)
        return container.id

    async def get(self, key: str) -> DockerContainer:
        """Look up the container identified by ``key``."""
        assert self._docker
        return await self._docker.containers.get(key)

    async def start(self, key: str) -> None:
        """Start the container identified by ``key``."""
        container = await self.get(key)
        await container.start()

    async def stop(self, key: str) -> None:
        """Stop the container identified by ``key``."""
        container = await self.get(key)
        await container.stop()

    async def restart(self, key: str) -> None:
        """Restart the container identified by ``key``."""
        container = await self.get(key)
        await container.restart()

    async def pause(self, key: str) -> None:
        """Pause the container identified by ``key``."""
        container = await self.get(key)
        await container.pause()

    async def unpause(self, key: str) -> None:
        """Unpause the container identified by ``key``."""
        container = await self.get(key)
        await container.unpause()

    async def kill(self, key: str, signal: Union[str, int]) -> None:
        """Send ``signal`` to the container identified by ``key``.

        :param signal: Signal name, or a signal number that is translated
            to its name through ``SIGNUM_TO_NAME``.
        :raises KeyError: If a numeric signal has no entry in the table.
        """
        if isinstance(signal, int):
            signal_name = SIGNUM_TO_NAME[signal]
        else:
            assert isinstance(signal, str)
            signal_name = signal
        container = await self.get(key)
        await container.kill(signal=signal_name)

    async def wait(self, key: str, timeout: Optional[float] = None) -> None:
        """Wait on the container identified by ``key``.

        :param timeout: Forwarded unchanged to ``container.wait``.
        """
        container = await self.get(key)
        await container.wait(timeout=timeout)

    async def get_archive(self, key: str, path: str) -> TarFile:
        """Fetch ``path`` from the container via ``container.get_archive``."""
        container = await self.get(key)
        return await container.get_archive(path)

    async def put_archive(self, key: str, path: str, data: bytes) -> bool:
        """Upload ``data`` to ``path`` via ``container.put_archive``."""
        container = await self.get(key)
        return await container.put_archive(path, data)

    async def logs(self, key: str, **kwargs) -> str:
        """Return the container's log output.

        :param kwargs: Forwarded unchanged to ``container.log``.
        """
        # NOTE(review): aiodocker's ``log`` may return a list of lines rather
        # than a single string -- confirm against the annotated return type.
        container = await self.get(key)
        return await container.log(**kwargs)

See also

Favorite site