MetidaFlows
A lightweight experimental workflow engine for Julia
MetidaFlows.jl is an early-stage project for building typed, graph-based execution workflows in Julia. It is currently under active development and not yet stable for production use.
The goal of the project is to explore how workflow systems can be expressed naturally using Julia’s type system and multiple dispatch, while keeping the core execution model explicit and minimal.
⚠️ Project status
This package is experimental.
- APIs are subject to change without notice
- internal execution semantics are still evolving
- some subsystems are incomplete or in prototype state
- backward compatibility is not guaranteed
You should expect breaking changes while the architecture stabilizes.
Core idea
A workflow in MetidaFlows consists of:
- Nodes — units of computation
- Ports — typed inputs and outputs
- Connections — edges between ports
- Schedulers — execution strategies
Instead of hidden execution magic, everything is explicit:
- execution is driven by a scheduler
- data is passed through typed ports
- invalidation is propagated through the graph
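As a rough illustration of what "typed ports" can mean in practice, the sketch below checks a connection by comparing port types. The `SketchPort` struct and `can_connect` function are hypothetical stand-ins written for this example, and the subtype rule is an assumption; the package's own validation may use different rules.

```julia
# Hypothetical stand-in for a typed port; not the MetidaFlows PortSpec type.
struct SketchPort
    label::Symbol
    datatype::Type
end

# Assumed rule: a connection is accepted only when the source output type
# is a subtype of the type expected by the target input.
can_connect(out::SketchPort, inp::SketchPort) = out.datatype <: inp.datatype

out_port  = SketchPort(:values, Vector{Int})
ok_input  = SketchPort(:values, AbstractVector)
bad_input = SketchPort(:text, String)

accepted = can_connect(out_port, ok_input)   # Vector{Int} <: AbstractVector
rejected = can_connect(out_port, bad_input)  # Vector{Int} is not a String
```

Checking types when the edge is created, rather than when data first flows, lets a mismatch surface before any node runs.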
Key features (current)
- Typed node system via Julia multiple dispatch
- Directed graph workflow model
- Strict connection validation (including type checking)
- Two execution modes:
  - DAW (deterministic DAG execution)
  - ABW (queue-based / agent-style execution, experimental)
- Incremental execution and invalidation propagation
- Node state tracking (:idle, :dirty, :clean, etc.)
- Input buffering system
- Hooks for validation and execution lifecycle
- Basic serialization utilities
Minimal example
using MetidaFlows
using CSV, DataFrames

# Marker types used to dispatch node behavior
struct CSVNode <: AbstractNodeType end
struct DataFrameNode <: AbstractNodeType end

# Loader node: no input ports, one CSV.File output port labeled :csv
csv_spec = NodeSpec(
    "Load CSV",
    PortSpec[],
    [PortSpec("CSV File", CSV.File, :csv)],
    [:file]
)

# Converter node: CSV.File input, DataFrame output
df_spec = NodeSpec(
    "DataFrame",
    [PortSpec("CSV File", CSV.File, :csv)],
    [PortSpec("DataFrame", DataFrame, :dataframe)]
)

# Node behavior is defined by specializing execute_unsafe!
function MetidaFlows.execute_unsafe!(node::DataNode{CSVNode})
    csv = CSV.File(node.settings[:file])
    setdata!(node, :csv, csv)
    return [:csv]
end

function MetidaFlows.execute_unsafe!(node::DataNode{DataFrameNode})
    csv = getinputdata(node, :csv)
    setdata!(node, :dataframe, DataFrame(csv))
    return [:dataframe]
end

# Build the graph, configure the loader, run, and read the result
workflow = Workflow(0)
id1 = add_node!(workflow, DataNode(CSVNode, csv_spec))
id2 = add_node!(workflow, DataNode(DataFrameNode, df_spec))
add_connection!(workflow, id1, :csv, id2, :csv)
setsettings!(workflow, id1, Dict(:file => "data.csv"))
scheduler!(workflow)
df = getdata(workflow, id2, :dataframe)

Design goals
This project is guided by a few principles:
- explicit execution model (no hidden state machines)
- composability via small typed primitives
- extensibility through multiple dispatch
- clear separation between:
  - structure (graph)
  - execution (scheduler)
  - behavior (nodes)
- support for both deterministic and dynamic workflows
Planned direction
The architecture is still evolving. Current exploration areas include:
- improved logging and audit system
- caching and checkpointing strategies
Contributing & feedback
Feedback is especially welcome on:
- execution semantics
- scheduler design
- invalidation model
- API ergonomics
- real-world workflow use cases
The project is intentionally in an exploratory phase, so design discussions are highly valuable at this stage.
Documentation for MetidaFlows.
Index: MetidaFlows.add_connection!, MetidaFlows.add_node!, MetidaFlows.connection_to_dict, MetidaFlows.delete_connection!, MetidaFlows.delete_node!, MetidaFlows.execute!, MetidaFlows.execute_unsafe!, MetidaFlows.execution_node_validation, MetidaFlows.find_connections, MetidaFlows.get_children, MetidaFlows.get_parents, MetidaFlows.getconnection, MetidaFlows.getdata, MetidaFlows.getdata, MetidaFlows.getid, MetidaFlows.getinputdata, MetidaFlows.getnode, MetidaFlows.getportconnections, MetidaFlows.getportnumber, MetidaFlows.getporttype, MetidaFlows.getporttype, MetidaFlows.getposition, MetidaFlows.getstate, MetidaFlows.getstatus, MetidaFlows.haveinputs, MetidaFlows.invalidate_downstream!, MetidaFlows.isnodeexist, MetidaFlows.isportexist, MetidaFlows.isready, MetidaFlows.mark_dirty!, MetidaFlows.node_to_dict, MetidaFlows.portspec_to_dict, MetidaFlows.reset!, MetidaFlows.reset_status!, MetidaFlows.scheduler!, MetidaFlows.scheduler!, MetidaFlows.setdata!, MetidaFlows.setid!, MetidaFlows.setinputbuffer!, MetidaFlows.setposition!, MetidaFlows.setsettings!, MetidaFlows.setsettings_unsafe!, MetidaFlows.setstate!, MetidaFlows.setstatus!, MetidaFlows.spec_to_dict, MetidaFlows.validate_node, MetidaFlows.validate_result, MetidaFlows.validate_settings, MetidaFlows.workflow_to_dict
MetidaFlows.add_connection! — Method
add_connection!(model::Workflow, c::NodeConnection)
Add connection to workflow.
Performs:
- connection validation,
- connection registration,
- incoming/outgoing index updates.
If the source node already has status :clean, its output data is immediately propagated into the target node input buffer.
Returns
Assigned connection identifier (Int).
MetidaFlows.add_node! — Method
add_node!(model::Workflow, node::AbstractDataNode)
Add node to workflow.
Assigns a new unique node identifier and registers the node in workflow storage.
Returns
Assigned node identifier (Int).
MetidaFlows.connection_to_dict — Method
connection_to_dict(conn::NodeConnection) -> Dict
Convert connection to dictionary representation.
MetidaFlows.delete_connection! — Method
delete_connection!(model::Workflow, id::Int)
Remove connection from workflow.
Performs:
- deletion of corresponding child input buffer entry,
- removal from incoming/outgoing indices,
- deletion from workflow connection storage.
Returns
- true if connection existed and was removed.
- false otherwise.
MetidaFlows.delete_node! — Method
delete_node!(model::Workflow, id::Int)
Remove node from workflow.
Performs:
- deletion of all incoming and outgoing connections,
- cleanup of connection indices,
- removal of node from workflow storage.
Returns
- true if node existed and was removed.
- false otherwise.
MetidaFlows.execute! — Method
execute!(model::Workflow, id::Int; execute_upstream::Bool = true, invalidate_downstream::Bool = true, check_cyclic::Bool = true)
Execute workflow node.
Main workflow execution entry point.
Execution Stages
- Initialize per-run execution state and logs.
- Optionally detect recursive cyclic execution.
- Skip execution for nodes already marked :clean.
- Mark node as :executing.
- Optionally execute upstream dependencies recursively.
- Validate node structure and execution readiness.
- Validate node settings.
- Execute node implementation via execute_unsafe!.
- Store execution state (ready_ports).
- Propagate outputs downstream through input buffers.
- Optionally invalidate downstream nodes.
- Validate execution result.
- Mark node as :clean.
Arguments
- execute_upstream: recursively execute all parent nodes before executing current node.
- invalidate_downstream: invalidate downstream execution results after successful execution.
- check_cyclic: detect recursive execution loops during direct execution calls.
Returns
Vector of output port labels (Vector{Symbol}) produced during execution.
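The recursive part of the stages listed for execute! (skip clean nodes, mark as executing, run parents first, then mark clean) can be sketched on a toy graph. This is an illustration only: `parents`, `status`, `order`, and `run_node!` are hypothetical names, and the validation, buffering, and invalidation stages are left out.

```julia
# Toy graph: parents[id] lists the upstream node ids (hypothetical layout).
parents = Dict(1 => Int[], 2 => [1], 3 => [2])
status  = Dict(id => :dirty for id in keys(parents))
order   = Int[]   # records the order in which nodes actually run

function run_node!(id; execute_upstream = true)
    status[id] == :clean && return order   # already clean: nothing to do
    # A node seen again while still executing means a recursive loop.
    status[id] == :executing && error("cyclic execution detected at node $id")
    status[id] = :executing
    if execute_upstream
        for p in parents[id]
            run_node!(p; execute_upstream = true)
        end
    end
    push!(order, id)                        # stand-in for execute_unsafe!
    status[id] = :clean
    return order
end

run_node!(3)   # runs 1, then 2, then 3
run_node!(3)   # second call is a no-op because node 3 is :clean
```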
MetidaFlows.execute_unsafe! — Method
execute_unsafe!(node::AbstractDataNode)
Low-level node execution interface.
This function contains node-specific execution logic.
The default implementation throws an error and must be specialized for every executable node type.
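The "default throws, specialize per node type" pattern can be shown with a small standalone sketch. All names here (`SketchNodeType`, `SketchNode`, `Doubler`, `Unimplemented`, `run_unsafe`) are hypothetical and do not come from the package; only the dispatch pattern is the point.

```julia
abstract type SketchNodeType end
struct Doubler <: SketchNodeType end
struct Unimplemented <: SketchNodeType end

# The type parameter selects the behavior; the field holds toy input data.
struct SketchNode{T<:SketchNodeType}
    input::Int
end

# Generic fallback: fails loudly for node types without their own method.
run_unsafe(node::SketchNode) = error("run_unsafe is not implemented for $(typeof(node))")

# Specialization for one concrete node type.
run_unsafe(node::SketchNode{Doubler}) = 2 * node.input

result = run_unsafe(SketchNode{Doubler}(21))

# The unspecialized node type hits the fallback and throws.
threw = try
    run_unsafe(SketchNode{Unimplemented}(1))
    false
catch
    true
end
```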
MetidaFlows.execution_node_validation — Method
execution_node_validation(node::AbstractDataNode)
Internal function. Validate node readiness before execution.
Checks that:
- every declared input port has a corresponding value in node.input_buffer,
- validate_node succeeds.
This validation is intended for runtime execution safety, ensuring that all required inputs are available before calling execute_unsafe!.
Returns
- true if node is ready for execution.
- false otherwise.
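The input-buffer half of this check amounts to comparing declared input labels against the keys present in the buffer. A minimal sketch, assuming the buffer behaves like a dictionary keyed by port label (`missing_inputs` and `inputs_ready` are hypothetical helpers, not package functions):

```julia
# Labels of the declared input ports and a partially filled buffer.
input_labels = [:csv, :options]
buffer = Dict{Symbol,Any}(:csv => [1, 2, 3])

# Labels that have no value in the buffer yet.
missing_inputs(labels, buffer) = [l for l in labels if !haskey(buffer, l)]

# Ready only when nothing is missing.
inputs_ready(labels, buffer) = isempty(missing_inputs(labels, buffer))

before = inputs_ready(input_labels, buffer)    # :options is still missing
buffer[:options] = Dict(:header => true)
after = inputs_ready(input_labels, buffer)     # every declared input has a value
```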
MetidaFlows.find_connections — Method
find_connections(model::Workflow, id::Int)
Return all connection IDs associated with a node (both incoming and outgoing).
MetidaFlows.get_children — Method
get_children(model::Workflow, id::Int)
Return the children of the node with identifier id.
MetidaFlows.get_parents — Method
get_parents(model::Workflow, id::Int)
Return the parents of the node as a vector of identifiers.
MetidaFlows.getconnection — Method
getconnection(model::Workflow, id::Int)
Return connection by identifier id.
MetidaFlows.getdata — Method
getdata(node::AbstractDataNode, l::Symbol)
Return output data stored under output port label.
MetidaFlows.getdata — Method
getdata(model::Workflow, id::Int, l::Symbol)
Return output data stored under output port label.
MetidaFlows.getid — Method
getid(node::AbstractDataNode) -> Int
Return node unique identifier.
MetidaFlows.getinputdata — Method
getinputdata(node::AbstractDataNode, l::Symbol)
Read value from node input buffer.
Does NOT consume or clear buffer.
MetidaFlows.getnode — Method
getnode(model::Workflow, id::Int)
Return node by identifier id.
MetidaFlows.getportconnections — Method
getportconnections(model::Workflow, id::Int, label::Symbol; direction = :both)
Return all connections attached to a specific port.
Direction:
- :input
- :output
- :both
MetidaFlows.getportnumber — Method
getportnumber(node::AbstractDataNode, l::Symbol, direction::Symbol)
Return index of port by label and direction (:input or :output).
MetidaFlows.getporttype — Method
getporttype(node::AbstractDataNode, i::Int, direction::Symbol)
Return Julia datatype of port by index and direction.
MetidaFlows.getporttype — Method
getporttype(node, label, direction) -> Type
Return Julia datatype of port by label.
MetidaFlows.getposition — Method
getposition(node::AbstractDataNode) -> Tuple{Int,Int}
Return node UI/graph position.
MetidaFlows.getstate — Method
getstate(node::AbstractDataNode, s::Symbol)
Get value from node execution state.
MetidaFlows.getstatus — Method
getstatus(node::AbstractDataNode) -> Symbol
Return execution status of node.
Possible values:
- :idle
- :dirty
- :clean
- :executing
- :failed
- :invalid_node
- :invalid_settings
- :invalid_result
MetidaFlows.haveinputs — Method
haveinputs(node::AbstractDataNode)
Returns true if node has at least one input port defined in its spec.
MetidaFlows.invalidate_downstream! — Method
invalidate_downstream!(model::Workflow, id::Int)
Recursively invalidate all downstream nodes.
Marks the specified node and all descendant nodes as :dirty using mark_dirty!.
The traversal follows all outgoing connections recursively.
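A recursive traversal of this kind can be sketched on a plain adjacency dictionary. The `children` and `status` dictionaries and the `invalidate!` function are hypothetical stand-ins for the workflow's connection indices and node statuses, and the sketch assumes an acyclic graph.

```julia
# Diamond-shaped toy graph: 1 feeds 2 and 3, both of which feed 4.
children = Dict(1 => [2, 3], 2 => [4], 3 => [4], 4 => Int[])
status   = Dict(id => :clean for id in keys(children))

# Mark the starting node dirty, then recurse along outgoing edges.
function invalidate!(status, children, id)
    status[id] = :dirty
    for c in children[id]
        invalidate!(status, children, c)
    end
    return status
end

invalidate!(status, children, 2)   # affects 2 and its descendant 4 only
```

Nodes 1 and 3 stay :clean because they are not downstream of node 2.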
MetidaFlows.isnodeexist — Method
isnodeexist(model::Workflow, id::Int)
MetidaFlows.isportexist — Function
isportexist(node::AbstractDataNode, port::Symbol, direction::Symbol = :any)
Check whether a port exists in node specification.
Direction:
- :input
- :output
- :any
MetidaFlows.isready — Method
isready(model::Workflow, id::Int)
Check whether node is ready for execution.
A node is considered ready when:
- all parent nodes connected through incoming edges have status :clean.
Notes
- Current node status itself is not checked.
- Input buffer completeness is validated separately via execution_node_validation.
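The readiness rule described here reduces to a single predicate over parent statuses. A minimal sketch with hypothetical `parents` and `status` dictionaries (not the package's internal storage):

```julia
parents = Dict(1 => Int[], 2 => [1], 3 => [1, 2])
status  = Dict(1 => :clean, 2 => :dirty, 3 => :dirty)

# Ready when every parent is :clean; the node's own status is ignored.
ready(status, parents, id) = all(p -> status[p] == :clean, parents[id])

root_ready  = ready(status, parents, 1)   # no parents, so trivially ready
child_ready = ready(status, parents, 2)   # its only parent (1) is clean
leaf_ready  = ready(status, parents, 3)   # parent 2 is still dirty
```

Because `all` returns true for an empty collection, nodes without parents are always ready under this rule.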
MetidaFlows.mark_dirty! — Method
mark_dirty!(node::AbstractDataNode)
Invalidate node execution result.
Performs the following operations:
- sets node status to :dirty,
- clears ready_ports,
- clears cached output data stored in node.data.
This function intentionally does not clear:
- node settings,
- input buffer,
- execution logs,
- execution counters/state metadata.
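The split between what is cleared and what is kept can be illustrated with a toy node. `ToyNode` and `toy_mark_dirty!` are hypothetical and only mirror the field names mentioned in this docstring; the real node type may be laid out differently.

```julia
mutable struct ToyNode
    status::Symbol
    ready_ports::Vector{Symbol}
    data::Dict{Symbol,Any}
    settings::Dict{Symbol,Any}
    input_buffer::Dict{Symbol,Any}
end

# Clears only the execution result; settings and inputs are left untouched.
function toy_mark_dirty!(n::ToyNode)
    n.status = :dirty
    empty!(n.ready_ports)
    empty!(n.data)
    return n
end

node = ToyNode(:clean, [:out], Dict{Symbol,Any}(:out => 42),
               Dict{Symbol,Any}(:file => "data.csv"),
               Dict{Symbol,Any}(:in => [1, 2, 3]))
toy_mark_dirty!(node)
```

Keeping the input buffer means the node can be re-executed without waiting for its parents to push their outputs again.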
MetidaFlows.node_to_dict — Method
node_to_dict(node::DataNode) -> Dict
Convert node to JSON-serializable dictionary.
MetidaFlows.portspec_to_dict — Method
portspec_to_dict(ps::PortSpec) -> Dict
Convert PortSpec to dictionary representation.
MetidaFlows.reset! — Method
reset!(model::Workflow)
Reset workflow execution state.
Marks every node in the workflow as :dirty using mark_dirty!, clears node execution state, and removes all cached output data.
MetidaFlows.reset_status! — Method
reset_status!(model::Workflow)
Reset only node statuses.
Sets status of every node in the workflow to :dirty.
MetidaFlows.scheduler! — Method
scheduler!(model::Workflow{ABW}; maxiter = 1000)
Execute workflow using queue-based agent/event scheduling.
This scheduler is intended for dynamic or agent-based workflows (ABW), where execution readiness is determined during runtime.
Arguments
- maxiter: maximum number of scheduler iterations before aborting execution.
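One way to picture queue-based scheduling with an iteration cap is the sketch below: nodes whose parents are not yet done are pushed back onto the queue, and `maxiter` bounds the loop so that a graph that never becomes ready aborts instead of spinning forever. This is an assumption-laden toy (`queue_schedule` and the `parents` layout are hypothetical) and is not claimed to match the ABW scheduler's actual algorithm.

```julia
function queue_schedule(parents; maxiter = 1000)
    done  = Set{Int}()
    order = Int[]
    queue = sort!(collect(keys(parents)))
    iter  = 0
    while !isempty(queue)
        iter += 1
        iter > maxiter && error("scheduler aborted after $maxiter iterations")
        id = popfirst!(queue)
        if all(in(done), parents[id])
            push!(done, id)      # readiness decided at run time
            push!(order, id)
        else
            push!(queue, id)     # not ready yet: requeue for a later pass
        end
    end
    return order
end

# Node 1 depends on node 2, so it is requeued once before it can run.
order = queue_schedule(Dict(1 => [2], 2 => Int[]))

# A cyclic dependency never becomes ready and hits the iteration cap.
aborted = try
    queue_schedule(Dict(1 => [2], 2 => [1]); maxiter = 10)
    false
catch
    true
end
```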
MetidaFlows.scheduler! — Method
scheduler!(model::Workflow{DAW})
Execute entire data analysis workflow (DAW) using topological ordering.
This scheduler is designed for deterministic acyclic data-analysis workflows.
Execution Steps
- Build workflow graph.
- Validate graph acyclicity.
- Generate new workflow run_id.
- Reset node execution states.
- Execute nodes in topological order.
Notes
- Nodes are executed exactly once per scheduler run.
- Upstream execution and downstream invalidation are disabled because execution order is already guaranteed by topology.
- Cyclic workflows are rejected before execution starts.
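Topological ordering with cycle rejection can be sketched with Kahn's algorithm. The source does not say which algorithm the DAW scheduler uses, so this is a swapped-in textbook technique on a hypothetical `children` adjacency dictionary, shown only to make the idea concrete.

```julia
function topo_order(children::Dict{Int,Vector{Int}})
    # Count incoming edges for every node.
    indegree = Dict(id => 0 for id in keys(children))
    for cs in values(children), c in cs
        indegree[c] += 1
    end
    # Start from nodes with no incoming edges (sorted for a stable order).
    ready = sort!([id for (id, d) in indegree if d == 0])
    order = Int[]
    while !isempty(ready)
        id = popfirst!(ready)
        push!(order, id)
        for c in children[id]
            indegree[c] -= 1
            indegree[c] == 0 && push!(ready, c)
        end
    end
    # Nodes left unvisited can only be part of a cycle.
    length(order) == length(children) || error("workflow graph contains a cycle")
    return order
end

order = topo_order(Dict(1 => [2, 3], 2 => [4], 3 => [4], 4 => Int[]))

cyclic_rejected = try
    topo_order(Dict(1 => [2], 2 => [1]))
    false
catch
    true
end
```

Each node appears exactly once in `order`, and every parent precedes its children, which is why upstream execution is unnecessary when running in this order.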
MetidaFlows.setdata! — Method
setdata!(node::AbstractDataNode, l::Symbol, d)
Store output data in node.
MetidaFlows.setid! — Method
setid!(node::AbstractDataNode, id::Int) -> DataNode
Set node identifier. Mutates node in place.
MetidaFlows.setinputbuffer! — Method
setinputbuffer!(node, label, value) -> Bool
Write value into node input buffer.
Used by workflow engine to propagate outputs between nodes.
MetidaFlows.setposition! — Method
setposition!(node::AbstractDataNode, p::Tuple{Int,Int}) -> DataNode
Set UI/graph position of node.
MetidaFlows.setsettings! — Method
setsettings!(model::Workflow, id::Int, settings::Dict{Symbol, <: Any})
Apply new node settings and invalidate dependent nodes.
Settings are applied using setsettings_unsafe!, after which the target node and all downstream nodes are invalidated via invalidate_downstream!.
Notes
This function is the safe high-level entry point for mutating node configuration inside a workflow.
MetidaFlows.setsettings_unsafe! — Method
setsettings_unsafe!(node::AbstractDataNode, settings::Dict{Symbol, <: Any})
Direct mutation of node settings without invalidation. Can be re-implemented for every node type.
Default implementation copies all provided key-value pairs into node.settings.
Warning
This function does NOT invalidate cached execution results or downstream nodes.
Use setsettings! for normal workflow operation.
MetidaFlows.setstate! — Method
setstate!(node::AbstractDataNode, s::Symbol, v) -> DataNode
Store value in node execution state.
MetidaFlows.setstatus! — Method
setstatus!(node::AbstractDataNode, s::Symbol) -> DataNode
Set execution status of node.
This does NOT trigger validation or propagation. Pure mutation.
MetidaFlows.spec_to_dict — Method
spec_to_dict(spec::NodeSpec) -> Dict
Convert NodeSpec to dictionary representation.
MetidaFlows.validate_node — Method
validate_node(node::AbstractDataNode)
Validate node structure and configuration.
Default implementation always returns true.
This function is intended for specialization by concrete node implementations.
Typical validation rules may include:
- internal consistency checks,
- structural constraints,
- node-specific invariants.
Returns
- true if node structure is valid.
- false otherwise.
MetidaFlows.validate_result — Method
validate_result(node::AbstractDataNode)
Validate node execution result.
Called after node execution completes.
Default implementation always returns true.
This function is intended for specialization by concrete node implementations.
Typical validation rules may include:
- output datatype verification,
- required output ports presence,
- shape or schema validation,
- domain-specific consistency checks.
Returns
- true if execution result is valid.
- false otherwise.
MetidaFlows.validate_settings — Method
validate_settings(node::AbstractDataNode)
Validate node settings before execution.
Default implementation always returns true.
This function is intended for specialization by concrete node implementations.
Typical validation rules may include:
- required setting presence,
- range checks,
- semantic validation of configuration values.
Returns
- true if settings are valid.
- false otherwise.
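The kinds of rules listed for settings validation (required keys, range checks) might look like the following when written against a plain settings dictionary. `check_settings` and the `:limit` setting are hypothetical and exist only for this sketch; a real specialization would be a validate_settings method for a concrete node type.

```julia
function check_settings(settings::Dict{Symbol,<:Any})
    # Required setting presence.
    haskey(settings, :file) || return false
    # Range check on an optional setting (defaults to 1 when absent).
    get(settings, :limit, 1) > 0 || return false
    return true
end

valid      = check_settings(Dict(:file => "data.csv"))
no_file    = check_settings(Dict(:limit => 10))
bad_limit  = check_settings(Dict{Symbol,Any}(:file => "data.csv", :limit => 0))
```

Returning a Bool rather than throwing matches the "true if valid, false otherwise" contract described for the validation hooks.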
MetidaFlows.workflow_to_dict — Method
workflow_to_dict(w::Workflow) -> Dict
Convert workflow to dictionary representation.