Skip to content

Commit

Permalink
[1030] - 2.5.0 (#42)
Browse files Browse the repository at this point in the history
* --custom_batch_size N

* adds --ntfy argument

* sends a notification on start

* notification function

* status_notification prep
- brain.Ponderation -> models.Ponderation
- ntfy.py -> notification.py
- prep notification.status_notification function

* Your collected {rep} unique posts over the last 24h

* persist

* improve persist.py , test concrent writes and abprut cancel

* custom serializer

* PersistedDict class

* once_per_day uses persist

* notify_at

* source_type is set from process_batch with a static social list

* ipfs schema updated after previous change

* last_notification, renamed status_notification -> statistics_notification

* docker_version_notifier

* add a nice message about ntfy when using it

* embarassing typo fix : embarassement -> embarassment

---------

Co-authored-by: Mathias Dail <93382891+MathiasExorde@users.noreply.github.com>
  • Loading branch information
6r17 and MathiasExorde committed Sep 20, 2023
1 parent 139cd43 commit 613decc
Show file tree
Hide file tree
Showing 16 changed files with 959 additions and 144 deletions.
82 changes: 82 additions & 0 deletions exorde/at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
at.py
Calls function at specified time. It mimics a cron like feature using a throttle
logic.
IT DOES NOT:
- loop forever : the function is still expeced to be called periodicly
IT DOES:
- throttle the parameter function to be called only onced at a specified
date
- persist that it has called the action
- delete the actions of previous day
EXPECT:
- list of times
- the path where to persist the data
- the async action to be called
MISC:
- persist.py
- throttle.py
"""
from datetime import datetime
from typing import Callable

from datetime import datetime, time, timedelta, date
from collections import deque

from exorde.persist import PersistedDict


def at(hours: list[time], path: str, action: Callable):
def assert_integrity(persisted: PersistedDict, index): # index is a _Date
if not index in persisted or not isinstance(persisted[index], list):
persisted[index] = []
current_date = date.today()
previous_day = (current_date - timedelta(days=1)).strftime("%Y-%m-%d")
try:
del persisted[previous_day]
except:
pass

def custom_serializer(obj):
if isinstance(obj, time): # Serialize datetime.time objects to strings
time_str = obj.strftime("%H:%M:%S")
return {"__time__": True, "value": time_str}
return obj

def custom_object_hook(obj):
if (
"__time__" in obj
): # Handle deserialization of datetime.time objects
time_str = obj["value"]
hour, minute, second = map(int, time_str.split(":"))
time_obj = time(hour, minute, second)
return time_obj
return obj

persisted = PersistedDict(
path,
serializer=custom_serializer,
custom_object_hook=custom_object_hook,
)
result = None

async def wrapper(*args, **kwargs):
nonlocal result, persisted
current_date = datetime.now().date().strftime("%Y-%m-%d")
assert_integrity(persisted, current_date)
print(persisted)
for hour in hours:
hour_as_datetime = datetime.combine(date.today(), hour)
if (
datetime.now() >= hour_as_datetime
and hour not in persisted[current_date]
):
persisted[current_date] = persisted[current_date] + [hour]
result = await action(*args, **kwargs)

# returned result is a non-feature
return result

return wrapper
32 changes: 19 additions & 13 deletions exorde/brain.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,16 @@
from typing import Union, Callable
from types import ModuleType
from exorde.counter import AsyncItemCounter
from datetime import datetime, timedelta
from datetime import datetime, timedelta, time

from exorde.statistics_notification import statistics_notification

LIVE_PONDERATION: str = "https://raw.githubusercontent.com/exorde-labs/TestnetProtocol/main/targets/modules_configuration_v2.json"
DEV_PONDERATION: str = "https://gist.githubusercontent.com/MathiasExorde/179ce30c736d1e3af924a767fadd2088/raw/d16444bc06cb4028f95647dafb6d55ee201fd8c6/new_module_configuration.json"
PONDERATION_URL: str = LIVE_PONDERATION

from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass
class Ponderation:
enabled_modules: Dict[str, List[str]]
generic_modules_parameters: Dict[str, Union[int, str, bool]]
specific_modules_parameters: Dict[str, Dict[str, Union[int, str, bool]]]
weights: Dict[str, float]
from typing import Union
from exorde.models import Ponderation


async def _get_ponderation() -> Ponderation:
Expand Down Expand Up @@ -115,6 +109,7 @@ async def choose_keyword() -> str:
return selected_keyword


from exorde.at import at
from datetime import timedelta
import logging

Expand Down Expand Up @@ -183,11 +178,22 @@ async def think(
command_line_arguments: argparse.Namespace, counter: AsyncItemCounter
) -> tuple[ModuleType, dict, str]:
ponderation: Ponderation = await get_ponderation()
quota_layer = await generate_quota_layer(command_line_arguments, counter)
only_layer = await generate_only_layer(
quota_layer: dict[str, float] = await generate_quota_layer(
command_line_arguments, counter
)
only_layer: dict[str, float] = await generate_only_layer(
ponderation.weights, command_line_arguments
)
await print_counts(ponderation, counter, quota_layer, only_layer)

croned_statistics_notification = at(
[time(hour, 0) for hour in command_line_arguments.notify_at],
"/tmp/exorde/statistics_notifications.json",
statistics_notification,
)
await croned_statistics_notification(
ponderation, counter, quota_layer, only_layer, command_line_arguments
)
module: Union[ModuleType, None] = None
choosen_module_path: str = ""
user_module_overwrite: dict[str, str] = {
Expand Down
27 changes: 26 additions & 1 deletion exorde/counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,40 @@
from datetime import datetime, timedelta
from typing import Dict

from exorde.persist import persist, load

STATS_FILE_PATH = "/tmp/exorde/stats.json"


def ItemCounterSerializer(obj):
if isinstance(obj, datetime):
return {"__datetime__": True, "value": obj.timestamp()}
if isinstance(obj, deque):
return {"__deque__": True, "value": list(obj)}
return obj


def ItemCounterObjectHook(obj):
if "__datetime__" in obj:
return datetime.fromtimestamp(obj["value"])
if "__deque__" in obj:
return deque(obj["value"])
return obj


class AsyncItemCounter:
def __init__(self):
self.data: Dict[str, deque] = {}
self.data: Dict[str, deque] = load(
STATS_FILE_PATH, ItemCounterObjectHook
)

async def increment(self, key: str) -> None:
occurrences = self.data.get(key, deque())
occurrences.append(datetime.now())
self.data[key] = occurrences
await persist(
self.data, STATS_FILE_PATH, custom_serializer=ItemCounterSerializer
)

async def count_last_n_items(self, n_items: int) -> Dict[str, int]:
result = {}
Expand Down
57 changes: 57 additions & 0 deletions exorde/docker_version_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""notify user of new docker image version"""

import os
import logging
import argparse

from typing import Callable

from exorde.persist import PersistedDict
from exorde.notification import send_notification
from exorde.models import LiveConfiguration


def build_docker_version_notifier() -> Callable:
last_notified_version = PersistedDict(
"/tmp/exorde/docker_version_notification.json"
)

async def docker_version_notifier(
live_configuration: LiveConfiguration,
command_line_arguments: argparse.Namespace,
) -> None:
current_img_version = os.environ.get("EXORDE_DOCKER_IMG_VERSION", None)
"""set during build time"""

if not current_img_version:
"""does nothing if no image version is specified in env"""
return

"""else check the version and notify the user"""

nonlocal last_notified_version
"""last version the user has been notified for"""

live_version = live_configuration.get("docker_version", None)
"""and docker version is specified by the network"""
if not live_version:
logging.warning("no docker version specified in LiveConfiguration")
return

if live_version != current_img_version:
"""notify"""
if (
last_notified_version["last_notification"] == None
or last_notified_version != live_version
):
await send_notification(
command_line_arguments, "A new exorde image is available"
)
last_notified_version["last_notification"] = live_version

return docker_version_notifier


docker_version_notifier = build_docker_version_notifier()

__all__ = ["docker_version_notifier"]
3 changes: 3 additions & 0 deletions exorde/get_worker_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
def get_worker_account(worker_name: str) -> Account:
"""Return a worker key based on a name, key stored in .config"""
keys_file = Path.home() / ".config" / "exorde" / f"{worker_name}.json"
logging.info(
f"config directory is : {Path.home() / '.config' / 'exorde' }"
)
if keys_file.exists():
with open(keys_file, "r") as f:
keys = json.load(f)
Expand Down
41 changes: 41 additions & 0 deletions exorde/last_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""last_notification is a string contained in LiveConfiguration"""

import logging
import argparse
from typing import Callable

from exorde.persist import PersistedDict
from exorde.notification import send_notification
from exorde.models import LiveConfiguration


def build_last_notification() -> Callable:
persisted_last_notification = PersistedDict(
"/tmp/exorde/last_notification.json"
)

async def last_notification(
live_configuration: LiveConfiguration,
command_line_arguments: argparse.Namespace,
) -> None:
last_notification = live_configuration.get("last_notification", None)
if not last_notification:
logging.warning("no last_notification found in LiveConfiguration")
return
nonlocal persisted_last_notification
if (
persisted_last_notification["last_notification"] == None
or persisted_last_notification["last_notification"]
!= last_notification
):
await send_notification(command_line_arguments, last_notification)
persisted_last_notification[
"last_notification"
] = last_notification

return last_notification


last_notification = build_last_notification()

__all__ = ["last_notification"]
53 changes: 53 additions & 0 deletions exorde/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from exorde.self_update import self_update
from exorde.get_balance import get_balance
from exorde.counter import AsyncItemCounter
from exorde.last_notification import last_notification
from exorde.docker_version_notifier import docker_version_notifier

import logging

Expand Down Expand Up @@ -57,6 +59,12 @@ async def main(command_line_arguments: argparse.Namespace):
)
os._exit(1)

from exorde.notification import send_notification

await send_notification(
command_line_arguments,
f"{static_configuration['worker_account'].address} has started",
)
logging.info(
f"Worker-Address is : {static_configuration['worker_account'].address}"
)
Expand Down Expand Up @@ -121,6 +129,10 @@ async def main(command_line_arguments: argparse.Namespace):
"An error occured while logging the current reputation"
)
cursor += 1
await docker_version_notifier(
live_configuration, command_line_arguments
)
await last_notification(live_configuration, command_line_arguments)
if live_configuration and live_configuration["online"]:
# quality_job = await get_available_quality_job()
# if quality_job:
Expand Down Expand Up @@ -194,6 +206,15 @@ def clear_env():
import re


def batch_size_type(value):
ivalue = int(value)
if ivalue < 5 or ivalue > 200:
raise argparse.ArgumentTypeError(
f"custom_batch_size must be between 5 and 200 (got {ivalue})"
)
return ivalue


def run():
def validate_module_spec(spec: str) -> str:
pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*=https?://github\.com/[a-zA-Z0-9_\-\.]+/[a-zA-Z0-9_\-\.]+$"
Expand Down Expand Up @@ -250,6 +271,31 @@ def validate_quota_spec(quota_spec: str) -> dict:
help="quota a domain per 24h (domain=amount)",
)

parser.add_argument(
"-ntfy",
"--ntfy",
default="",
type=str,
help="Provides notification using a ntfy.sh topic",
)

def parse_list(s):
try:
return int(s)
except ValueError:
raise argparse.ArgumentTypeError(
"Invalid list format. Use comma-separated integers."
)

parser.add_argument(
"-na",
"--notify_at",
type=parse_list,
action="append",
help="List of integers",
default=[],
)

parser.add_argument(
"-d",
"--debug",
Expand All @@ -259,6 +305,11 @@ def validate_quota_spec(quota_spec: str) -> dict:
const=logging.DEBUG,
default=logging.INFO,
)
parser.add_argument(
"--custom_batch_size",
type=batch_size_type,
help="Custom batch size (between 5 and 200).",
)
args = parser.parse_args()
logging.basicConfig(level=args.loglevel)

Expand Down Expand Up @@ -297,6 +348,8 @@ def validate_quota_spec(quota_spec: str) -> dict:
clear_env()

command_line_arguments: argparse.Namespace = parser.parse_args()
if len(command_line_arguments.notify_at) == 0:
command_line_arguments.notify_at = [12, 19]
try:
logging.info("Initializing exorde-client...")
asyncio.run(main(command_line_arguments))
Expand Down
Loading

0 comments on commit 613decc

Please sign in to comment.