MetidaFlows

A lightweight experimental workflow engine for Julia


MetidaFlows.jl is an early-stage project for building typed, graph-based execution workflows in Julia. It is under active development and not yet ready for production use.

The goal of the project is to explore how workflow systems can be expressed naturally using Julia’s type system and multiple dispatch, while keeping the core execution model explicit and minimal.


⚠️ Project status

This package is experimental.

  • APIs are subject to change without notice
  • internal execution semantics are still evolving
  • some subsystems are incomplete or in prototype state
  • backward compatibility is not guaranteed

You should expect breaking changes while the architecture stabilizes.


Core idea

A workflow in MetidaFlows consists of:

  • Nodes — units of computation
  • Ports — typed inputs and outputs
  • Connections — edges between ports
  • Schedulers — execution strategies

Instead of hidden execution magic, everything is explicit:

  • execution is driven by a scheduler
  • data is passed through typed ports
  • invalidation is propagated through the graph
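The sketch below shows these terms in plain Julia. ToyPort, ToyNode, ToyConnection and can_connect are hypothetical stand-ins, not the MetidaFlows API. The rule that an output type must be a subtype of the connected input type is an assumption about what type-checked connections might mean.

```julia
# Hypothetical toy model of nodes, typed ports and connections; not MetidaFlows types.

# A typed port: a label plus the Julia type of the value it carries.
struct ToyPort
    label::Symbol
    datatype::Type
end

# A node: a display name plus declared input and output ports.
struct ToyNode
    name::String
    inputs::Vector{ToyPort}
    outputs::Vector{ToyPort}
end

# A connection: an edge from an output port of one node to an input port of another.
struct ToyConnection
    src::Int
    src_port::Symbol
    dst::Int
    dst_port::Symbol
end

findport(ports::Vector{ToyPort}, label::Symbol) = findfirst(p -> p.label == label, ports)

# Assumed validation rule: both ports exist and the output type is a subtype of the input type.
function can_connect(nodes::Vector{ToyNode}, c::ToyConnection)
    i = findport(nodes[c.src].outputs, c.src_port)
    j = findport(nodes[c.dst].inputs, c.dst_port)
    (i === nothing || j === nothing) && return false
    return nodes[c.src].outputs[i].datatype <: nodes[c.dst].inputs[j].datatype
end

nodes = [
    ToyNode("source", ToyPort[], [ToyPort(:x, Vector{Float64})]),
    ToyNode("sum", [ToyPort(:x, AbstractVector{<:Real})], [ToyPort(:s, Float64)]),
    ToyNode("count", [ToyPort(:n, Int)], ToyPort[]),
]

can_connect(nodes, ToyConnection(1, :x, 2, :x))  # true
can_connect(nodes, ToyConnection(2, :s, 3, :n))  # false: Float64 is not a subtype of Int
```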

Key features (current)

  • Typed node system via Julia multiple dispatch

  • Directed graph workflow model

  • Strict connection validation (including type checking)

  • Two execution modes:

    • DAW (data analysis workflow: deterministic DAG execution)
    • ABW (agent-based workflow: queue-based execution, experimental)
  • Incremental execution and invalidation propagation

  • Node state tracking (:idle, :dirty, :clean, etc.)

  • Input buffering system

  • Hooks for validation and execution lifecycle

  • Basic serialization utilities


Minimal example

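```julia
using MetidaFlows
using CSV, DataFrames

# Node type tags: execute_unsafe! dispatches on DataNode{T}
struct CSVNode <: AbstractNodeType end
struct DataFrameNode <: AbstractNodeType end

# Source node: no input ports, one output port carrying a CSV.File
csv_spec = NodeSpec(
    "Load CSV",
    PortSpec[],
    [PortSpec("CSV File", CSV.File, :csv)],
    [:file]
)

# Transform node: consumes a CSV.File, produces a DataFrame
df_spec = NodeSpec(
    "DataFrame",
    [PortSpec("CSV File", CSV.File, :csv)],
    [PortSpec("DataFrame", DataFrame, :dataframe)]
)

# Read the file named in the node settings and store it on the :csv output port
function MetidaFlows.execute_unsafe!(node::DataNode{CSVNode})
    csv = CSV.File(node.settings[:file])
    setdata!(node, :csv, csv)
    return [:csv]   # labels of the output ports that now hold data
end

# Convert the buffered :csv input into a DataFrame
function MetidaFlows.execute_unsafe!(node::DataNode{DataFrameNode})
    csv = getinputdata(node, :csv)
    setdata!(node, :dataframe, DataFrame(csv))
    return [:dataframe]
end

workflow = Workflow(0)

id1 = add_node!(workflow, DataNode(CSVNode, csv_spec))
id2 = add_node!(workflow, DataNode(DataFrameNode, df_spec))

# Connect output port :csv of node id1 to input port :csv of node id2
add_connection!(workflow, id1, :csv, id2, :csv)

# setsettings! applies the settings and invalidates downstream nodes
setsettings!(workflow, id1, Dict(:file => "data.csv"))

# Run the workflow scheduler
scheduler!(workflow)

df = getdata(workflow, id2, :dataframe)
```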

Design goals

This project is guided by a few principles:

  • explicit execution model (no hidden state machines)

  • composability via small typed primitives

  • extensibility through multiple dispatch

  • clear separation between:

    • structure (graph)
    • execution (scheduler)
    • behavior (nodes)
  • support for both deterministic and dynamic workflows


Planned direction

The architecture is still evolving. Current exploration areas include:

  • improved logging and audit system
  • caching and checkpointing strategies

Contributing & feedback

Feedback is especially welcome on:

  • execution semantics
  • scheduler design
  • invalidation model
  • API ergonomics
  • real-world workflow use cases

The project is intentionally in an exploratory phase, so design discussions are highly valuable at this stage.

Documentation for MetidaFlows.

MetidaFlows.add_connection! (Method)
add_connection!(model::Workflow, c::NodeConnection)

Add connection to workflow.

Performs:

  • connection validation,
  • connection registration,
  • incoming/outgoing index updates.

If the source node already has status :clean, its output data is immediately propagated into the target node's input buffer.

Returns

Assigned connection identifier (Int).

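The steps performed by add_connection! can be sketched with a toy in-memory store. LinkNode, Link, LinkWorkflow and toy_add_connection! are hypothetical. Validation here is reduced to checking that both nodes exist, whereas the package describes its connection validation as also covering type checking.

```julia
# Hypothetical toy store illustrating the add_connection! steps; names and fields
# are assumptions, not MetidaFlows internals.

mutable struct LinkNode
    status::Symbol
    outputs::Dict{Symbol,Any}        # output port label => data
    input_buffer::Dict{Symbol,Any}   # input port label => data
end
LinkNode() = LinkNode(:idle, Dict{Symbol,Any}(), Dict{Symbol,Any}())

struct Link
    src::Int
    src_port::Symbol
    dst::Int
    dst_port::Symbol
end

mutable struct LinkWorkflow
    nodes::Dict{Int,LinkNode}
    links::Dict{Int,Link}
    incoming::Dict{Int,Vector{Int}}   # node id => ids of incoming connections
    outgoing::Dict{Int,Vector{Int}}   # node id => ids of outgoing connections
    next_id::Int
end
LinkWorkflow() = LinkWorkflow(Dict{Int,LinkNode}(), Dict{Int,Link}(),
                              Dict{Int,Vector{Int}}(), Dict{Int,Vector{Int}}(), 1)

function toy_add_connection!(wf::LinkWorkflow, c::Link)
    # Validation (simplified here to: both endpoint nodes exist)
    (haskey(wf.nodes, c.src) && haskey(wf.nodes, c.dst)) ||
        throw(ArgumentError("connection refers to an unknown node"))
    # Registration under a fresh connection id
    id = wf.next_id
    wf.next_id += 1
    wf.links[id] = c
    # Incoming/outgoing index updates
    push!(get!(wf.outgoing, c.src, Int[]), id)
    push!(get!(wf.incoming, c.dst, Int[]), id)
    # A :clean source pushes its output into the target input buffer immediately
    src = wf.nodes[c.src]
    if src.status == :clean && haskey(src.outputs, c.src_port)
        wf.nodes[c.dst].input_buffer[c.dst_port] = src.outputs[c.src_port]
    end
    return id
end

wf = LinkWorkflow()
wf.nodes[1] = LinkNode(:clean, Dict{Symbol,Any}(:csv => [1, 2, 3]), Dict{Symbol,Any}())
wf.nodes[2] = LinkNode()
cid = toy_add_connection!(wf, Link(1, :csv, 2, :csv))
wf.nodes[2].input_buffer[:csv]   # [1, 2, 3], propagated because node 1 is :clean
```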
MetidaFlows.add_node! (Method)
add_node!(model::Workflow, node::AbstractDataNode)

Add node to workflow.

Assigns a new unique node identifier and registers the node in workflow storage.

Returns

Assigned node identifier (Int).

MetidaFlows.delete_connection! (Method)
delete_connection!(model::Workflow, id::Int)

Remove connection from workflow.

Performs:

  • deletion of the corresponding entry in the child node's input buffer,
  • removal from incoming/outgoing indices,
  • deletion from workflow connection storage.

Returns

  • true if connection existed and was removed.
  • false otherwise.
MetidaFlows.delete_node! (Method)
delete_node!(model::Workflow, id::Int)

Remove node from workflow.

Performs:

  • deletion of all incoming and outgoing connections,
  • cleanup of connection indices,
  • removal of node from workflow storage.

Returns

  • true if node existed and was removed.
  • false otherwise.
MetidaFlows.execute! (Method)
execute!(model::Workflow, id::Int; execute_upstream::Bool = true, invalidate_downstream::Bool = true, check_cyclic::Bool = true)

Execute workflow node.

Main workflow execution entry point.

Execution Stages

  1. Initialize per-run execution state and logs.
  2. Optionally detect recursive cyclic execution.
  3. Skip execution for nodes already marked :clean.
  4. Mark node as :executing.
  5. Optionally execute upstream dependencies recursively.
  6. Validate node structure and execution readiness.
  7. Validate node settings.
  8. Execute node implementation via execute_unsafe!.
  9. Store execution state (ready_ports).
  10. Propagate outputs downstream through input buffers.
  11. Optionally invalidate downstream nodes.
  12. Validate execution result.
  13. Mark node as :clean.

Arguments

  • execute_upstream: recursively execute all parent nodes before executing current node.
  • invalidate_downstream: invalidate downstream execution results after successful execution.
  • check_cyclic: detect recursive execution loops during direct execution calls.

Returns

Vector of output port labels (Vector{Symbol}) produced during execution.

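The sketch below models a subset of these stages in a toy form: cycle detection, skipping :clean nodes, upstream recursion, a readiness check, execution and marking :clean. ExecNode and toy_execute! are hypothetical. Outputs are read directly from parent nodes instead of through input buffers, and settings and result validation are omitted.

```julia
# Hypothetical toy executor mirroring part of the execute! control flow;
# not the MetidaFlows implementation. Each node computes its output from its parents' outputs.

mutable struct ExecNode
    parents::Vector{Int}
    f::Function
    status::Symbol
    output::Any
end
ExecNode(parents::Vector{Int}, f::Function) = ExecNode(parents, f, :dirty, nothing)

function toy_execute!(nodes::Dict{Int,ExecNode}, id::Int; execute_upstream::Bool = true)
    node = nodes[id]
    node.status == :clean && return node.output              # skip finished nodes
    node.status == :executing && error("cyclic execution detected at node $id")
    node.status = :executing
    if execute_upstream
        for p in node.parents
            toy_execute!(nodes, p)                           # run parents first
        end
    end
    if !all(p -> nodes[p].status == :clean, node.parents)   # readiness check
        node.status = :failed
        error("node $id is not ready: some parents are not :clean")
    end
    node.output = node.f(map(p -> nodes[p].output, node.parents)...)
    node.status = :clean
    return node.output
end

nodes = Dict(
    1 => ExecNode(Int[], () -> 2),
    2 => ExecNode([1], x -> x + 3),
    3 => ExecNode([1, 2], (a, b) -> a * b),
)
toy_execute!(nodes, 3)   # runs nodes 1 and 2 first, then returns 2 * 5 == 10
```

The :executing status doubles as the recursion guard. A node that is reached again while it is still executing can only have been reached through a cycle.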
MetidaFlows.execute_unsafe! (Method)
execute_unsafe!(node::AbstractDataNode)

Low-level node execution interface.

This function contains node-specific execution logic.

The default implementation throws an error, so this method must be specialized for every executable node type.

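The dispatch pattern behind execute_unsafe! can be shown with local stand-in types that mirror DataNode{CSVNode} from the minimal example. AbstractToyNodeType, ToyDataNode and toy_execute_unsafe! are hypothetical names. A generic fallback throws, and each node type tag gets its own method.

```julia
# Hypothetical sketch of the dispatch pattern; these are local toys, not MetidaFlows types.

abstract type AbstractToyNodeType end
struct DoubleNode <: AbstractToyNodeType end
struct UnimplementedNode <: AbstractToyNodeType end

# The type parameter T is only a tag used for dispatch.
mutable struct ToyDataNode{T<:AbstractToyNodeType}
    input::Float64
    data::Dict{Symbol,Any}
end
ToyDataNode(::Type{T}, x) where {T<:AbstractToyNodeType} = ToyDataNode{T}(x, Dict{Symbol,Any}())

# Fallback: reached only when no specialization exists for the node type.
toy_execute_unsafe!(node::ToyDataNode) =
    error("execute_unsafe! is not implemented for $(typeof(node))")

# Specialization selected by multiple dispatch on the type parameter.
function toy_execute_unsafe!(node::ToyDataNode{DoubleNode})
    node.data[:out] = 2 * node.input
    return [:out]   # labels of output ports that now hold data
end

n = ToyDataNode(DoubleNode, 21.0)
toy_execute_unsafe!(n)   # returns [:out]; n.data[:out] == 42.0
```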
MetidaFlows.execution_node_validation (Method)
execution_node_validation(node::AbstractDataNode)

Internal function. Validate node readiness before execution.

Checks that:

  • every declared input port has a corresponding value in node.input_buffer,
  • validate_node succeeds.

This validation is intended for runtime execution safety, ensuring that all required inputs are available before calling execute_unsafe!.

Returns

  • true if node is ready for execution.
  • false otherwise.
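As a rough sketch, assume a node exposes its declared input labels and a dictionary input buffer. The readiness check then reduces to two conditions. ReadyCheckNode, toy_validate_node and toy_execution_node_validation are hypothetical.

```julia
# Hypothetical readiness check: every declared input label must be present in the
# input buffer, and a (toy) structural validator must pass. Not MetidaFlows code.

struct ReadyCheckNode
    input_labels::Vector{Symbol}        # declared input ports
    input_buffer::Dict{Symbol,Any}      # values received from upstream
end

toy_validate_node(node::ReadyCheckNode) = true   # default: always valid

function toy_execution_node_validation(node::ReadyCheckNode)
    all(l -> haskey(node.input_buffer, l), node.input_labels) || return false
    return toy_validate_node(node)
end

toy_execution_node_validation(ReadyCheckNode([:csv], Dict{Symbol,Any}()))                # false
toy_execution_node_validation(ReadyCheckNode([:csv], Dict{Symbol,Any}(:csv => "rows")))  # true
```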
MetidaFlows.getdata (Method)
getdata(node::AbstractDataNode, l::Symbol)

Return output data stored under output port label.

MetidaFlows.getdata (Method)
getdata(model::Workflow, id::Int, l::Symbol)

Return output data stored under output port label l of the node with identifier id.

MetidaFlows.getinputdata (Method)
getinputdata(node::AbstractDataNode, l::Symbol)

Read value from node input buffer.

Does NOT consume or clear buffer.

MetidaFlows.getportconnections (Method)
getportconnections(model::Workflow, id::Int, label::Symbol; direction = :both)

Return all connections attached to a specific port.

Direction:

  • :input
  • :output
  • :both
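The direction filter can be sketched over a dictionary of connections stored as named tuples. The field names (src, src_port, dst, dst_port) and toy_getportconnections are assumptions for illustration only.

```julia
# Hypothetical sketch of filtering connections by port and direction; not MetidaFlows code.

conns = Dict(
    1 => (src = 1, src_port = :csv, dst = 2, dst_port = :csv),
    2 => (src = 2, src_port = :dataframe, dst = 3, dst_port = :table),
)

function toy_getportconnections(conns, id::Int, label::Symbol; direction::Symbol = :both)
    direction in (:input, :output, :both) || throw(ArgumentError("bad direction: $direction"))
    is_incoming(c) = c.dst == id && c.dst_port == label   # edge ends at this port
    is_outgoing(c) = c.src == id && c.src_port == label   # edge starts at this port
    function keep(c)
        direction == :input && return is_incoming(c)
        direction == :output && return is_outgoing(c)
        return is_incoming(c) || is_outgoing(c)
    end
    return sort!(Int[cid for (cid, c) in conns if keep(c)])
end

toy_getportconnections(conns, 2, :csv; direction = :input)   # [1]
toy_getportconnections(conns, 2, :dataframe)                 # [2]
```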
MetidaFlows.getportnumber (Method)
getportnumber(node::AbstractDataNode, l::Symbol, direction::Symbol)

Return index of port by label and direction (:input or :output).

MetidaFlows.getporttype (Method)
getporttype(node::AbstractDataNode, i::Int, direction::Symbol)

Return Julia datatype of port by index and direction.

MetidaFlows.getstatus (Method)
getstatus(node::AbstractDataNode) -> Symbol

Return execution status of node.

Possible values:

  • :idle
  • :dirty
  • :clean
  • :executing
  • :failed
  • :invalid_node
  • :invalid_settings
  • :invalid_result
MetidaFlows.haveinputs (Method)
haveinputs(node::AbstractDataNode)

Returns true if node has at least one input port defined in its spec.

MetidaFlows.invalidate_downstream! (Method)
invalidate_downstream!(model::Workflow, id::Int)

Recursively invalidate all downstream nodes.

Marks the specified node and all descendant nodes as :dirty using mark_dirty!.

The traversal follows all outgoing connections recursively.

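A minimal sketch of the traversal follows, assuming the graph is available as a map from node id to child ids. The visited set is an addition of this sketch, so that nodes shared by several paths (a diamond) are processed once. toy_invalidate_downstream! is hypothetical.

```julia
# Hypothetical sketch of recursive downstream invalidation; not MetidaFlows code.
# status maps node id => status; children maps node id => ids reached via outgoing edges.

function toy_invalidate_downstream!(status::Dict{Int,Symbol}, children::Dict{Int,Vector{Int}},
                                    id::Int, visited::Set{Int} = Set{Int}())
    id in visited && return status        # each node is handled once, even in diamonds
    push!(visited, id)
    status[id] = :dirty                   # stand-in for mark_dirty!
    for child in get(children, id, Int[])
        toy_invalidate_downstream!(status, children, child, visited)
    end
    return status
end

status   = Dict(1 => :clean, 2 => :clean, 3 => :clean, 4 => :clean)
children = Dict(1 => [2, 3], 2 => [4], 3 => [4])    # diamond: 1 -> {2, 3} -> 4
toy_invalidate_downstream!(status, children, 2)     # marks 2 and 4; 1 and 3 stay :clean
```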
MetidaFlows.isportexist (Function)
isportexist(node::AbstractDataNode, port::Symbol, direction::Symbol = :any)

Check whether a port exists in node specification.

Direction:

  • :input
  • :output
  • :any
MetidaFlows.isready (Method)
isready(model::Workflow, id::Int)

Check whether node is ready for execution.

A node is considered ready when:

  • all parent nodes connected through incoming edges have status :clean.

Notes

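The readiness rule fits in a one-line predicate. This sketch assumes edges are stored as (parent, child) tuples and statuses in a dictionary; toy_isready is hypothetical. A node with no incoming edges is trivially ready, because all over an empty collection returns true.

```julia
# Hypothetical readiness predicate; not MetidaFlows code. Edges are (parent, child) pairs.

toy_isready(status::Dict{Int,Symbol}, edges::Vector{Tuple{Int,Int}}, id::Int) =
    all(status[p] == :clean for (p, c) in edges if c == id)

status = Dict(1 => :clean, 2 => :dirty, 3 => :idle)
edges  = [(1, 3), (2, 3)]
toy_isready(status, edges, 3)   # false: parent 2 is :dirty
toy_isready(status, edges, 1)   # true: node 1 has no incoming edges
```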
MetidaFlows.mark_dirty! (Method)
mark_dirty!(node::AbstractDataNode)

Invalidate node execution result.

Performs the following operations:

  • sets node status to :dirty,
  • clears ready_ports,
  • clears cached output data stored in node.data.

This function intentionally does not clear:

  • node settings,
  • input buffer,
  • execution logs,
  • execution counters/state metadata.
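The split between what is cleared and what is kept can be sketched with a hypothetical DirtyNode type. Only some of the fields listed above are modeled (logs and counters are left out), and toy_mark_dirty! is not the package function.

```julia
# Hypothetical sketch of what mark_dirty! resets versus preserves; not MetidaFlows code.

mutable struct DirtyNode
    status::Symbol
    ready_ports::Vector{Symbol}      # cleared
    data::Dict{Symbol,Any}           # cached outputs: cleared
    settings::Dict{Symbol,Any}       # preserved
    input_buffer::Dict{Symbol,Any}   # preserved
end

function toy_mark_dirty!(node::DirtyNode)
    node.status = :dirty
    empty!(node.ready_ports)
    empty!(node.data)
    return node
end

node = DirtyNode(:clean, [:out], Dict{Symbol,Any}(:out => 1.0),
                 Dict{Symbol,Any}(:file => "data.csv"), Dict{Symbol,Any}(:in => 0.5))
toy_mark_dirty!(node)
# node.status is :dirty and node.data is empty; settings and input_buffer are untouched
```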
MetidaFlows.reset! (Method)
reset!(model::Workflow)

Reset workflow execution state.

Marks every node in the workflow as :dirty using mark_dirty!, clears node execution state, and removes all cached output data.

MetidaFlows.scheduler! (Method)
scheduler!(model::Workflow{ABW}; maxiter = 1000)

Execute workflow using queue-based agent/event scheduling.

This scheduler is intended for dynamic or agent-based workflows (ABW), where execution readiness is determined at runtime.

Arguments

  • maxiter: maximum number of scheduler iterations before aborting execution.
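A queue-based loop with an iteration cap might look like the sketch below. It assumes readiness means all parents have already run, and that a node which is not ready is moved to the back of the queue. toy_abw_scheduler! is hypothetical and does not reflect the actual ABW scheduling policy. In a cyclic graph no node ever becomes ready, so maxiter is what ends the loop.

```julia
# Hypothetical queue-based scheduler with an iteration cap; not MetidaFlows code.
# parents maps node id => ids of its parent nodes.

function toy_abw_scheduler!(parents::Dict{Int,Vector{Int}}; maxiter::Int = 1000)
    queue = sort!(collect(keys(parents)))
    done  = Int[]              # execution order
    iter  = 0
    while !isempty(queue)
        iter += 1
        iter > maxiter && error("scheduler exceeded maxiter = $maxiter")
        id = popfirst!(queue)
        if all(p -> p in done, parents[id])
            push!(done, id)            # "execute" the node
        else
            push!(queue, id)           # not ready yet: retry later
        end
    end
    return done
end

toy_abw_scheduler!(Dict(1 => [3], 2 => Int[], 3 => [2]))   # [2, 3, 1]
```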
MetidaFlows.scheduler! (Method)
scheduler!(model::Workflow{DAW})

Execute an entire data analysis workflow (DAW) in topological order.

This scheduler is designed for deterministic acyclic data-analysis workflows.

Execution Steps

  1. Build workflow graph.
  2. Validate graph acyclicity.
  3. Generate new workflow run_id.
  4. Reset node execution states.
  5. Execute nodes in topological order.

Notes

  • Nodes are executed exactly once per scheduler run.
  • Upstream execution and downstream invalidation are disabled because execution order is already guaranteed by topology.
  • Cyclic workflows are rejected before execution starts.
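The ordering step can be sketched with Kahn's algorithm, which also detects cycles: if some nodes never reach in-degree zero, the graph is cyclic and nothing is executed. The package does not document which topological sort it uses, so Kahn's algorithm is a swapped-in choice here. toy_topological_order and toy_daw_scheduler! are hypothetical, and run ids and state resets (steps 3 and 4) are not modeled.

```julia
# Hypothetical DAW-style scheduler using Kahn's algorithm; not MetidaFlows code.

function toy_topological_order(edges::Vector{Pair{Int,Int}}, nodes::Vector{Int})
    indegree = Dict(n => 0 for n in nodes)
    children = Dict(n => Int[] for n in nodes)
    for (a, b) in edges                       # build the graph
        push!(children[a], b)
        indegree[b] += 1
    end
    ready = sort!(Int[n for n in nodes if indegree[n] == 0])
    order = Int[]
    while !isempty(ready)
        n = popfirst!(ready)
        push!(order, n)
        for c in children[n]
            indegree[c] -= 1
            indegree[c] == 0 && push!(ready, c)
        end
    end
    # Nodes left out of the order sit on a cycle: reject before executing anything.
    length(order) == length(nodes) || error("workflow graph contains a cycle")
    return order
end

function toy_daw_scheduler!(run!, edges::Vector{Pair{Int,Int}}, nodes::Vector{Int})
    order = toy_topological_order(edges, nodes)
    foreach(run!, order)                      # each node executed exactly once
    return order
end

executed = Int[]
toy_daw_scheduler!(id -> push!(executed, id), [1 => 2, 1 => 3, 2 => 4, 3 => 4], [1, 2, 3, 4])
# executed == [1, 2, 3, 4]
```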
MetidaFlows.setid! (Method)
setid!(node::AbstractDataNode, id::Int) -> DataNode

Set node identifier. Mutates node in-place.

MetidaFlows.setinputbuffer! (Method)
setinputbuffer!(node, label, value) -> Bool

Write value into node input buffer.

Used by workflow engine to propagate outputs between nodes.

MetidaFlows.setsettings! (Method)
setsettings!(model::Workflow, id::Int, settings::Dict{Symbol, <: Any})

Apply new node settings and invalidate dependent nodes.

Settings are applied using setsettings_unsafe!, after which the target node and all downstream nodes are invalidated via invalidate_downstream!.

Notes

This function is the safe high-level entry point for mutating node configuration inside a workflow.

MetidaFlows.setsettings_unsafe! (Method)
setsettings_unsafe!(node::AbstractDataNode, settings::Dict{Symbol, <: Any})

Mutates node settings directly, without invalidation. Can be specialized for individual node types.

Default implementation copies all provided key-value pairs into node.settings.

Warning

This function does NOT invalidate cached execution results or downstream nodes.

Use setsettings! for normal workflow operation.

MetidaFlows.setstatus! (Method)
setstatus!(node::AbstractDataNode, s::Symbol) -> DataNode

Set execution status of node.

This does NOT trigger validation or propagation. Pure mutation.

MetidaFlows.validate_node (Method)
validate_node(node::AbstractDataNode)

Validate node structure and configuration.

Default implementation always returns true.

This function is intended for specialization by concrete node implementations.

Typical validation rules may include:

  • internal consistency checks,
  • structural constraints,
  • node-specific invariants.

Returns

  • true if node structure is valid.
  • false otherwise.
MetidaFlows.validate_result (Method)
validate_result(node::AbstractDataNode)

Validate node execution result.

Called after node execution completes.

Default implementation always returns true.

This function is intended for specialization by concrete node implementations.

Typical validation rules may include:

  • output datatype verification,
  • required output ports presence,
  • shape or schema validation,
  • domain-specific consistency checks.

Returns

  • true if execution result is valid.
  • false otherwise.
MetidaFlows.validate_settings (Method)
validate_settings(node::AbstractDataNode)

Validate node settings before execution.

Default implementation always returns true.

This function is intended for specialization by concrete node implementations.

Typical validation rules may include:

  • required setting presence,
  • range checks,
  • semantic validation of configuration values.

Returns

  • true if settings are valid.
  • false otherwise.
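The three validation hooks share one pattern: a permissive default plus a specialization per node type. The sketch below shows the pattern for settings. It uses hypothetical types (AbstractSettingsNode, AnySettingsNode, SmoothingNode) and a hypothetical required :window key whose value must be an integer between 1 and 100.

```julia
# Hypothetical sketch of specializing a settings validator; not MetidaFlows code.

abstract type AbstractSettingsNode end
struct AnySettingsNode <: AbstractSettingsNode
    settings::Dict{Symbol,Any}
end
struct SmoothingNode <: AbstractSettingsNode
    settings::Dict{Symbol,Any}
end

toy_validate_settings(node::AbstractSettingsNode) = true   # default: always valid

function toy_validate_settings(node::SmoothingNode)
    s = node.settings
    haskey(s, :window) || return false                     # required setting presence
    w = s[:window]
    return w isa Integer && 1 <= w <= 100                  # type and range check
end

toy_validate_settings(SmoothingNode(Dict{Symbol,Any}(:window => 5)))   # true
toy_validate_settings(SmoothingNode(Dict{Symbol,Any}(:window => 0)))   # false
toy_validate_settings(AnySettingsNode(Dict{Symbol,Any}()))             # true
```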