Skip to content

Node examples

Multiple nodes can be created simply and efficiently with asyncio.

#!/usr/bin/env python3
"""
An illustrative example of an asynchronous node system using asyncio.

This module demonstrates the creation and interaction of two types of nodes: NodeA and NodeB.

* NodeA:
    - input: queue
    - output: callback
* NodeB:
    - input: callback
    - output: queue


The communication pattern between nodes can be described as:
    NodeA --callback--> NodeB --queue--> NodeA

The flow begins with NodeA sending an initial message to NodeB via a callback. NodeB processes
this input and sends an incremented response to NodeA through a queue. NodeA, upon receiving
the message from the queue, again sends an incremented response to NodeB. This iterative
communication continues until the data received by NodeA reaches a predetermined value (`STOP_AT`).


Usage:
Run this module directly to initiate and observe the interaction between NodeA and NodeB.

Author:
Jev Kuznetsov (c) 2023 ROX Automation
"""
import asyncio
import logging
import coloredlogs

from roxbot.nodes.base import CallbackOutput, QueueOutput

# Layout of each log line: timestamp, logger name in brackets, then the message.
LOG_FORMAT = "%(asctime)s [%(name)s] - %(message)s"


STOP_AT = 10  # NodeA will stop once the data it receives reaches this value

# Install coloredlogs at DEBUG level using LOG_FORMAT.
# NOTE(review): "%f" is not a standard time.strftime directive; presumably
# coloredlogs expands it itself (sub-second part) -- confirm against its docs.
coloredlogs.install(level="DEBUG", fmt=LOG_FORMAT, datefmt="%H:%M:%S.%f")


class NodeA:
    """Node with a queue input and a callback output.

    Values are read from ``self.input`` (an ``asyncio.Queue``) and each one is
    answered by sending ``value + 1`` through ``self.output``. Processing ends
    once a received value is greater than or equal to ``STOP_AT``.
    """

    def __init__(self, name: str) -> None:
        """Create the node.

        Args:
            name: Node name; also used as the name of the node's logger.
        """
        self.name = name
        self.log = logging.getLogger(name)

        # define inputs and outputs
        self.output = CallbackOutput()
        self.input: asyncio.Queue = asyncio.Queue()

    async def handle_input(self) -> None:
        """Consume queued messages, replying with ``data + 1`` until ``STOP_AT``.

        Returns after receiving a value ``>= STOP_AT``. That final value is
        acknowledged on the queue but deliberately not answered.
        """
        while True:
            data = await self.input.get()

            # Decide whether to stop *before* replying. Replying to the final
            # value would push the exchange past STOP_AT and, when the output
            # is wired back to this node's input (as main() does), leave an
            # unconsumed item on the queue after this coroutine has exited.
            finished = data >= STOP_AT

            if not finished:
                # Lazy %-style arguments: the text is only rendered when a
                # handler actually emits the record.
                self.log.debug(
                    "%s received %s, sending %s", self.name, data, data + 1
                )
                self.output.send(data + 1)

            # notify queue that message is processed
            self.input.task_done()

            if finished:
                break

        self.log.info(
            "%s reached STOP_AT=%r, data=%r exiting", self.name, STOP_AT, data
        )

    async def main(self) -> None:
        """Send the initial message ``1`` and then process incoming messages."""
        self.log.debug("sending initial message")
        self.output.send(1)

        await self.handle_input()


class NodeB:
    """Node with a callback input and a queue output.

    ``self.input`` is the bound ``input_callback`` method. Every value passed
    to it is answered by sending ``value + 1`` through ``self.output``.
    """

    def __init__(self, name: str) -> None:
        """Create the node.

        Args:
            name: Node name; also used as the name of the node's logger.
        """
        self.name = name
        self.log = logging.getLogger(name)

        # define inputs and outputs
        self.output = QueueOutput()
        self.input = self.input_callback

    def input_callback(self, data) -> None:
        """Echo incoming data, incremented by 1, to the output.

        Args:
            data: Received value; must support ``data + 1``.
        """
        reply = data + 1

        # Lazy %-style arguments: the text is only rendered when a handler
        # actually emits the record.
        self.log.debug("%s received %s, sending %s", self.name, data, reply)

        self.output.send(reply)


def main():
    """Create NodeA and NodeB, connect them in a loop and run the exchange.

    NodeA's output is connected to NodeB's input callback and NodeB's output
    is connected to NodeA's input queue. Only NodeA has a coroutine to run;
    NodeB is driven entirely by the callback NodeA invokes.

    A keyboard interrupt ends the run with a short notice. Any other exception
    is logged together with its traceback instead of propagating.
    """
    # Initialize nodes
    logging.info("Creating NodeA and NodeB")
    node_a = NodeA("node_a")
    node_b = NodeB("node_b")

    # Establish communication between nodes
    logging.info("Connecting NodeA's output to NodeB's input and vice-versa")
    node_a.output.connect(node_b.input)
    node_b.output.connect(node_a.input)

    logging.info("Starting main asynchronous coroutines for both nodes")

    try:
        # Start the asynchronous event loop; returns when NodeA.main() finishes
        asyncio.run(node_a.main())

    except KeyboardInterrupt:
        print("Exiting due to keyboard interrupt")
    except Exception as e:  # pylint: disable=broad-except
        # Top-level boundary: keep the original message, but log through
        # logging.exception so the traceback is not lost.
        logging.exception("An error occurred: %s", e)


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()