diff --git a/libkirk/ltx.py b/libkirk/ltx.py index 996756d..b0a9812 100644 --- a/libkirk/ltx.py +++ b/libkirk/ltx.py @@ -6,7 +6,6 @@ .. moduleauthor:: Andrea Cervesato """ import os -import select import asyncio import logging import typing @@ -505,6 +504,8 @@ async def connect(self) -> None: self._logger.info("Connecting to LTX") + os.set_blocking(self._stdout_fd, False) + self._exception = None self._task = libkirk.create_task(self._polling()) @@ -586,7 +587,15 @@ async def _read(self, size: int) -> bytes: """ Blocking I/O method to read from stdout. """ - return await libkirk.to_thread(os.read, self._stdout_fd, size) + data = None + try: + data = await libkirk.to_thread(os.read, self._stdout_fd, size) + except BlockingIOError: + # we ensure other threads will take action if reading + # procedure is too fast for this process + os.sched_yield() + + return data async def _write(self, data: bytes) -> None: """ @@ -608,44 +617,35 @@ async def _polling(self) -> None: """ self._logger.info("Starting producer") - poller = select.epoll() - poller.register(self._stdout_fd, select.EPOLLIN) - # force utf-8 encoding by using raw=False unpacker = msgpack.Unpacker(raw=False) try: while not self._stop: - events = await libkirk.to_thread(poller.poll, 0.1) - - for fdesc, _ in events: - if fdesc != self._stdout_fd: - continue - - data = await self._read(self.BUFFSIZE) - if not data: - continue + data = await self._read(self.BUFFSIZE) + if not data: + continue - self._logger.debug("Unpacking bytes: %s", data) + self._logger.debug("Unpacking bytes: %s", data) - unpacker.feed(data) + unpacker.feed(data) - while True: - try: - msg = unpacker.unpack() - if not msg: - continue + while True: + try: + msg = unpacker.unpack() + if not msg: + continue - self._logger.info("Received message: %s", msg) - if not isinstance(msg, list): - raise LTXError("Message must be an array") + self._logger.info("Received message: %s", msg) + if not isinstance(msg, list): + raise LTXError("Message must be an array") - if msg[0] == Request.ERROR: - raise LTXError(msg[1]) + if msg[0] == Request.ERROR: + raise LTXError(msg[1]) - await self._feed_requests(msg) - except msgpack.OutOfData: - break + await self._feed_requests(msg) + except msgpack.OutOfData: + break except LTXError as err: self._exception = err finally: