Skip to content

Commit

Permalink
Event - make possible to unsubscribe from subscribed function
Browse files Browse the repository at this point in the history
  • Loading branch information
IGalat committed Feb 19, 2024
1 parent bc2dc94 commit 9eeb987
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/tapper/util/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
from typing import Any
from typing import Callable
from typing import ClassVar
from typing import Optional
from typing import Protocol

SubscribedFunction = Callable[[Any], None]
SubscribedFunction = Callable[[Any], Optional[bool]]
"""
If return value is False, the function is unsubscribed.
"""

_subscribers: dict[str, list[SubscribedFunction]] = dict()

Expand All @@ -25,7 +29,7 @@ def subscribe(topic: str, subscribed_function: SubscribedFunction) -> None:
:param topic: predefined string
:param subscribed_function: function that will receive messages.
It has to accept one parameter, and should be compatible
with publisher's message' data type
with publisher's message' data type.
"""
if topic not in _subscribers:
_subscribers[topic] = []
Expand All @@ -48,12 +52,14 @@ def publish(topic: str, message: Any) -> None:
:param topic: predefined string
:param message: Any data type, dataclass of this
should in a separate module to decouple from subscribers
should in a separate module to decouple from subscribers.
"""
if topic not in _subscribers:
return
for subscribed_function in _subscribers[topic]:
subscribed_function(message)
result = subscribed_function(message)
if result is False:
unsubscribe(topic, subscribed_function)


class EventDatatype(Protocol):
Expand Down

0 comments on commit 9eeb987

Please sign in to comment.