iridis

iridis is a Rust library for building dataflow applications. It provides simple APIs to:

  • Make a standalone node in Rust
  • Assemble nodes into a dataflow graph
  • Customize the runtime with plugins

iridis is inspired by both dora-rs and zenoh-flow, two projects that focus on dataflow programming. It aims to provide a more lightweight, up-to-date and extensible solution for building dataflow applications in Rust.

Features

  • Fast builds: iridis is designed to be simple and easy to use. It keeps the number of dependencies and the complexity of the code low, so you don't have to spend dozens of seconds compiling each time you want to try your application!

  • Fast, that's all: iridis is designed around shared-library loading, so every node actually runs inside the same global process, which allows data to be shared efficiently.

  • Async: iridis is built on top of tokio, a powerful async runtime for Rust. This allows you to build highly concurrent applications with ease.

  • Extensible: iridis provides a plugin system that allows you to extend the runtime with custom plugins. This allows you to customize the behavior of the runtime to suit your needs. (See pyridis for an example of a plugin that lets you use Python as a node in the dataflow graph.)

Related projects

  • pyridis - A plugin for iridis that allows you to use Python as a node in the dataflow graph.
  • distore - A store repository that allows you to use already made nodes in your application.
  • iris - A CLI tool that allows you to create applications directly from a YAML file.

Getting Started

This part of the wiki showcases how to get started with iridis and how to use it in your own projects.

Installation

Prerequisites

To use iridis, you need to have the following installed:

  • rustup
  • cargo

Because of the shared-library design of iridis, you need to use the same Rust toolchain for all the nodes, plugins and applications. This project uses the latest stable version of Rust, so you should use the same version for your own projects. See rust-toolchain.toml.

For an application

Each application that uses iridis is a Cargo binary crate. You can create a new Cargo binary crate using the following command:

cargo new --bin my_app

Then, add iridis as a dependency in your Cargo.toml file:

[dependencies]
iridis = "0.3"

Or add it with cargo add:

cargo add iridis

For a node

Each node that uses iridis is a Cargo library crate (optionally cdylib). You can create a new Cargo library crate using the following command:

cargo new --lib my_node

Then, add iridis-node as a dependency in your Cargo.toml file:

[dependencies]
iridis-node = "0.3"

Or add it with cargo add:

cargo add iridis-node

Because a node can be compiled as a cdylib, you need to add the following lines in your Cargo.toml file:

[lib]
crate-type = ["cdylib", "rlib"]

[features]
cdylib = []

Note: If you only intend to use the node as a static library, you can remove the cdylib entry from the crate-type list. However, keep the cdylib entry in the features section, otherwise you will get a warning.

Note: The iridis-node crate is included in the iridis crate, so you don't need to add it as a dependency if you are using iridis directly.

For a plugin

Two kinds of plugins are currently available in iridis:

  • FileExtPlugin: this plugin allows you to load a file with a specific extension. For example, if you want to load a file with the .py extension, you need to add the PythonFileExt plugin in your runtime.

  • UrlSchemePlugin: this plugin allows you to load a node from a URL scheme other than the default file:// scheme. For example, if you want to load a node from an http:// URL, you would need to add some kind of HTTP plugin. For now, only the file:// and builtin:// schemes are available in iridis, but you can create your own plugin to load a node from a different scheme.

The installation procedure is the same as for a node, but the crate to add is either iridis-file-ext or iridis-url-scheme.

Note: The iridis-file-ext crate and iridis-url-scheme crate are included in the iridis crate, so you don't need to add them as a dependency if you are using iridis directly.

For a message

Each message used in iridis is an Arrow data-format message defined by the ArrowMessage trait in the iridis-message crate. If you want to create custom messages, you need to add the iridis-message crate as a dependency in your Cargo.toml file:

[dependencies]
iridis-message = "0.3"

Or add it with cargo add:

cargo add iridis-message

Note: The iridis-message crate is included in the iridis crate, so you don't need to add it as a dependency if you are using iridis directly.

Usage

Layout and Flows

An application is essentially a 'graph' of nodes, where each node can be seen either as a 'function' that takes an input and produces an output, or as a standalone process that only produces or only consumes data. The nodes are connected by 'flows' that describe how data is passed from one node to another.
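As a rough mental model only, the sketch below wires a source, an operator and a sink together with standard-library channels playing the role of flows. Nothing here is iridis API: `run_pipeline` and the thread-per-node setup are assumptions made purely for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a tiny source -> operator -> sink pipeline and returns what the sink saw.
/// Hypothetical stand-in: threads play the nodes, channels play the flows.
pub fn run_pipeline(ticks: u32) -> Vec<String> {
    // Flow 1: source.out -> operator.in
    let (source_out, operator_in) = mpsc::channel::<u32>();
    // Flow 2: operator.out -> sink.in
    let (operator_out, sink_in) = mpsc::channel::<String>();

    // Source node: a standalone producer.
    let source = thread::spawn(move || {
        for i in 0..ticks {
            if source_out.send(i).is_err() {
                break;
            }
        }
        // `source_out` is dropped here, which closes flow 1.
    });

    // Operator node: behaves like a function from its input to its output.
    let operator = thread::spawn(move || {
        for value in operator_in {
            if operator_out.send(format!("tick {value}")).is_err() {
                break;
            }
        }
        // `operator_out` is dropped here, which closes flow 2.
    });

    // Sink node: a standalone consumer.
    let sink = thread::spawn(move || sink_in.into_iter().collect::<Vec<String>>());

    source.join().expect("source panicked");
    operator.join().expect("operator panicked");
    sink.join().expect("sink panicked")
}
```

Calling `run_pipeline(3)` collects three messages at the sink, in the order the source produced them.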

Layout

It's really important when making a dataflow application to prepare the layout of the application before even writing the code. You can draw your application on paper and then translate it to code.

The first step is to create a DataflowLayout that describes the number of nodes, their names and their inputs/outputs. This is done by using the node method of the DataflowLayout struct. The node method takes a name and a closure.

#![allow(unused)]
fn main() {
use iridis_layout::prelude::{thirdparty::*, *};

let mut layout = DataflowLayout::new();

let (_source, output) = layout
    .node("source", async |builder: &mut Builder| {
        builder.output("out")
    })
    .await;
}

The node method will create a new node in the layout with the given name and will always return at least the associated NodeID object (which is just a label together with a UUID that represents this node). Inside the closure, you have access to a Builder that can add inputs, outputs and queries to the node. You can return anything you want from the closure, but it is recommended to return the result of the builder methods. This will allow you to use the PrimitiveID object later on to create the flows.
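To make the "node id plus whatever the closure returns" shape concrete, here is a simplified, synchronous model of the pattern. All the types (`MiniLayout`, `MiniBuilder`, `NodeId`, `IoId`) are hypothetical stand-ins written for this sketch, not the iridis types, and a plain counter replaces the UUID so that the example only needs the standard library.

```rust
/// Hypothetical stand-ins for illustration only; these are not the iridis types.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeId {
    pub label: String,
    pub index: usize, // the text describes a UUID; a counter keeps the sketch std-only
}

#[derive(Debug, Clone, PartialEq)]
pub struct IoId {
    pub node: String,
    pub name: String,
}

pub struct MiniBuilder {
    node: String,
    ios: Vec<IoId>,
}

impl MiniBuilder {
    pub fn input(&mut self, name: &str) -> IoId {
        self.register(name)
    }

    pub fn output(&mut self, name: &str) -> IoId {
        self.register(name)
    }

    // Records the IO on the node and hands its identifier back to the caller.
    fn register(&mut self, name: &str) -> IoId {
        let id = IoId { node: self.node.clone(), name: name.to_string() };
        self.ios.push(id.clone());
        id
    }
}

#[derive(Default)]
pub struct MiniLayout {
    nodes: Vec<(NodeId, Vec<IoId>)>,
}

impl MiniLayout {
    /// Always returns the node id, paired with whatever the closure returned.
    pub fn node<T>(
        &mut self,
        label: &str,
        build: impl FnOnce(&mut MiniBuilder) -> T,
    ) -> (NodeId, T) {
        let mut builder = MiniBuilder { node: label.to_string(), ios: Vec::new() };
        let returned = build(&mut builder);
        let id = NodeId { label: label.to_string(), index: self.nodes.len() };
        self.nodes.push((id.clone(), builder.ios));
        (id, returned)
    }

    pub fn io_count(&self, node: &NodeId) -> usize {
        self.nodes
            .iter()
            .find(|(id, _)| id == node)
            .map_or(0, |(_, ios)| ios.len())
    }
}
```

Returning the builder results as a tuple, for instance `layout.node("operator", |b| (b.input("in"), b.output("out")))`, keeps both identifiers at hand for wiring later.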

Flows

Once you have created the layout, it's recommended to build it as an immutable shared object:

#![allow(unused)]
fn main() {
let layout = layout.build();
}

Then you can create the connections (the flows) for this layout. This is done by using the Flows struct. The Flows struct takes a DataflowLayout and a closure that will be called to create the flows.

#![allow(unused)]
fn main() {
use iridis_flows::prelude::{thirdparty::*, *};

let flows = Flows::new(layout.clone(), async move |flows: &mut Connector| {
    flows.connect(op_in, output, None)?;
    flows.connect(input, op_out, None)?;

    Ok(())
})
.await?;
}

Note that the connect method will recognize each IO kind, so you don't have to worry about the order between the input and output, the query and the queryable. The connect method will also take care of the creation of the communication channels, and so you can adjust the capacity of the channel by passing a Some(capacity) value as the last parameter. If you pass None, the default capacity will be used (128).
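The order-insensitive behaviour and the default capacity can be pictured with a small std-only model. This is only a sketch under assumptions: `Endpoint`, `Connection` and this `connect` function are hypothetical and merely mimic the behaviour described above, using a bounded `sync_channel` as the communication channel.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Default channel capacity quoted in the text above.
pub const DEFAULT_CAPACITY: usize = 128;

/// Hypothetical stand-in for a layout endpoint; not an iridis type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Endpoint {
    Input(u32),
    Output(u32),
}

/// A resolved connection: which output feeds which input, and with what capacity.
pub struct Connection {
    pub from_output: u32,
    pub to_input: u32,
    pub capacity: usize,
    pub sender: SyncSender<String>,
    pub receiver: Receiver<String>,
}

/// Accepts the two endpoints in either order and creates a bounded channel.
pub fn connect(a: Endpoint, b: Endpoint, capacity: Option<usize>) -> Result<Connection, String> {
    // Sort out the direction from the endpoint kinds, not from the argument order.
    let (from_output, to_input) = match (a, b) {
        (Endpoint::Output(o), Endpoint::Input(i)) | (Endpoint::Input(i), Endpoint::Output(o)) => {
            (o, i)
        }
        _ => return Err(format!("cannot connect {a:?} to {b:?}")),
    };

    // `None` falls back to the default capacity.
    let capacity = capacity.unwrap_or(DEFAULT_CAPACITY);
    let (sender, receiver) = sync_channel(capacity);

    Ok(Connection { from_output, to_input, capacity, sender, receiver })
}
```

In this model, `connect(input, output, None)` and `connect(output, input, None)` resolve to the same direction, and only a pair of endpoints of the same kind is rejected.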

Messages

iridis nodes communicate with each other using the Arrow data-format. This allows for efficient communication without copying the data.

To create Arrow messages, the iridis-message crate defines a trait and a derive macro to automatically implement the trait for your own types.

The ArrowMessage trait is implemented for all Rust primitives (including Option and Enums with no values). It is also implemented for all Arrow arrays such as UInt8Array, Float64Array, StringArray, etc.

Then to create a new ArrowMessage, you can use the macro:

#![allow(unused)]
fn main() {
#[derive(ArrowMessage)]
struct MyMessage {}
}

Each field should then implement the ArrowMessage trait.

#![allow(unused)]
fn main() {
use iridis_message::prelude::{
    thirdparty::{arrow_array::*, *},
    *,
};

#[derive(Debug, ArrowMessage)]
struct Metadata {
    name: Option<String>,
    width: u32,
    height: u32,
}

#[derive(Debug, ArrowMessage)]
struct Image {
    data: UInt8Array,
    metadata: Option<Metadata>,
}
}

Then you can create messages without copying, using the Arrow data-format:

#![allow(unused)]
fn main() {
use iridis_message::prelude::{
    thirdparty::{arrow_array::*, arrow_data::*, *},
    *,
};

let image = Image {
    data: UInt8Array::from(vec![1, 2, 3]),
    metadata: Some(Metadata {
        name: Some("example".to_string()),
        width: 12,
        height: 12,
    }),
};

let arrow = ArrayData::try_from(image)?; // No copy made
let image = Image::try_from(arrow)?; // Again, no copy made, this is the same underlying buffer
}
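The "no copy made" comments rely on conversions that move ownership of a buffer instead of duplicating its bytes. The same idea can be checked in plain Rust. In this sketch, `Pixels` and `RawBuffer` are hypothetical stand-ins for a typed message and its transport form; they are not iridis or Arrow types, and the real conversions may work differently internally.

```rust
/// Hypothetical stand-in for a typed message; not an iridis or Arrow type.
pub struct Pixels {
    pub data: Vec<u8>,
}

/// Hypothetical stand-in for the untyped transport representation.
pub struct RawBuffer {
    pub bytes: Vec<u8>,
}

impl From<Pixels> for RawBuffer {
    fn from(pixels: Pixels) -> Self {
        // Moving the Vec transfers ownership of the heap allocation; no bytes are copied.
        RawBuffer { bytes: pixels.data }
    }
}

impl From<RawBuffer> for Pixels {
    fn from(raw: RawBuffer) -> Self {
        Pixels { data: raw.bytes }
    }
}

/// Returns true when the buffer still lives at the same address after a round trip.
pub fn round_trip_keeps_buffer(data: Vec<u8>) -> bool {
    let before = data.as_ptr();
    let raw = RawBuffer::from(Pixels { data });
    let back = Pixels::from(raw);
    std::ptr::eq(before, back.data.as_ptr())
}
```

Comparing the pointer before and after the round trip is a cheap way to convince yourself that a conversion really reuses the underlying buffer.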

Node

A node is always represented as a Rust struct that derives the Node macro and implements the Node trait. Here are some details about both:

The macros

#[derive(Node)]

The #[derive(Node)] macro does two things:

  • Create symbols if the cdylib feature is enabled, so that the runtime is able to load the node dynamically.

  • Implement a default tokio Runtime that will automatically reuse the context runtime if the node has been statically linked, or create a new one if the node is dynamically linked.

The generated code is as follows:

#![allow(unused)]
fn main() {
#[cfg(feature = "cdylib")]
#[doc(hidden)]
#[unsafe(no_mangle)]
pub static IRIDIS_NODE: iridis_node::prelude::DynamicallyLinkedNodeInstance = |inputs, outputs, queries, queryables, configuration| {
    <#name>::new(inputs, outputs, queries, queryables, configuration)
};

static DEFAULT_TOKIO_RUNTIME: std::sync::LazyLock<iridis_node::prelude::thirdparty::tokio::runtime::Runtime> =
    std::sync::LazyLock::new(|| iridis_node::prelude::thirdparty::tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"));

fn default_runtime<T: Send + 'static>(
    task: impl Future<Output = T> + Send + 'static,
) -> iridis_node::prelude::thirdparty::tokio::task::JoinHandle<T> {
    match iridis_node::prelude::thirdparty::tokio::runtime::Handle::try_current() {
        Ok(handle) => handle.spawn(task),
        Err(_) => DEFAULT_TOKIO_RUNTIME.spawn(task)
    }
}
}

#[node(runtime = "default_runtime")] impl Node

First of all, the Node trait declares the following methods:

#![allow(unused)]
fn main() {
pub trait Node: Send + Sync {
    fn new(
        inputs: Inputs,
        outputs: Outputs,
        queries: Queries,
        queryables: Queryables,
        configuration: serde_yml::Value,
    ) -> tokio::task::JoinHandle<Result<Box<dyn Node>>>
    where
        Self: Sized;

    fn start(self: Box<Self>) -> tokio::task::JoinHandle<Result<()>>;
}
}

So each node must implement the new and start methods (see below for more details). As you can see, each method returns a tokio::task::JoinHandle, which means that the node is executed in a tokio task. Writing the new method by hand is a bit tricky: the Node trait is not async, so you can't use async fn directly. Instead, you have to use a runtime function (default_runtime, for example) to spawn the task. This is why we use the #[node(runtime = "default_runtime")] attribute.

It rewrites your code. For example, this async constructor:

#![allow(unused)]
fn main() {
async fn new(
    _: Inputs,
    _: Outputs,
    _: Queries,
    _: Queryables,
    _: serde_yml::Value,
) -> Result<Self> {
    Ok(Self {})
}
}

is transformed into:

#![allow(unused)]
fn main() {
fn new (
    _: Inputs,
    _: Outputs,
    _: Queries,
    _: Queryables,
    _: serde_yml::Value,
) -> JoinHandle<Result<Box<dyn Node>>> {
    default_runtime(async {
        Ok(Box::new(Self {}) as Box<dyn Node>)
    })
}
}
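The shape of that transformation (a non-async trait method that spawns the work and returns a handle to a type-erased node) can be reproduced with threads instead of tokio. The sketch below is only an analogue under that assumption: `MiniNode`, `Greeter` and `load_and_run` are hypothetical names, and `std::thread::JoinHandle` stands in for `tokio::task::JoinHandle`.

```rust
use std::thread::{self, JoinHandle};

pub type Result<T> = std::result::Result<T, String>;

/// Hypothetical, thread-based analogue of the trait shape; not the iridis trait.
pub trait MiniNode: Send {
    fn new(name: String) -> JoinHandle<Result<Box<dyn MiniNode>>>
    where
        Self: Sized;

    fn start(self: Box<Self>) -> JoinHandle<Result<String>>;
}

pub struct Greeter {
    name: String,
}

impl Greeter {
    // What the author would like to write: a plain fallible constructor.
    fn build(name: String) -> Result<Self> {
        if name.is_empty() {
            return Err("name must not be empty".to_string());
        }
        Ok(Greeter { name })
    }
}

impl MiniNode for Greeter {
    // What the trait needs: spawn the work and hand back a type-erased node.
    fn new(name: String) -> JoinHandle<Result<Box<dyn MiniNode>>> {
        thread::spawn(move || -> Result<Box<dyn MiniNode>> {
            Ok(Box::new(Self::build(name)?) as Box<dyn MiniNode>)
        })
    }

    fn start(self: Box<Self>) -> JoinHandle<Result<String>> {
        thread::spawn(move || -> Result<String> { Ok(format!("hello from {}", self.name)) })
    }
}

/// Caller side: wait for construction, then start the node and wait for it.
pub fn load_and_run<N: MiniNode>(name: &str) -> Result<String> {
    let node = N::new(name.to_string())
        .join()
        .map_err(|_| "constructor panicked".to_string())??;

    node.start().join().map_err(|_| "node panicked".to_string())?
}
```

The caller never needs to know the concrete node type after construction: it only holds a `Box<dyn MiniNode>`, which is the point of returning the boxed trait object from `new`.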

IOs

To better understand the parameters of the new method, we must understand the application pipeline:

  • You create a DataflowLayout that describes the number of nodes, their names and their inputs/outputs

  • You create the connections (the flows) for this layout

  • You then pass this to the runtime that will load the nodes according to the layout and the flows.

This is when the new method is called: each node can retrieve the IO objects that the runtime passes to it. To retrieve one, simply use the with function:

#![allow(unused)]
fn main() {
Self {
    output: outputs
        .with("out")
        .await
        .wrap_err("Failed to create output")?,
}
}

Be careful: the name out must match the one provided in the DataflowLayout. Also, the objects are typed by default, which means that each IO has to specify the ArrowMessage type that it will receive, send or query:

#![allow(unused)]
fn main() {
use iridis_node::prelude::{thirdparty::*, *};

#[derive(Node)]
pub struct MySource {
    pub output: Output<String>,
}
}

See the Messages section for more details about the ArrowMessage type.
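The name-matching rule can be pictured as a plain map lookup keyed by the names declared in the layout. This is a sketch under assumptions, not how iridis is implemented: `MiniOutputs` and `from_layout` are hypothetical, and the choice to hand each endpoint out only once is a decision made for this model.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the outputs handed to a node; not the iridis `Outputs`.
pub struct MiniOutputs {
    by_name: HashMap<String, Sender<String>>,
}

impl MiniOutputs {
    /// Creates one channel per output name declared in the layout.
    pub fn from_layout(names: &[&str]) -> (Self, HashMap<String, Receiver<String>>) {
        let mut by_name = HashMap::new();
        let mut receivers = HashMap::new();
        for name in names {
            let (tx, rx) = channel();
            by_name.insert(name.to_string(), tx);
            receivers.insert(name.to_string(), rx);
        }
        (MiniOutputs { by_name }, receivers)
    }

    /// Hands out the endpoint registered under `name`, or fails if the name is unknown.
    /// In this sketch an endpoint can only be taken once.
    pub fn with(&mut self, name: &str) -> Result<Sender<String>, String> {
        self.by_name
            .remove(name)
            .ok_or_else(|| format!("no output named '{name}' in the layout"))
    }
}
```

With a layout that declares `"out"`, asking for `"output"` fails immediately, which is the kind of mismatch worth surfacing through the error context shown in the snippet above.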

In case of Query/Queryables, two ArrowMessage types must be provided, one for the request message, and one for the response message. For example:

#![allow(unused)]
fn main() {
pub compare_to_128: Queryable<u8, String>,
}

Here the request is a u8 and the response is a String.
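To see why two type parameters are needed, here is a minimal request/response pair built on standard channels. It is a sketch only: `MiniQuery`, `MiniQueryable` and `query_pair` are hypothetical names, the calls block instead of being async, and nothing here reflects how iridis implements queries.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-ins; not the iridis `Query`/`Queryable` types.
pub struct MiniQuery<Req, Resp> {
    requests: Sender<(Req, Sender<Resp>)>,
}

pub struct MiniQueryable<Req, Resp> {
    requests: Receiver<(Req, Sender<Resp>)>,
}

/// Creates the two ends of a typed request/response link.
pub fn query_pair<Req, Resp>() -> (MiniQuery<Req, Resp>, MiniQueryable<Req, Resp>) {
    let (tx, rx) = channel();
    (MiniQuery { requests: tx }, MiniQueryable { requests: rx })
}

impl<Req, Resp> MiniQuery<Req, Resp> {
    /// Sends a request and blocks until the response arrives.
    pub fn query(&self, request: Req) -> Result<Resp, String> {
        // Each request carries its own reply channel.
        let (reply_tx, reply_rx) = channel();
        self.requests
            .send((request, reply_tx))
            .map_err(|_| "queryable is gone".to_string())?;
        reply_rx.recv().map_err(|_| "no response".to_string())
    }
}

impl<Req, Resp> MiniQueryable<Req, Resp> {
    /// Answers exactly one pending request with `handler`.
    pub fn on_query(&self, handler: impl FnOnce(Req) -> Resp) -> Result<(), String> {
        let (request, reply) = self
            .requests
            .recv()
            .map_err(|_| "query side is gone".to_string())?;
        reply
            .send(handler(request))
            .map_err(|_| "requester is gone".to_string())
    }
}

/// Usage: a `u8` request answered with a `String` response.
pub fn compare_to_128(value: u8) -> String {
    let (query, queryable) = query_pair::<u8, String>();

    let server = std::thread::spawn(move || {
        queryable.on_query(|v| {
            if v > 128 {
                format!("{v} is greater than 128")
            } else {
                format!("{v} is less than or equal to 128")
            }
        })
    });

    let answer = query.query(value).expect("query failed");
    server.join().expect("server panicked").expect("server failed");
    answer
}
```

The request type constrains what the asking side may send, and the response type constrains what the handler may return, so both ends are checked at compile time.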

Note: if you don't want to manipulate typed IOs, you can use the RawInput, RawOutput, RawQuery and RawQueryable alternatives, together with the with_raw function. This will allow you to manipulate ArrayData directly with no serialization/deserialization.

Configuration

Each node also receives a configuration parameter. This is a serde_yml::Value object that can be used to pass configuration parameters to the node. This is useful if you want to pass some parameters to the node at runtime, so you can use the same node implementation for different configurations.

Start

The start method is called once all nodes have been loaded. It consumes the node, so when the function returns the node is no longer available and is dropped. You have to take care of the loop yourself. For example, you can have a loop that sends messages every second:

#![allow(unused)]
fn main() {
async fn start(self: Box<Self>) -> Result<()> {
    while let Ok(()) = self
        .output
        .send("tick".to_string())
        .await
        .wrap_err("Failed to send message")
    {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    Ok(())
}
}

Here we use a while let Ok(()) loop instead of a plain loop because, when the node is aborted, the send function will eventually return a channel-closed error. We can use this to stop the loop and exit the node gracefully.
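The same exit condition can be observed with a standard-library channel: once the receiving side is dropped, `send` starts returning an error and the loop ends on its own. This is a std-only sketch, with threads instead of tokio tasks; `ticks_sent_before_close` is a hypothetical helper written for the illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends ticks until the receiving side goes away, then returns how many were accepted.
pub fn ticks_sent_before_close(receiver_reads: usize) -> usize {
    // Capacity 0 makes every send wait until the receiver has taken the message.
    let (tx, rx) = sync_channel::<String>(0);

    let consumer = thread::spawn(move || {
        for _ in 0..receiver_reads {
            rx.recv().expect("sender closed early");
        }
        // `rx` is dropped here: from now on the channel is closed.
    });

    let mut sent = 0;
    // Same shape as the node loop above: stop as soon as sending fails.
    while let Ok(()) = tx.send("tick".to_string()) {
        sent += 1;
    }

    consumer.join().expect("consumer panicked");
    sent
}
```

No explicit shutdown flag is needed: the closed channel is the shutdown signal.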

The send method takes an ArrowMessage as a parameter, serializes it into an ArrayData without copying, adds a Header and sends it to the output channel. The input that receives the message gets this DataflowMessage as a tuple containing the Header and the deserialized ArrowMessage object.
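The "header plus payload" tuple can be sketched as follows. The real Header fields are not described here, so the `sequence` and `source` fields of `MiniHeader` are pure assumptions, as are the `MiniOutput`, `MiniInput` and `link` names. The sketch also skips the Arrow serialization step entirely.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical header; the real `Header` fields are not described in this text.
#[derive(Debug, Clone, PartialEq)]
pub struct MiniHeader {
    pub sequence: u64,
    pub source: String,
}

pub struct MiniOutput<T> {
    name: String,
    next: u64,
    tx: Sender<(MiniHeader, T)>,
}

pub struct MiniInput<T> {
    rx: Receiver<(MiniHeader, T)>,
}

/// Creates a connected output/input pair for messages of type `T`.
pub fn link<T>(name: &str) -> (MiniOutput<T>, MiniInput<T>) {
    let (tx, rx) = channel();
    (MiniOutput { name: name.to_string(), next: 0, tx }, MiniInput { rx })
}

impl<T> MiniOutput<T> {
    /// Wraps the message with a freshly built header before sending it.
    pub fn send(&mut self, message: T) -> Result<(), String> {
        let header = MiniHeader { sequence: self.next, source: self.name.clone() };
        self.next += 1;
        self.tx
            .send((header, message))
            .map_err(|_| "channel closed".to_string())
    }
}

impl<T> MiniInput<T> {
    /// Yields the header together with the typed message.
    pub fn recv(&self) -> Result<(MiniHeader, T), String> {
        self.rx.recv().map_err(|_| "channel closed".to_string())
    }
}
```

This is why the receiving loops in the examples destructure each message as a pair and often ignore the first element.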

The same logic applies for Query/Queryable. See the examples section for more details.

Runtime

Once a layout and the flows are created, you can create a Runtime that will run the application. Upon creating a runtime, you can load your plugins that will be used to alter the behavior of the runtime. For example, you can load a FileExtPlugin that will allow you to load files with a specific extension, or a UrlSchemePlugin that will allow you to load files with a specific URL scheme.

#![allow(unused)]
fn main() {
let runtime = Runtime::new(
        async |file_ext: &mut FileExtLoader, url_scheme: &mut UrlSchemeLoader| Ok(()),
    )
    .await?;
}

Then you can call the run method to run the application. The run method takes a Flows object and a closure that will be called to load the nodes. The closure takes a Loader object that can be used to load the nodes.

#![allow(unused)]
fn main() {
runtime.run(flows, async move |loader: &mut Loader| {
    loader
        .load::<Transport>(operator, serde_yml::from_str("")?)
        .await?;

    loader
        .load_url(
            Url::parse("file:///path/to/some/dylib")?,
            sink,
            serde_yml::from_str("")?,
        )
        .await?;

    Ok(())
})
.await
}

The pipeline for loading a node by URL is as follows:

  • The Loader extracts the scheme of the URL.
  • It selects the plugin that matches the scheme.
  • It then calls the load method of the plugin, passing the URL and the node to load. It also provides all the file-ext plugins, so that the url-scheme plugin can rely on them.
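The three steps above can be sketched with a small dispatcher. Everything in it is hypothetical (`SchemeHandler`, `FileHandler`, `MiniLoader`), the URL is split by hand instead of using a URL parser, and `load` only returns a description of what would happen rather than loading anything.

```rust
use std::path::Path;

/// Hypothetical stand-in for a URL-scheme plugin; not the iridis trait.
pub trait SchemeHandler {
    fn schemes(&self) -> &[&str];
    fn load(&self, rest: &str, known_extensions: &[&str]) -> Result<String, String>;
}

pub struct FileHandler;

impl SchemeHandler for FileHandler {
    fn schemes(&self) -> &[&str] {
        &["file"]
    }

    // Relies on the extension list it is given, like a url-scheme plugin
    // relying on the file-ext plugins.
    fn load(&self, rest: &str, known_extensions: &[&str]) -> Result<String, String> {
        let ext = Path::new(rest)
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| format!("no file extension in '{rest}'"))?;

        if known_extensions.iter().any(|known| *known == ext) {
            Ok(format!("load '{rest}' via the '{ext}' file-ext plugin"))
        } else {
            Err(format!("no file-ext plugin for '.{ext}'"))
        }
    }
}

pub struct MiniLoader {
    handlers: Vec<Box<dyn SchemeHandler>>,
    extensions: Vec<&'static str>,
}

impl MiniLoader {
    pub fn with_defaults() -> Self {
        MiniLoader {
            handlers: vec![Box::new(FileHandler) as Box<dyn SchemeHandler>],
            extensions: vec!["so", "dylib", "dll"],
        }
    }

    pub fn load_url(&self, url: &str) -> Result<String, String> {
        // Step 1: extract the scheme.
        let (scheme, rest) = url
            .split_once("://")
            .ok_or_else(|| format!("'{url}' has no scheme"))?;

        // Step 2: select the plugin that matches the scheme.
        let handler = self
            .handlers
            .iter()
            .find(|h| h.schemes().iter().any(|s| *s == scheme))
            .ok_or_else(|| format!("Url scheme '{scheme}' is not supported"))?;

        // Step 3: call its load method, passing along the known file extensions.
        handler.load(rest, &self.extensions)
    }
}
```

In this model an unknown scheme fails at step 2, while a known scheme with an unknown extension fails inside the handler at step 3.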

Plugins

FileExtPlugin

A FileExtPlugin takes a PathBuf as parameter and returns a RuntimeNode. It also defines its target file extensions.

#![allow(unused)]
fn main() {
#[derive(FileExtPlugin)]
pub struct DefaultFileExtPlugin {}

#[file_ext_plugin(runtime = "default_runtime")]
impl FileExtPlugin for DefaultFileExtPlugin {
    async fn new() -> Result<Self>
    where
        Self: Sized,
    {
        Ok(DefaultFileExtPlugin {})
    }

    fn target(&self) -> Vec<String> {
        vec!["so".to_string(), "dylib".to_string(), "dll".to_string()]
    }

    async fn load(
        &self,
        path: std::path::PathBuf,

        inputs: Inputs,
        outputs: Outputs,
        queries: Queries,
        queryables: Queryables,
        configuration: serde_yml::Value,
    ) -> Result<iridis_runtime_core::prelude::RuntimeNode> {
        match path.extension() {
            Some(ext) => {
                if ext == std::env::consts::DLL_EXTENSION {
                    let path_buf = path.clone();
                    let (library, constructor) = tokio::task::spawn_blocking(move || {
                        let library = unsafe {
                            #[cfg(target_family = "unix")]
                            let library = libloading::os::unix::Library::open(
                                Some(path_buf.clone()),
                                libloading::os::unix::RTLD_NOW | libloading::os::unix::RTLD_GLOBAL,
                            )
                            .wrap_err(format!("Failed to load path {:?}", path_buf))?;

                            #[cfg(not(target_family = "unix"))]
                            let library = Library::new(path_buf.clone())
                                .wrap_err(format!("Failed to load path {:?}", path_buf))?;

                            library
                        };

                        let constructor = unsafe {
                            library
                                .get::<*mut DynamicallyLinkedNodeInstance>(b"IRIDIS_NODE")
                                .wrap_err(format!(
                                    "Failed to load symbol 'IRIDIS_NODE' from dylib {:?}",
                                    path_buf
                                ))?
                                .read()
                        };

                        Ok::<_, eyre::Report>((library, constructor))
                    })
                    .await??;

                    Ok(RuntimeNode::DynamicallyLinked(DynamicallyLinkedNode {
                        _library: library,
                        handle: (constructor)(inputs, outputs, queries, queryables, configuration)
                            .await?
                            .wrap_err(format!(
                                "Failed to create dynamically linked node from dylib {:?}",
                                path,
                            ))?,
                    }))
                } else {
                    Err(eyre::eyre!(
                        "Unsupported file extension '{:?}'. On this platform it must be '{}'",
                        ext,
                        std::env::consts::DLL_EXTENSION
                    ))
                }
            }
            None => Err(eyre::eyre!("No file extension found for path {:?}", path)),
        }
    }
}
}
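The extension check at the top of load is the part that decides whether the plugin applies at all, so it can be useful to look at it in isolation. The helper below is a std-only sketch of that check; `check_dylib_extension` is a hypothetical name, and the error messages are copied from the plugin above.

```rust
use std::env::consts::DLL_EXTENSION;
use std::path::Path;

/// Accepts only paths whose extension is this platform's dynamic-library extension.
pub fn check_dylib_extension(path: &Path) -> Result<(), String> {
    match path.extension() {
        // "so" on Linux, "dylib" on macOS, "dll" on Windows.
        Some(ext) if ext == DLL_EXTENSION => Ok(()),
        Some(ext) => Err(format!(
            "Unsupported file extension '{:?}'. On this platform it must be '{}'",
            ext, DLL_EXTENSION
        )),
        None => Err(format!("No file extension found for path {:?}", path)),
    }
}
```

Using `std::env::consts::DLL_EXTENSION` keeps the check portable: the same code accepts `.so`, `.dylib` or `.dll` depending on the platform it was compiled for.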

UrlSchemePlugin

A UrlSchemePlugin takes a Url as parameter and returns a RuntimeNode. It also defines its target URL schemes. It can use a FileExtPlugin to load a file according to its extension.

#![allow(unused)]
fn main() {
#[derive(UrlSchemePlugin)]
pub struct DefaultUrlSchemePlugin {}

#[url_scheme_plugin(runtime = "default_runtime")]
impl UrlSchemePlugin for DefaultUrlSchemePlugin {
    async fn new() -> Result<Self>
    where
        Self: Sized,
    {
        Ok(DefaultUrlSchemePlugin {})
    }

    fn target(&self) -> Vec<String> {
        vec!["file".to_string(), "builtin".to_string()]
    }

    #[allow(clippy::too_many_arguments)]
    async fn load(
        &self,
        url: Url,

        inputs: Inputs,
        outputs: Outputs,
        queries: Queries,
        queryables: Queryables,
        configuration: serde_yml::Value,
        file_ext: Arc<FileExtManager>,
    ) -> Result<iridis_runtime_core::prelude::RuntimeNode> {
        match url.scheme() {
            "file" => {
                let path = url
                    .to_file_path()
                    .map_err(|_| eyre::eyre!("Url '{}' cannot be made into a path buf", url))?;

                file_ext
                    .load(path, inputs, outputs, queries, queryables, configuration)
                    .await
            }
            "builtin" => Ok(RuntimeNode::StaticallyLinked(
                new_builtin(
                    Builtin::from_string(url.path())
                        .wrap_err(format!("Invalid builtin name '{}'", url.path()))?,
                    inputs,
                    outputs,
                    queries,
                    queryables,
                    configuration,
                )
                .await?,
            )),
            _ => Err(eyre::eyre!(
                "Url scheme '{}' is not supported",
                url.scheme()
            )),
        }
    }
}
}

Examples

See the example folder

Messages

#![allow(unused)]
fn main() {
use iridis_message::prelude::{
    thirdparty::{arrow_array::*, arrow_data::*, *},
    *,
};

#[derive(Debug, ArrowMessage)]
enum Encoding {
    RGB8,
    RGBA8,
    BGR8,
    BGRA8,
}

#[derive(Debug, ArrowMessage)]
struct Metadata {
    name: Option<String>,
    width: u32,
    height: u32,
    encoding: Option<Encoding>,
}

#[derive(Debug, ArrowMessage)]
struct Image {
    data: UInt8Array,
    metadata: Option<Metadata>,
}
}

Node

#![allow(unused)]
fn main() {
use iridis_node::prelude::{thirdparty::*, *};

#[derive(Node)]
pub struct MyClient {
    pub ask_128: Query<u8, String>,
    pub ask_64: Query<u8, String>,
}

#[node(runtime = "default_runtime")]
impl Node for MyClient {
    async fn new(
        _: Inputs,
        _: Outputs,
        mut queries: Queries,
        _: Queryables,
        _: serde_yml::Value,
    ) -> Result<Self> {
        Ok(Self {
            ask_128: queries
                .with("ask_128")
                .await
                .wrap_err("Failed to create ask_128 query")?,
            ask_64: queries
                .with("ask_64")
                .await
                .wrap_err("Failed to create ask_64 query")?,
        })
    }

    async fn start(mut self: Box<Self>) -> Result<()> {
        let (_, answer) = self
            .ask_128
            .query(100)
            .await
            .wrap_err("Failed to query ask_128")?;

        println!("Answer: {}", answer);

        let (_, answer) = self
            .ask_128
            .query(200)
            .await
            .wrap_err("Failed to query ask_128")?;

        println!("Answer: {}", answer);

        let (_, answer) = self
            .ask_64
            .query(100)
            .await
            .wrap_err("Failed to query ask_64")?;

        println!("Answer: {}", answer);

        let (_, answer) = self
            .ask_64
            .query(2)
            .await
            .wrap_err("Failed to query ask_64")?;

        println!("Answer: {}", answer);

        Ok(())
    }
}

use iridis_node::prelude::{thirdparty::*, *};

#[derive(Node)]
pub struct MyService {
    pub compare_to_128: Queryable<u8, String>,
    pub compare_to_64: Queryable<u8, String>,
}

#[node(runtime = "default_runtime")]
impl Node for MyService {
    async fn new(
        _: Inputs,
        _: Outputs,
        _: Queries,
        mut queryables: Queryables,
        _: serde_yml::Value,
    ) -> Result<Self> {
        Ok(Self {
            compare_to_128: queryables
                .with("compare_to_128")
                .await
                .wrap_err("Failed to create compare_to_128 queryable")?,
            compare_to_64: queryables
                .with("compare_to_64")
                .await
                .wrap_err("Failed to create compare_to_64 queryable")?,
        })
    }

    async fn start(self: Box<Self>) -> Result<()> {
        let mut compare_to_128 = self.compare_to_128;
        let task_128: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
            while let Ok(()) = compare_to_128
                .on_query(async |query| match query > 128 {
                    true => Ok(format!("{} is greater than 128", query)),
                    false => Ok(format!("{} is less than or equal to 128", query)),
                })
                .await
            {}

            Ok(())
        });

        let mut compare_to_64 = self.compare_to_64;
        let task_64: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
            while let Ok(()) = compare_to_64
                .on_query(async |query| match query > 64 {
                    true => Ok(format!("{} is greater than 64", query)),
                    false => Ok(format!("{} is less than or equal to 64", query)),
                })
                .await
            {}

            Ok(())
        });

        task_128.await??;
        task_64.await??;

        Ok(())
    }
}

use iridis_node::prelude::{thirdparty::*, *};

#[derive(Node)]
pub struct MySink {
    pub input: Input<String>,
}

#[node(runtime = "default_runtime")]
impl Node for MySink {
    async fn new(
        mut inputs: Inputs,
        _: Outputs,
        _: Queries,
        _: Queryables,
        _: serde_yml::Value,
    ) -> Result<Self> {
        Ok(Self {
            input: inputs.with("in").await.wrap_err("Failed to create input")?,
        })
    }

    async fn start(mut self: Box<Self>) -> Result<()> {
        while let Ok((_, message)) = self.input.recv().await {
            println!("Received message: {}", message);
        }

        Ok(())
    }
}

use std::time::Duration;

use iridis_node::prelude::{thirdparty::*, *};

#[derive(Node)]
pub struct MySource {
    pub output: Output<String>,
}

#[node(runtime = "default_runtime")]
impl Node for MySource {
    async fn new(
        _: Inputs,
        mut outputs: Outputs,
        _: Queries,
        _: Queryables,
        _: serde_yml::Value,
    ) -> Result<Self> {
        Ok(Self {
            output: outputs
                .with("out")
                .await
                .wrap_err("Failed to create output")?,
        })
    }

    async fn start(self: Box<Self>) -> Result<()> {
        while let Ok(()) = self
            .output
            .send("tick".to_string())
            .await
            .wrap_err("Failed to send message")
        {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }

        Ok(())
    }
}
}

Runtime

use iridis::prelude::{thirdparty::*, *};

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    let mut layout = DataflowLayout::new();

    let (source, output) = layout
        .node("source", async |builder: &mut Builder| {
            builder.output("out")
        })
        .await;

    let (operator, (op_in, op_out)) = layout
        .node("operator", async |builder: &mut Builder| {
            (builder.input("in"), builder.output("out"))
        })
        .await;

    let (sink, input) = layout
        .node("sink", async |builder: &mut Builder| builder.input("in"))
        .await;

    let layout = layout.build();

    let flows = Flows::new(layout.clone(), async move |flows: &mut Connector| {
        flows.connect(op_in, output, None)?;
        flows.connect(input, op_out, None)?;

        Ok(())
    })
    .await?;

    let runtime = Runtime::new(
        async |_file_ext: &mut FileExtLoader, _url_scheme: &mut UrlSchemeLoader| Ok(()),
    )
    .await?;

    runtime
        .run(flows, async move |loader: &mut Loader| {
            loader
                .load_url(
                    iridis_examples::dylib("source", None)?,
                    source,
                    serde_yml::from_str("")?,
                )
                .await?;

            loader
                .load::<Transport>(operator, serde_yml::from_str("")?)
                .await?;

            loader
                .load_url(
                    iridis_examples::dylib("sink", None)?,
                    sink,
                    serde_yml::from_str("")?,
                )
                .await?;

            Ok(())
        })
        .await
}

use iridis::prelude::{thirdparty::*, *};

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();

    let mut layout = DataflowLayout::new();

    let (service, (compare_to_128, compare_to_64)) = layout
        .node("service", async |builder: &mut Builder| {
            (
                builder.queryable("compare_to_128"),
                builder.queryable("compare_to_64"),
            )
        })
        .await;

    let (client, (ask_128, ask_64)) = layout
        .node("client", async |builder: &mut Builder| {
            (builder.query("ask_128"), builder.query("ask_64"))
        })
        .await;

    let layout = layout.build();

    let flows = Flows::new(layout.clone(), async move |flows: &mut Connector| {
        flows.connect(ask_128, compare_to_128, None)?;
        flows.connect(ask_64, compare_to_64, None)?;

        Ok(())
    })
    .await?;

    let runtime = Runtime::new(
        async |_file_ext: &mut FileExtLoader, _url_scheme: &mut UrlSchemeLoader| Ok(()),
    )
    .await?;

    runtime
        .run(flows, async move |loader: &mut Loader| {
            loader
                .load_url(
                    iridis_examples::dylib("service", None)?,
                    service,
                    serde_yml::from_str("")?,
                )
                .await?;

            loader
                .load_url(
                    iridis_examples::dylib("client", None)?,
                    client,
                    serde_yml::from_str("")?,
                )
                .await?;

            Ok(())
        })
        .await
}