Node examples
Multiple Nodes can be simply and efficiently created with asyncio
#!/usr/bin/env python3
"""
An illustrative example of an asynchronous node system using asyncio.
This module demonstrates the creation and interaction of two types of nodes: NodeA and NodeB.
* NodeA:
- input: queue
- output: callback
* NodeB:
- input: callback
- output: queue
The communication pattern between nodes can be described as:
NodeA --callback--> NodeB --queue--> NodeA
The flow begins with NodeA sending an initial message to NodeB via a callback. NodeB processes
this input and sends an incremented response to NodeA through a queue. NodeA, upon receiving
the message from the queue, again sends an incremented response to NodeB. This iterative
communication continues until the data received by NodeA reaches a predetermined value (`STOP_AT`).
Usage:
Run this module directly to initiate and observe the interaction between NodeA and NodeB.
Author:
Jev Kuznetsov (c) 2023 ROX Automation
"""
import asyncio
import logging
import coloredlogs
from roxbot.nodes.base import CallbackOutput, QueueOutput
# Layout of every log line: timestamp, logger name in brackets, then the message.
LOG_FORMAT = "%(asctime)s [%(name)s] - %(message)s"
STOP_AT = 10  # NodeA will stop when the received data reaches this value
# Configure coloredlogs at import time: DEBUG level, the format above and a
# time-of-day date format.
coloredlogs.install(level="DEBUG", fmt=LOG_FORMAT, datefmt="%H:%M:%S.%f")
class NodeA:
    """Node that is fed through an asyncio queue and publishes via a callback output."""

    def __init__(self, name: str) -> None:
        # the node name is also used as the logger name
        self.name = name
        self.log = logging.getLogger(name)

        # outgoing side: replies are handed to ``self.output.send``
        self.output = CallbackOutput()
        # incoming side: values for this node are read from this queue
        self.input: asyncio.Queue = asyncio.Queue()

    async def handle_input(self):
        """Consume queued values, answering each one with ``value + 1``.

        Each received value is logged, answered through ``self.output`` and
        acknowledged on the queue with ``task_done()``. The loop finishes after
        the first value that is greater than or equal to ``STOP_AT``; that
        value is still answered before the loop ends.
        """
        stop_reached = False
        while not stop_reached:
            data = await self.input.get()

            message = f"{self.name} received {data}, sending {data+1}"
            self.log.debug("%s", message)
            self.output.send(data + 1)

            # tell the queue this item has been fully handled
            self.input.task_done()

            # the next iteration only happens while the stop value is not reached
            stop_reached = data >= STOP_AT

        self.log.info("%s", f"{self.name} reached {STOP_AT=}, {data=} exiting")

    async def main(self):
        """Send the initial value ``1`` and then process incoming values until done."""
        self.log.debug("sending initial message")
        self.output.send(1)

        await self.handle_input()
class NodeB:
    """Node that is driven by a callback and publishes its replies via a queue output."""

    def __init__(self, name: str) -> None:
        # the node name is also used as the logger name
        self.name = name
        self.log = logging.getLogger(name)

        # outgoing side: replies are handed to ``self.output.send``
        self.output = QueueOutput()
        # incoming side: the bound callback is exposed under the name ``input``
        self.input = self.input_callback

    def input_callback(self, data):
        """Log the received value and answer with the value incremented by one."""
        message = f"{self.name} received {data}, sending {data+1}"
        self.log.debug("%s", message)
        self.output.send(data + 1)
def main():
    """Create NodeA and NodeB, connect them in a loop and run NodeA's coroutine.

    NodeA's output is connected to NodeB's input callback and NodeB's output
    to NodeA's input queue. ``asyncio.run`` then drives ``NodeA.main()`` until
    it returns.

    A ``KeyboardInterrupt`` ends the run with a short notice. Any other
    exception is logged at ERROR level together with its traceback and is not
    re-raised.
    """
    # Initialize nodes
    logging.info("Creating NodeA and NodeB")
    node_a = NodeA("node_a")
    node_b = NodeB("node_b")

    # Establish communication between nodes
    logging.info("Connecting NodeA's output to NodeB's input and vice-versa")
    node_a.output.connect(node_b.input)
    node_b.output.connect(node_a.input)

    # Only NodeA has a coroutine to run; NodeB reacts through its input callback
    logging.info("Starting main asynchronous coroutines for both nodes")
    try:
        # Start the asynchronous event loop
        asyncio.run(node_a.main())
    except KeyboardInterrupt:
        print("Exiting due to keyboard interrupt")
    except Exception as e:  # pylint: disable=broad-except
        # logging.exception keeps the traceback, which logging.error(str(e)) discarded
        logging.exception("An error occurred: %s", e)
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()