
Python:selectors

High-level I/O multiplexing

FIFO Example

import os
import selectors

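# DefaultSelector picks the most efficient implementation available
# (kqueue on BSD/macOS, epoll on Linux); KqueueSelector exists only on BSD/macOS.
sel = selectors.DefaultSelector()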

try:
    os.unlink("./myfifo")
except FileNotFoundError:
    pass  # no stale FIFO left over from a previous run
os.mkfifo("./myfifo", 0o600)


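def cb(fp):
    # Called when the FIFO becomes readable. Stop watching this file object,
    # read until the writer closes its end (EOF), then reopen for the next writer.
    sel.unregister(fp)
    print(f"got {fp.read()}")
    fp.close()
    fp2 = open("./myfifo", "rb")  # blocks until a writer opens the FIFO
    sel.register(fp2, selectors.EVENT_READ, cb)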

if __name__ == "__main__":
    # Opening a FIFO for reading blocks until some process opens it for writing.
    orig_fp = open("./myfifo", "rb")
    print("open done")
    ev = sel.register(orig_fp, selectors.EVENT_READ, cb)
    print(f"registration done for {ev}")

    while True:
        events = sel.select()
        print(events)
        for key, mask in events:
            key.data(key.fileobj)

Using with asyncio

Use loop.add_reader. See Python:EventLoop#How to await a select.select call in python asyncio.
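A minimal sketch of the loop.add_reader approach, assuming a Unix platform where asyncio uses a selector-based event loop. The helper names read_fifo_once and demo, the temporary FIFO path, and the 4096-byte read size are illustrative choices, not part of the original example. The FIFO is opened with O_NONBLOCK so that open() does not block the event loop while no writer is attached.

```python
import asyncio
import os
import tempfile


async def read_fifo_once(path):
    """Wait until the FIFO at `path` is readable, then return one chunk of bytes."""
    loop = asyncio.get_running_loop()
    # O_NONBLOCK: open() returns immediately even if no writer is attached yet.
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    done = loop.create_future()

    def on_readable():
        # Invoked by the event loop when fd becomes readable.
        loop.remove_reader(fd)
        if not done.done():
            done.set_result(os.read(fd, 4096))

    loop.add_reader(fd, on_readable)
    try:
        return await done
    finally:
        loop.remove_reader(fd)  # harmless if the callback already removed it
        os.close(fd)


async def demo():
    # Hypothetical driver: creates a throwaway FIFO and acts as its own writer.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "myfifo")
        os.mkfifo(path, 0o600)
        reader = asyncio.create_task(read_fifo_once(path))
        await asyncio.sleep(0)  # let the reader task open the FIFO first
        wfd = os.open(path, os.O_WRONLY | os.O_NONBLOCK)
        os.write(wfd, b"hello")
        os.close(wfd)
        return await reader


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike the blocking sel.select() loop in the FIFO example, the coroutine only suspends itself while waiting, so other tasks on the same event loop keep running.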

See also

Favorite site