Python:selectors
High-level I/O multiplexing
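As a minimal sketch of what the module does, the following registers one end of a socket pair with a selector and checks readiness before and after data is written to the other end. The helper names read_ready and demo are hypothetical and exist only for this illustration; selectors.DefaultSelector, register, select, and socket.socketpair are standard library calls.

```python
import selectors
import socket


def read_ready(sel, timeout):
    """Return the data attached to every registered object that is ready."""
    return [key.data for key, _mask in sel.select(timeout)]


def demo():
    # DefaultSelector is an alias for the best implementation on this platform.
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    try:
        # The third argument is opaque user data returned with each event.
        sel.register(a, selectors.EVENT_READ, "a is readable")
        before = read_ready(sel, 0)      # nothing has been written yet
        b.send(b"ping")
        after = read_ready(sel, 1.0)     # a now has pending data
        payload = a.recv(16)
        return before, after, payload
    finally:
        sel.close()
        a.close()
        b.close()
```

Calling demo() should show an empty readiness list before the write and one entry after it. The data slot is often used to carry a callback, which is the pattern the FIFO example relies on.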
FIFO Example
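import os
import selectors

FIFO_PATH = "./myfifo"

# DefaultSelector picks the most efficient implementation for the platform
# (kqueue on BSD/macOS, epoll on Linux).
sel = selectors.DefaultSelector()

# Recreate the FIFO from scratch.
try:
    os.unlink(FIFO_PATH)
except FileNotFoundError:
    pass
os.mkfifo(FIFO_PATH, 0o600)


def cb(fp):
    # A writer connected: read until it closes its end, then re-arm.
    sel.unregister(fp)
    print(f"got {fp.read()}")
    fp.close()
    # Opening a FIFO for reading blocks until the next writer opens it.
    fp2 = open(FIFO_PATH, "rb")
    sel.register(fp2, selectors.EVENT_READ, cb)


if __name__ == "__main__":
    orig_fp = open(FIFO_PATH, "rb")  # blocks until a writer opens the FIFO
    print("open done")
    ev = sel.register(orig_fp, selectors.EVENT_READ, cb)
    print(f"registration done for {ev}")
    while True:
        events = sel.select()
        print(events)
        for key, mask in events:
            # key.data is the callback passed to register().
            key.data(key.fileobj)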
Using with asyncio
Use loop.add_reader to have the event loop invoke a callback when a file descriptor becomes readable. See Python:EventLoop#How to await a select.select call in python asyncio.
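A minimal sketch of the add_reader approach, assuming a Unix selector-based event loop (add_reader is not supported by the Proactor loop that is the default on Windows). The names read_once and main are hypothetical, and an os.pipe stands in for whatever descriptor you actually want to watch.

```python
import asyncio
import os


async def read_once(fd):
    """Wait until fd is readable, then return up to 1024 bytes from it."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_readable():
        # Stop watching first so the callback fires only once.
        loop.remove_reader(fd)
        if not fut.done():
            fut.set_result(os.read(fd, 1024))

    loop.add_reader(fd, on_readable)
    return await fut


async def main():
    r, w = os.pipe()
    try:
        loop = asyncio.get_running_loop()
        # Simulate another party writing to the pipe a little later.
        loop.call_later(0.05, os.write, w, b"hello")
        return await read_once(r)
    finally:
        os.close(r)
        os.close(w)
```

Run it with asyncio.run(main()). Wrapping the callback in a future is one way to turn the readiness notification into something that can be awaited; a long-lived reader would instead keep the callback registered and push data into a queue.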
See also
- Python:select
- Python:selectors
- IOCP
- devpoll
- epoll
- kqueue