Aiodocker
Python Docker API client based on asyncio and aiohttp
Example
# -*- coding: utf-8 -*-
from typing import Optional, List, Union
from tarfile import TarFile
from aiodocker import Docker
from aiodocker.containers import DockerContainer
from recc.pm.async_pm_interface import AsyncProcessManagerInterface
from recc.pm.vars import LOCAL_BASE_URL
from recc.pm.signal import SIGNUM_TO_NAME
class AioDockerProcessManager(AsyncProcessManagerInterface):
    """Process manager that drives containers through an aiodocker client.

    The client is created lazily by :meth:`open` and released by
    :meth:`close`. Every method that talks to the daemon asserts that the
    manager has been opened first.
    """

    def __init__(self, base_url=LOCAL_BASE_URL, **kwargs):
        """Remember the daemon URL; no connection is made here.

        Args:
            base_url: URL handed to ``Docker(...)`` when :meth:`open` runs.
            **kwargs: Accepted for interface compatibility; currently unused.
        """
        self._base_url = base_url
        self._docker: Optional[Docker] = None

    def is_open(self) -> bool:
        """Return ``True`` while a client object is held."""
        return self._docker is not None

    async def open(self) -> bool:
        """Create the client for the configured base URL.

        Returns:
            ``True`` once the client object has been created.
        """
        self._docker = Docker(self._base_url)
        return self._docker is not None

    async def close(self) -> None:
        """Close the client and forget it. Requires an opened manager."""
        assert self._docker
        await self._docker.close()
        self._docker = None

    async def get_images(self, name: Optional[str] = None) -> List[str]:
        """Return every tag of every image reported by the daemon.

        Args:
            name: Optional value forwarded as the ``before`` parameter of
                the image listing. When ``None`` no parameter is sent.

        Returns:
            A flat list of tag strings.
        """
        assert self._docker
        # NOTE(review): `before` is kept from the original call; confirm it is
        # the filter the daemon actually honours for this use case.
        params = {} if name is None else {"before": name}
        images = await self._docker.images.list(**params)
        return [
            str(tag)
            for image in images
            for tag in self._image_tags(image)
        ]

    @staticmethod
    def _image_tags(image) -> list:
        """Extract the tag list from one image listing entry.

        Listing entries may be plain JSON mappings (``"RepoTags"`` key, which
        can be null for untagged images) or objects exposing ``tags``.
        """
        if isinstance(image, dict):
            return image.get("RepoTags") or []
        return getattr(image, "tags", None) or []

    @staticmethod
    def _split_image(image: str):
        """Split an image reference into ``(name, tag)``; tag may be ``""``.

        Only a colon that appears after the last ``/`` separates a tag, so
        the port of a registry host (``host:5000/repo``) is left in the name.
        Digest references (``repo@sha256:...``) are returned whole.
        """
        if "@" in image:
            return image, ""
        name, sep, tag = image.rpartition(":")
        if not sep or "/" in tag:
            return image, ""
        return name, tag

    async def pull(self, image: str, **kwargs) -> None:
        """Pull *image* from its registry.

        Args:
            image: Image reference, optionally suffixed with ``:tag``.
            **kwargs: Accepted for interface compatibility; currently unused.
        """
        assert self._docker
        name, tag = self._split_image(image)
        # The call must be awaited, otherwise the request is never performed.
        await self._docker.images.pull(name, tag=tag if tag else None)

    async def version(self, key: Optional[str] = None) -> str:
        """Return one field of the daemon's version information.

        Args:
            key: Field to read; ``None`` selects ``"Version"``.

        Raises:
            KeyError: If the requested field is absent.
        """
        assert self._docker
        version = await self._docker.version()
        if key is None:
            return version["Version"]
        return version[key]

    async def create(
        self, image: str, command: Union[str, List[str]], **kwargs
    ) -> str:
        """Create a container and return its id.

        Args:
            image: Stored under the ``"Image"`` config key.
            command: Stored under the ``"Cmd"`` config key.
            **kwargs: Extra config entries; they override the two above
                when the same key is given.
        """
        assert self._docker
        config = {
            "Cmd": command,
            "Image": image,
        }
        config.update(**kwargs)
        container = await self._docker.containers.create(config)
        return container.id

    async def get(self, key: str) -> DockerContainer:
        """Look up the container identified by *key*."""
        assert self._docker
        return await self._docker.containers.get(key)

    async def start(self, key: str) -> None:
        """Start the container identified by *key*."""
        container = await self.get(key)
        await container.start()

    async def stop(self, key: str) -> None:
        """Stop the container identified by *key*."""
        container = await self.get(key)
        await container.stop()

    async def restart(self, key: str) -> None:
        """Restart the container identified by *key*."""
        container = await self.get(key)
        await container.restart()

    async def pause(self, key: str) -> None:
        """Pause the container identified by *key*."""
        container = await self.get(key)
        await container.pause()

    async def unpause(self, key: str) -> None:
        """Unpause the container identified by *key*."""
        container = await self.get(key)
        await container.unpause()

    async def kill(self, key: str, signal: Union[str, int]) -> None:
        """Send *signal* to the container identified by *key*.

        Args:
            key: Container identifier.
            signal: Signal name, or a number translated through
                ``SIGNUM_TO_NAME``.

        Raises:
            KeyError: If a numeric signal has no entry in ``SIGNUM_TO_NAME``.
        """
        if isinstance(signal, int):
            signal_name = SIGNUM_TO_NAME[signal]
        else:
            assert isinstance(signal, str)
            signal_name = signal
        container = await self.get(key)
        await container.kill(signal=signal_name)

    async def wait(self, key: str, timeout: Optional[float] = None) -> None:
        """Wait on the container identified by *key*.

        Args:
            key: Container identifier.
            timeout: Forwarded unchanged to the container's ``wait``.
        """
        container = await self.get(key)
        await container.wait(timeout=timeout)

    async def get_archive(self, key: str, path: str) -> TarFile:
        """Fetch *path* from the container as a tar archive."""
        container = await self.get(key)
        return await container.get_archive(path)

    async def put_archive(self, key: str, path: str, data: bytes) -> bool:
        """Upload the archive *data* into the container at *path*."""
        container = await self.get(key)
        return await container.put_archive(path, data)

    async def logs(self, key: str, **kwargs) -> str:
        """Return the container's log output.

        Args:
            key: Container identifier.
            **kwargs: Forwarded unchanged to the container's ``log`` call.
        """
        container = await self.get(key)
        return await container.log(**kwargs)