Skip to content

Watchdog

Python API library and shell utilities to monitor file system events.

Install

pip install watchdog

Example API Usage

A simple program that uses watchdog to monitor the current directory recursively and prints the events generated:

import time

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer


class MyEventHandler(FileSystemEventHandler):
    """Event handler that prints every event it is given."""

    def on_any_event(self, event: FileSystemEvent) -> None:
        """Print *event* to standard output."""
        print(event)


# Create the handler and the observer it will be registered with.
event_handler = MyEventHandler()
observer = Observer()
# Register the handler for the current directory, including subdirectories.
observer.schedule(event_handler, ".", recursive=True)
observer.start()
try:
    # Keep the main thread idle; the loop only ends when an exception
    # (for example KeyboardInterrupt) interrupts the sleep.
    while True:
        time.sleep(1)
finally:
    # Runs on any exit from the loop: stop the observer, then wait for it.
    observer.stop()
    observer.join()

Shell Utilities

Watchdog comes with an optional utility script called watchmedo. Run watchmedo --help at the shell prompt to learn more about this tool.

Here is how you can log the current directory recursively for events related only to *.py and *.txt files while ignoring all directory events:

watchmedo log \
    --patterns='*.py;*.txt' \
    --ignore-directories \
    --recursive \
    --verbose \
    .

You can use the shell-command subcommand to execute shell commands in response to events:

watchmedo shell-command \
    --patterns='*.py;*.txt' \
    --recursive \
    --command='echo "${watch_src_path}"' \
    .

Asyncio watchdog

import asyncio
from pathlib import Path
from typing import Optional

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer


class _EventHandler(FileSystemEventHandler):
    """Forward file-creation events into an asyncio queue.

    Args:
        queue: Queue that receives each ``on_created`` event.
        loop: Event loop on which the queue insertion is scheduled.
        *args: Extra positional arguments passed to the base initializer.
        **kwargs: Extra keyword arguments passed to the base initializer.
    """

    def __init__(self, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
                 *args, **kwargs):
        # ``super(*args, **kwargs)`` merely constructed a ``super`` proxy and
        # never ran the base initializer; call ``__init__`` explicitly.
        super().__init__(*args, **kwargs)
        self._loop = loop
        self._queue = queue

    def on_created(self, event: FileSystemEvent) -> None:
        """Schedule *event* to be put on the queue from the loop's thread."""
        # The insertion is handed to the loop via call_soon_threadsafe rather
        # than performed directly, so the queue is only touched by the loop.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, event)


class EventIterator:
    """Asynchronous iterator over the items of an ``asyncio.Queue``.

    Iteration ends when ``None`` is taken from the queue. The ``loop``
    argument is accepted but not used.
    """

    def __init__(self, queue: asyncio.Queue,
                 loop: Optional[asyncio.BaseEventLoop] = None):
        self.queue = queue

    def __aiter__(self):
        """Return the iterator itself."""
        return self

    async def __anext__(self):
        """Wait for the next queue item; ``None`` terminates the iteration."""
        event = await self.queue.get()
        if event is not None:
            return event
        raise StopAsyncIteration


def watch(path: Path, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
          recursive: bool = False, timeout: float = 10) -> None:
    """Watch a directory for changes for a limited time.

    Blocks the calling thread, so it is meant to run outside the event loop
    (the file's entry point runs it through ``loop.run_in_executor``).

    Args:
        path: Directory to watch.
        queue: Queue that receives the events and, at the end, a ``None``
            sentinel marking the end of the stream.
        loop: Event loop on which queue insertions are scheduled.
        recursive: Whether to watch subdirectories as well.
        timeout: Seconds to wait on the observer before shutting it down.
            Defaults to the previously hard-coded 10 seconds.
    """
    handler = _EventHandler(queue, loop)

    observer = Observer()
    observer.schedule(handler, str(path), recursive=recursive)
    observer.start()
    print("Observer started")
    try:
        observer.join(timeout)
    finally:
        # Stop the observer before publishing the sentinel: previously it was
        # left running, so it leaked and could enqueue events after ``None``.
        observer.stop()
        observer.join()
        loop.call_soon_threadsafe(queue.put_nowait, None)


async def consume(queue: asyncio.Queue) -> None:
    """Print each event drawn from *queue* until the iterator is exhausted."""
    events = EventIterator(queue)
    async for change in events:
        print("Got an event!", change)


async def _main() -> None:
    """Run the blocking watcher in an executor alongside the consumer."""
    loop = asyncio.get_running_loop()
    # ``asyncio.Queue`` no longer accepts ``loop=`` (removed in Python 3.10);
    # creating it inside the running loop needs no explicit loop argument.
    queue: asyncio.Queue = asyncio.Queue()

    await asyncio.gather(
        loop.run_in_executor(None, watch, Path("."), queue, loop, False),
        consume(queue),
    )


if __name__ == "__main__":
    asyncio.run(_main())

See also

Favorite site