Skip to content

Commit

Permalink
Integrate ultrasonic to framework
Browse files Browse the repository at this point in the history
* Migrating Ultrasonic into Interactive and hooking up the path node.
  • Loading branch information
danielbloy committed Aug 1, 2024
1 parent 3b39962 commit aa15a40
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 66 deletions.
97 changes: 52 additions & 45 deletions halloween/2024/path/code.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,82 @@
# Entry point for a path node. This supports both path nodes that are on either
# side of the path. One will be the primary node and the other the secondary.
import asyncio

import board

from interactive.animation import Flicker
from interactive.environment import are_pins_available
from interactive.interactive import Interactive
from interactive.log import set_log_level, info, INFO
from interactive.log import info
from interactive.polyfills.animation import ORANGE, BLACK
from interactive.polyfills.pixel import new_pixels

PRIMARY = True
BUTTON_PIN = None
BUZZER_PIN = None
BUZZER_VOLUME = 0.1
SKULL_BRIGHTNESS = 1.0
SKULL_OFF = 0.0
SKULL_SPEED = 0.1
SKULL_COLOUR = ORANGE
SKULL_PINS = [None, None, None, None, None, None]
SKULL_PINS = [board.GP10, board.GP11, board.GP12, board.GP13, board.GP14, board.GP15]

# Perform import of configuration here to allow for overrides from the config file.
from interactive.configuration import *

pixels = [new_pixels(pin, 8, brightness=SKULL_BRIGHTNESS) for pin in SKULL_PINS if pin is not None]
animations = [Flicker(pixel, speed=SKULL_SPEED, color=SKULL_COLOUR) for pixel in pixels]

# Default settings
if are_pins_available():
# noinspection PyPackageRequirements
import board

BUTTON_PIN = board.GP27
BUZZER_PIN = board.GP2
async def cancel() -> None:
# TODO: This could probable just call stop_display()
for animation in animations:
animation.freeze()

SKULL_PINS = [board.GP10, board.GP11, board.GP12, board.GP13, board.GP14, board.GP15]
for pixel in pixels:
pixel.fill(BLACK)
pixel.write()

if __name__ == '__main__':

set_log_level(INFO)
async def stop_display() -> None:
for pixel in pixels:
pixel.brightness = SKULL_OFF
pixel.show()

# Try loading local device settings as overrides.
try:
# noinspection PyPackageRequirements
from config import *

info("Config file loaded")
async def start_display() -> None:
for pixel in pixels:
pixel.brightness = SKULL_BRIGHTNESS

except ImportError:
info("No config file was found")
t1 = asyncio.create_task(test_task_1())
t2 = asyncio.create_task(test_task_2())

config = Interactive.Config()
config.buzzer_pin = BUZZER_PIN
config.button_pin = BUTTON_PIN
config.buzzer_volume = BUZZER_VOLUME

interactive = Interactive(config)
async def run_display() -> None:
for animation in animations:
animation.animate()

# Construct the pixels and animate them.
pixels = [new_pixels(pin, 8, brightness=SKULL_BRIGHTNESS) for pin in SKULL_PINS if pin is not None]
animations = [Flicker(pixel, speed=SKULL_SPEED, color=SKULL_COLOUR) for pixel in pixels]

async def test_task_1() -> None:
info("Start test task 1")
await asyncio.sleep(2)
for pixel in pixels:
pixel.brightness = SKULL_OFF
pixel.show()
info("End test task 1")

async def animate_skulls() -> None:
for animation in animations:
animation.animate()

async def test_task_2() -> None:
info("Start test task 2")
await asyncio.sleep(1)
info("End test task 2")

interactive.runner.add_loop_task(animate_skulls)

config = get_node_config()
config.trigger_start = start_display
config.trigger_run = run_display
config.trigger_stop = stop_display

# TODO: Control how the skulls are enabled/disabled.
interactive = Interactive(config)

async def callback() -> None:
if interactive.cancel:
for animation in animations:
animation.freeze()

for pixel in pixels:
pixel.fill(BLACK)
pixel.write()
async def callback() -> None:
if interactive.cancel:
await cancel()


interactive.run(callback)
interactive.run(callback)
Empty file removed halloween/2024/path/path.py
Empty file.
Empty file removed halloween/2024/spiders/spiders.py
Empty file.
54 changes: 54 additions & 0 deletions interactive/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# This file is used to setup the default configuration for a typical node.
# Using it removes some boilerplate from the node code. It also sets up
# the logging level.
from interactive.environment import are_pins_available
from interactive.interactive import Interactive
from interactive.log import set_log_level, INFO

LOG_LEVEL = INFO

BUTTON_PIN = None

BUZZER_PIN = None
BUZZER_VOLUME = 0.1

ULTRASONIC_TRIGGER_PIN = None
ULTRASONIC_ECHO_PIN = None

TRIGGER_DISTANCE = 100
TRIGGER_DURATION = 60

if are_pins_available():
# noinspection PyPackageRequirements
import board

# Default settings
BUTTON_PIN = board.GP27
BUZZER_PIN = board.GP2
ULTRASONIC_TRIGGER_PIN = board.GP7
ULTRASONIC_ECHO_PIN = board.GP6

# Try loading local device settings as overrides.
try:
# noinspection PyPackageRequirements
from config import *

print("Config file loaded")

except ImportError:
print("No config file was found")

set_log_level(LOG_LEVEL)


def get_node_config() -> Interactive.Config:
config = Interactive.Config()
config.buzzer_pin = BUZZER_PIN
config.button_pin = BUTTON_PIN
config.buzzer_volume = BUZZER_VOLUME
config.ultrasonic_trigger_pin = ULTRASONIC_TRIGGER_PIN
config.ultrasonic_echo_pin = ULTRASONIC_ECHO_PIN
config.trigger_distance = TRIGGER_DISTANCE
config.trigger_duration = TRIGGER_DURATION

return config
50 changes: 36 additions & 14 deletions interactive/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
from interactive.polyfills.buzzer import new_buzzer
from interactive.polyfills.ultrasonic import new_ultrasonic
from interactive.runner import Runner
from interactive.scheduler import new_triggered_task, Triggerable
from interactive.ultrasonic import UltrasonicTrigger

if is_running_on_desktop():
from collections.abc import Callable, Awaitable


# TODO: When the ultrasonic sensor support is added, it should automatically do the
# distance measurements and trigger events based on distance.

class Interactive:
"""
Interactive is the entry point class and sets up a running environment based on the
Expand All @@ -33,19 +32,26 @@ def __init__(self):
self.button_pin = None
self.buzzer_pin = None
self.buzzer_volume = 1.0
self.ultrasonic_trigger = None
self.ultrasonic_echo = None
self.ultrasonic_trigger_pin = None
self.ultrasonic_echo_pin = None
self.trigger_distance = 9999
self.trigger_duration = 0
self.trigger_start = None
self.trigger_run = None
self.trigger_stop = None

def __str__(self):
return f"""
Button:
Pin ......... : {self.button_pin}
Pin ............... : {self.button_pin}
Buzzer:
Pin ......... : {self.buzzer_pin}
Volume ...... : {self.buzzer_volume}
Pin ............... : {self.buzzer_pin}
Volume ............ : {self.buzzer_volume}
Ultrasonic Sensor:
Trigger ..... : {self.ultrasonic_trigger}
Echo ........ : {self.ultrasonic_echo}
Trigger ........... : {self.ultrasonic_trigger_pin}
Echo .............. : {self.ultrasonic_echo_pin}
Distance .......... : {self.trigger_distance} cm
Duration .......... : {self.trigger_duration} seconds
"""

def log(self, level):
Expand Down Expand Up @@ -79,10 +85,22 @@ def __init__(self, config: Config):

self.ultrasonic = None
self.ultrasonic_controller = None

if self.config.ultrasonic_trigger is not None and self.config.ultrasonic_echo is not None:
self.ultrasonic = new_ultrasonic(self.config.ultrasonic_trigger, self.config.ultrasonic_echo)
# TODO: setup controller
self.trigger = None
self.triggerable = Triggerable()

if self.config.ultrasonic_trigger_pin is not None and self.config.ultrasonic_echo_pin is not None:
self.ultrasonic = new_ultrasonic(self.config.ultrasonic_trigger_pin, self.config.ultrasonic_echo_pin)
self.trigger = UltrasonicTrigger(self.ultrasonic)
self.trigger.register(self.runner)
self.trigger.add_trigger(self.config.trigger_distance, self.__trigger_handler, self.config.trigger_duration)

trigger_loop = new_triggered_task(
self.triggerable,
duration=self.config.trigger_duration,
start=self.config.trigger_start,
run=self.config.trigger_run,
stop=self.config.trigger_stop)
self.runner.add_loop_task(trigger_loop)

@property
def cancel(self) -> bool:
Expand Down Expand Up @@ -119,3 +137,7 @@ async def __long_press_handler(self) -> None:
if not self.runner.cancel and self.buzzer_controller:
# TODO: This needs to be a proper action
self.buzzer_controller.beeps(5)

async def __trigger_handler(self, distance: float, actual: float) -> None:
info(f"Distance {distance} handler triggered: {actual}")
self.triggerable.triggered = True
70 changes: 70 additions & 0 deletions interactive/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,73 @@ async def handler() -> None:
await task()

return handler


class Triggerable:
"""Trivial implementation for a triggerable object."""

def __init__(self):
self.triggered = False


def new_triggered_task(
triggerable,
duration: float,
start: Callable[[], Awaitable[None]] = None,
run: Callable[[], Awaitable[None]] = None,
stop: Callable[[], Awaitable[None]] = None,
cancel_func: Callable[[], bool] = never_terminate) -> Callable[[], Awaitable[None]]:
"""
Returns an async task that will only invoke the functions start, stop and run if the
trigger has been activated. The start function will be called once when the trigger
is activated, run will be called as a normal loop task whilst the trigger is activated
and stop will be called once when the trigger is deactivated; which will occurs as the
specified number of seconds after the trigger has been activated.
At least one of start, stop and run must be provided, but they need not all be specified.
Once a trigger is activated, it will not be activated again until after it has expired
and been deactivated. The triggerable object is used to activate the trigger via a
"triggered" property.
The returned task can be added to a Runner so it is called when triggered.
:param triggerable: Object that has a triggered property which will activate.
:param duration: The duration that trigger lasts (i.e. the time between start and stop calls).
:param start: This is called once when the trigger is activated
:param run: This is called once every cycle when triggered.
:param stop: This is called once when the trigger expires.
:param cancel_func: A function that returns whether to cancel the task or not.
"""

if start is None and run is None and stop is None:
raise ValueError("at least one of start, run or stop must be specified")

running = False
stop_time = 0

async def handler() -> None:
nonlocal running, stop_time

now = time.monotonic()

if triggerable.triggered and not running:
debug("Start running trigger event")
stop_time = now + duration
running = True
if start is not None:
await start()

triggerable.triggered = False

if running and now >= stop_time:
debug("Stop running trigger event")
running = False
if stop is not None:
await stop()

if running and run is not None:
debug("Sunning trigger event")
await run()

return new_loop_task(handler, cancel_func)
5 changes: 5 additions & 0 deletions tests/interactive/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
TEST_VALUE = 123.456

TEST_STRING = "Hello world!"

TRIGGER_DISTANCE = 99999
17 changes: 17 additions & 0 deletions tests/interactive/test_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from interactive import configuration


class TestNode:
def test_config_is_returned(self) -> None:
"""
Tests that configuration defaults are loaded as well as the local overrides
contained in config.py.
"""
config = configuration.get_node_config()

assert config is not None
assert config.trigger_distance == 99999

# These are just random configuration values from the config.
assert configuration.TEST_VALUE == 123.456
assert configuration.TEST_STRING == "Hello world!"
Loading

0 comments on commit aa15a40

Please sign in to comment.