Skip to content

Node examples

Multiple Nodes can be simply and efficiently creating with asyncio

#!/usr/bin/env python3
An illustrative example of an asynchronous node system using asyncio.

This module demonstrates the creation and interaction of two types of nodes: NodeA and NodeB.

* NodeA:
    - input: queue
    - output: callback
* NodeB:
    - input: callback
    - output: queue

The communication pattern between nodes can be described as:
    NodeA --callback--> NodeB --queue--> NodeA

The flow begins with NodeA sending an initial message to NodeB via a callback. NodeB processes
this input and sends an incremented response to NodeA through a queue. NodeA, upon receiving
the message from the queue, again sends an incremented response to NodeB. This iterative
communication continues until the data received by NodeA reaches a predetermined value (`STOP_AT`).

Run this module directly to initiate and observe the interaction between NodeA and NodeB.

Jev Kuznetsov (c) 2023 ROX Automation
import asyncio
import logging
import coloredlogs

from roxbot.nodes.base import CallbackOutput, QueueOutput

LOG_FORMAT = "%(asctime)s [%(name)s] - %(message)s"

STOP_AT = 10  # NodeA wil stop when data reached this value

coloredlogs.install(level="DEBUG", fmt=LOG_FORMAT, datefmt="%H:%M:%S.%f")

class NodeA:
    """callback output and queue input"""

    def __init__(self, name: str) -> None: = name
        self.log = logging.getLogger(name)

        # define inputs and outputs
        self.output = CallbackOutput()
        self.input: asyncio.Queue = asyncio.Queue()

    async def handle_input(self):
        """coroutine to receive incoming messages from other node"""

        while True:
            data = await self.input.get()

            self.log.debug("%s", f"{} received {data}, sending {data+1}")
            self.output.send(data + 1)

            # notify queue that message is processed

            # exit loop if data reached STOP_AT
            if data >= STOP_AT:
                break"%s", f"{} reached {STOP_AT=}, {data=} exiting")

    async def main(self):
        """main coroutine"""

        # send initial message
        self.log.debug("sending initial message")

        await self.handle_input()

class NodeB:
    """queue output and callback input"""

    def __init__(self, name: str) -> None: = name
        self.log = logging.getLogger(name)

        # define inputs and outputs
        self.output = QueueOutput()
        self.input = self.input_callback

    def input_callback(self, data):
        """echo incoming data incrementing by 1"""

        self.log.debug("%s", f"{} received {data}, sending {data+1}")

        self.output.send(data + 1)

def main():
    # Initialize nodes"Creating NodeA and NodeB")
    node_a = NodeA("node_a")
    node_b = NodeB("node_b")

    # Establish communication between nodes"Connecting NodeA's output to NodeB's input and vice-versa")

    # Start the asynchronous operations for both nodes"Starting main asynchronous coroutines for both nodes")

        # Start the asynchronous event loop

    except KeyboardInterrupt:
        print("Exiting due to keyboard interrupt")
    except Exception as e:  # pylint: disable=broad-except
        logging.error("An error occurred: %s", str(e))

if __name__ == "__main__":