-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixing issue with event retrieval (#28)
* Fixing the issue with not retrieving events after the first batch * Implemented the notifier mock class * 🔖 v0.0.7 * Backed up to the previous implementation after some tests * Fixed the issue with expiration predicate * Linting * Reverted back the sig changes * Linting --------- Co-authored-by: Roman Glushko <roman.glushko.m@datarobot.com>
- Loading branch information
1 parent
d9aea9d
commit 2e8edd8
Showing
6 changed files
with
64 additions
and
12 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,55 @@ | ||
from os import PathLike | ||
from typing import Sequence, List | ||
|
||
from notifykit import Event | ||
|
||
|
||
class NotifierMock: | ||
# TODO: implement | ||
... | ||
""" | ||
A notifier mock that allows to control filesystems events without actually watching the filesystem | ||
""" | ||
|
||
def __init__(self, events_batches: List[List[Event]]) -> None: | ||
self._watch_paths: List[PathLike[str]] = [] | ||
self._events_batches = events_batches | ||
|
||
@property | ||
def watch_paths(self) -> List[PathLike[str]]: | ||
return self._watch_paths | ||
|
||
@property | ||
def events_batches(self) -> List[List[Event]]: | ||
return self._events_batches | ||
|
||
def add_event_batch(self, events_batch: List[Event]) -> None: | ||
self._events_batches.append(events_batch) | ||
|
||
def watch( | ||
self, | ||
paths: Sequence[PathLike[str]], | ||
recursive: bool = True, | ||
ignore_permission_errors: bool = False, | ||
) -> None: | ||
self._watch_paths.extend(paths) | ||
|
||
def unwatch(self, paths: Sequence[PathLike[str]]) -> None: | ||
for path in paths: | ||
self._watch_paths.remove(path) | ||
|
||
def __aiter__(self) -> "NotifierMock": | ||
return self | ||
|
||
def __iter__(self) -> "NotifierMock": | ||
return self | ||
|
||
def __next__(self) -> List[Event]: | ||
if not self._events_batches: | ||
raise StopIteration | ||
|
||
return self._events_batches.pop(0) | ||
|
||
async def __anext__(self) -> List[Event]: | ||
if not self._events_batches: | ||
raise StopAsyncIteration | ||
|
||
return self._events_batches.pop(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters