Node
A node is always represented as a rust
struct
that derives the Node
procedure and implements the Node
trait. Here are some details about those procedures:
The macros
#[derive(Node)]
The #[derive(Node)]
procedure will do two things:
-
Create symbols if the
cdylib
feature is enabled, so that theruntime
is able to load the node dynamically. -
Implement a default
tokio
Runtime
that will automatically reuse the contextruntime
if the node has been statically linked, or create a new one if the node is dynamically linked.
The generated code is as follow:
#![allow(unused)] fn main() { #[cfg(feature = "cdylib")] #[doc(hidden)] #[unsafe(no_mangle)] pub static IRIDIS_NODE: iridis_node::prelude::DynamicallyLinkedNodeInstance = |inputs, outputs, queries, queryables, configuration| { <#name>::new(inputs, outputs, queries, queryables, configuration) }; static DEFAULT_TOKIO_RUNTIME: std::sync::LazyLock<iridis_node::prelude::thirdparty::tokio::runtime::Runtime> = std::sync::LazyLock::new(|| iridis_node::prelude::thirdparty::tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime")); fn default_runtime<T: Send + 'static>( task: impl Future<Output = T> + Send + 'static, ) -> iridis_node::prelude::thirdparty::tokio::task::JoinHandle<T> { match iridis_node::prelude::thirdparty::tokio::runtime::Handle::try_current() { Ok(handle) => handle.spawn(task), Err(_) => DEFAULT_TOKIO_RUNTIME.spawn(task) } } }
#[node(runtime = "default_runtime")] impl Node
First of all, the Node
trait implements the following methods:
#![allow(unused)] fn main() { pub trait Node: Send + Sync { fn new( inputs: Inputs, outputs: Outputs, queries: Queries, queryables: Queryables, configuration: serde_yml::Value, ) -> tokio::task::JoinHandle<Result<Box<dyn Node>>> where Self: Sized; fn start(self: Box<Self>) -> tokio::task::JoinHandle<Result<()>>; } }
So each node must implement the new
and start
methods. See below for more details.
As you can see each method returns a tokio::task::JoinHandle
, which means that the node is executed in a tokio
task. But writing the new
method is a bit tricky, because the Node
trait is not async
, so you can't use async fn
directly. Instead, you have to use a runtime
(the default_runtime
for example) function to spawn the task. This is why we use the procedure #[node(runtime = "default_runtime")]
.
It transforms your code so that:
#![allow(unused)] fn main() { async fn new( _: Inputs, _: Outputs, _: Queries, _: Queryables, _: serde_yml::Value, ) -> Result<Self> { Ok(Self {}) } }
Will be transformed to that:
#![allow(unused)] fn main() { fn new ( _: Inputs, _: Outputs, _: Queries, _: Queryables, _: serde_yml::Value, ) -> JoinHandle<Result<Box<dyn Node>>> { default_runtime(async { Ok(Box::new(Self {}) as Box<dyn Node>) }) } }
IOs
To better understand what are the parameters of the new
method, we must understand the application pipeline:
-
You create a
DataflowLayout
that describes the number of nodes, their names and their inputs/outputs -
You create the connections (the flows) for this layout
-
You then pass this to the
runtime
that will load the nodes according to the layout and the flows.
This is when the new
method is called: each node can retrieve the IO objects that are passed to it by the runtime
. To retrieve it, simply use the with
function:
#![allow(unused)] fn main() { Self { output: outputs .with("out") .await .wrap_err("Failed to create output")?, } }
Be careful, the name out
must match the one provided inside the DataflowLayout
. Also, the objects are typed by default. Which means that each IO has to precise the ArrowMessage
type that it will receive/send or query:
#![allow(unused)] fn main() { use iridis_node::prelude::{thirdparty::*, *}; #[derive(Node)] pub struct MySource { pub output: Output<String>, } }
See the Messages section for more details about the ArrowMessage
type.
In case of Query/Queryables, two ArrowMessage
types must be provided, one for the request message, and one for the response message. For example:
#![allow(unused)] fn main() { pub compare_to_128: Queryable<u8, String>, }
Here the request is an u8
and the response is a String
.
Note: if you don't want to manipulate typed IOs, you can use the RawInput
, RawOutput
, RawQuery
and RawQueryable
alternatives, together with the with_raw
function. This will allow you to manipulate ArrayData
directly with no serialization/deserialization.
Configuration
Each node also receives a configuration
parameter. This is a serde_yaml::Value
object that can be used to pass configuration parameters to the node. This is useful if you want to pass some parameters to the node at runtime, so you can use the same node implementation for different configurations.
Start
The start
method is called once all nodes have been loaded. It consumes the node and so, when the function returns, the node is no longer available, it will be dropped. You have to take care of the loop yourself. For example, you can have a loop that sends messages every second:
#![allow(unused)] fn main() { async fn start(self: Box<Self>) -> Result<()> { while let Ok(()) = self .output .send("tick".to_string()) .await .wrap_err("Failed to send message") { tokio::time::sleep(Duration::from_secs(1)).await; } Ok(()) } }
Here we use a while let Ok(())
loop instead of just a loop
, because when aborting the node the send
function will eventually return an channel closed
error. So we can use this to stop the loop and exit the node gracefully.
The send
method takes an ArrowMessage
as a parameter, it will then serialize it as an ArrayData
without copy, add a Header
and send it to the output channel. The input
that will receives the message, will receives this DataflowMessage
object
as a tuple containing the Header
and the reserialized ArrowMessage
object.
The same logic applies for Query/Queryable
. See the examples section for more details.