API Reference

Junjo: A Python library for building and managing complex graph workflows.

This library provides the building blocks and tools for wrapping Python functions into nodes, edges, and graphs that can be executed by a workflow.

This library also produces annotated OpenTelemetry spans to help make sense of execution telemetry.

class junjo.Condition

Bases: Generic[StateT], ABC

Abstract base class for edge conditions in a workflow graph.

Implement a concrete condition that determines whether a transition along an edge should occur based only on the current state.

This is designed to be used with the Edge class, which represents a directed edge in the workflow graph. The condition is evaluated when determining whether to transition from the tail node to the head node.

Type Parameters:
StateT: The type of the state that the condition will evaluate against.

This should be a subclass of BaseState.

Responsibilities:
  • The condition should be stateless and only depend on the current state.

  • DO NOT use any side effects in the condition (e.g., network calls, database queries).

  • The condition should be a pure function of the state.

class MyCondition(Condition[MyState]):
    def evaluate(self, state: MyState) -> bool: # implement the abstract method
        return state.some_property == "some_value"

my_condition = MyCondition()
edges = [
    Edge(tail=node_1, head=node_2, condition=my_condition),
    Edge(tail=node_2, head=node_3),  # No condition, (or None) means the condition is always valid
]

abstractmethod evaluate(state: StateT) → bool

Evaluates whether the transition should occur based on the current state.

Parameters:

state – The current workflow state to evaluate.

Returns:

True if the transition should occur, False otherwise.

class junjo.Graph(source: Node | _NestableWorkflow, sink: Node | _NestableWorkflow, edges: list[Edge])

Bases: object

Represents a directed graph of nodes and edges, defining the structure and flow of a workflow.

The Graph class is a fundamental component in Junjo, responsible for encapsulating the relationships between different processing units (Nodes or Subflows) and the conditions under which transitions between them occur.

It holds references to the entry point (source) and exit point (sink) of the graph, as well as a list of all edges that connect the nodes.

Parameters:
  • source (Node | _NestableWorkflow) – The starting node or subflow of the graph. Execution of the workflow begins here.

  • sink (Node | _NestableWorkflow) – The terminal node or subflow of the graph. Reaching this node signifies the completion of the workflow.

  • edges (list[Edge]) – A list of Edge instances that define the connections and transition logic between nodes in the graph.

Example:

from junjo import Node, Edge, Graph, BaseStore, Condition, BaseState

# Define a simple state (can be more complex in real scenarios)
class MyWorkflowState(BaseState):
    count: int | None = None

# Define a simple store
class MyWorkflowStore(BaseStore[MyWorkflowState]):
    async def set_count(self, payload: int) -> None:
        await self.set_state({"count": payload})

# Define some simple nodes
class FirstNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("First Node Executed")

class CountItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        # In a real scenario, you might get items from state and count them
        await store.set_count(5) # Example count
        print("Counted items")

class EvenItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Path taken for even items count.")

class OddItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Path taken for odd items count.")

class FinalNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Final Node Executed")

# Define a condition
class CountIsEven(Condition[MyWorkflowState]):
    def evaluate(self, state: MyWorkflowState) -> bool:
        if state.count is None:
            return False
        return state.count % 2 == 0

# Instantiate the nodes
first_node = FirstNode()
count_items_node = CountItemsNode()
even_items_node = EvenItemsNode()
odd_items_node = OddItemsNode()
final_node = FinalNode()

# Create the workflow graph
workflow_graph = Graph(
    source=first_node,
    sink=final_node,
    edges=[
        Edge(tail=first_node, head=count_items_node),
        Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()),
        Edge(tail=count_items_node, head=odd_items_node), # Fallback
        Edge(tail=even_items_node, head=final_node),
        Edge(tail=odd_items_node, head=final_node),
    ]
)

export_graphviz_assets(out_dir: str | Path = 'graphviz_out', fmt: str = 'svg', dot_cmd: str = 'dot', open_html: bool = False, clean: bool = True) → dict[str, Path]

Render every digraph produced by to_dot_notation() and build a gallery HTML page whose headings use the human labels (e.g. “SampleSubflow”) instead of raw digraph identifiers.

Return type:

Ordered mapping digraph_name → rendered file path, in encounter order.

async get_next_node(store: BaseStore, current_node: Node | _NestableWorkflow) → Node | _NestableWorkflow

Retrieves the next node (or workflow / subflow) in the graph for the given current node. This method checks the edges connected to the current node and resolves the next node based on the conditions defined in the edges.

Parameters:
  • store (BaseStore) – The store instance to use for resolving the next node.

  • current_node (Node | _NestableWorkflow) – The current node or subflow in the graph.

Returns:

The next node or subflow in the graph.

Return type:

Node | _NestableWorkflow
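The resolution step described above can be sketched with plain Python stand-ins. This is a minimal illustration, not Junjo's implementation: SketchEdge and resolve_next are hypothetical names, the state is a plain dict, and two behaviours are assumptions that this reference does not state explicitly: edges are checked in list order with the first valid edge winning, and an error is raised when no edge matches.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class SketchEdge:
    """Hypothetical stand-in for an edge: tail, head, optional predicate."""
    tail: str
    head: str
    condition: Optional[Callable[[dict], bool]] = None


def resolve_next(edges: list, current: str, state: dict) -> str:
    """Return the head of the first edge leaving `current` whose condition passes."""
    for edge in edges:
        if edge.tail != current:
            continue
        # An edge without a condition always matches, so it acts as a
        # fallback when listed after the conditional edges (assumed ordering).
        if edge.condition is None or edge.condition(state):
            return edge.head
    # Assumption: failing loudly is one reasonable choice when nothing matches.
    raise ValueError(f"no valid transition from {current!r}")


edges = [
    SketchEdge("count", "even", condition=lambda s: s["count"] % 2 == 0),
    SketchEdge("count", "odd"),  # unconditional fallback
]
```

Under these assumptions, listing the unconditional edge last is what makes it behave as a fallback, which matches the "Fallback" comment in the Graph example.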

serialize_to_json_string() → str

Converts the graph to a neutral serialized JSON string, representing RunConcurrent instances as subgraphs and including Subflow graphs as well.

Returns:

A JSON string containing the graph structure.

Return type:

str

to_dot_notation() → str

Render the Junjo graph as a main overview digraph plus one additional digraph for each Subflow.

Strategy

  • In the overview we treat every Subflow node as an atomic component (shape=component, fillcolour light-yellow).

  • Any RunConcurrent node is rendered as a cluster, exactly like before.

  • For every Subflow we emit a second digraph subflow_<id> that expands its internal graph, again treating nested Subflows as atomic macro nodes (so the drill-down is recursive).
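To make the overview strategy concrete, here is a minimal sketch of emitting a DOT digraph in which subflow nodes are styled as atomic components. The function edges_to_dot and its parameters are hypothetical, and the exact attributes and identifiers Junjo emits may differ; only standard Graphviz DOT syntax is used.

```python
def edges_to_dot(name: str, edges: list, subflows: frozenset = frozenset()) -> str:
    """Build a DOT digraph string from (tail, head) pairs.

    Names listed in `subflows` are drawn as light-yellow component boxes,
    mirroring the "atomic component" idea described above.
    """
    lines = [f"digraph {name} {{"]
    for node in sorted(subflows):
        lines.append(
            f'  "{node}" [shape=component, style=filled, fillcolor=lightyellow];'
        )
    for tail, head in edges:
        lines.append(f'  "{tail}" -> "{head}";')
    lines.append("}")
    return "\n".join(lines)


dot = edges_to_dot(
    "overview",
    [("first", "sub"), ("sub", "last")],
    subflows=frozenset({"sub"}),
)
```

The resulting string can be passed to the Graphviz dot command, for example dot -Tsvg, to render an image.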

to_mermaid() → str

Converts the graph to Mermaid syntax. This is a placeholder for future implementation.

class junjo.Workflow(graph: Graph, store_factory: StoreFactory[StoreT], max_iterations: int = 100, hook_manager: HookManager | None = None, name: str | None = None)

Bases: _NestableWorkflow[StateT, StoreT, None, None]

A Workflow is a top-level, executable collection of nodes and edges arranged as a graph. It manages its own state and store, distinct from any parent or sub-workflows.

This class is generic and requires two type parameters for a convenient and type-safe developer experience:

Generic Type Parameters:
StateT: The type of state managed by this workflow (a subclass of BaseState)
StoreT: The type of store used by this workflow (a subclass of BaseStore)
Parameters:
  • name (str | None, optional) – An optional name for the workflow. If not provided, the class name is used.

  • graph (Graph) – The graph of nodes and edges that defines the workflow’s structure and execution flow.

  • store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the workflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh state for the workflow’s specific execution.

  • max_iterations (int, optional) – The maximum number of times any single node can be executed within one workflow run. This helps prevent infinite loops. Defaults to 100.

  • hook_manager (HookManager | None, optional) – An optional HookManager for handling workflow lifecycle events and telemetry. Defaults to None.

workflow = Workflow[MyGraphState, MyGraphStore](
    name="demo_base_workflow",
    graph=graph,
    store_factory=lambda: MyGraphStore(initial_state=MyGraphState()),
    hook_manager=HookManager(verbose_logging=False, open_telemetry=True),
)
await workflow.execute()
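
The roles of store_factory and max_iterations can be illustrated with a small synchronous sketch. It is not Junjo's execution loop: run_sketch, the dict-based store, and the RuntimeError are all hypothetical stand-ins chosen to show a fresh store per run and a per-node execution cap.

```python
from collections import Counter
from typing import Callable


def run_sketch(
    start: str,
    sink: str,
    next_node: Callable[[str, dict], str],
    store_factory: Callable[[], dict],
    max_iterations: int = 100,
) -> dict:
    """Walk from `start` to `sink`, capping how often any one node may run."""
    store = store_factory()  # a fresh store for this run only
    visits = Counter()
    current = start
    while True:
        visits[current] += 1
        if visits[current] > max_iterations:
            # Assumed failure mode: abort instead of looping forever.
            raise RuntimeError(
                f"node {current!r} exceeded {max_iterations} executions"
            )
        store["trace"].append(current)  # stands in for the node's real work
        if current == sink:
            return store
        current = next_node(current, store)


transitions = {"a": "b", "b": "z"}
first_run = run_sketch("a", "z", lambda n, s: transitions[n], lambda: {"trace": []})
second_run = run_sketch("a", "z", lambda n, s: transitions[n], lambda: {"trace": []})
```

Because the factory is called once per run, first_run and second_run hold separate stores, so state from one execution cannot leak into the next.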
class junjo.Subflow(graph: Graph, store_factory: StoreFactory[StoreT], max_iterations: int = 100, hook_manager: HookManager | None = None, name: str | None = None)

Bases: _NestableWorkflow[StateT, StoreT, ParentStateT, ParentStoreT], ABC

A Subflow is a workflow that:
1. Executes within a parent workflow or parent subflow
2. Has its own isolated state and store
3. Can interact with its parent workflow state before and after execution via pre_run_actions() and post_run_actions()

This class is generic and requires four type parameters for a convenient and type-safe developer experience:

Generic Type Parameters:
StateT: The type of state managed by this subflow (a subclass of BaseState)
StoreT: The type of store used by this subflow (a subclass of BaseStore)
ParentStateT: The type of state managed by the parent workflow (a subclass of BaseState)
ParentStoreT: The type of store used by the parent workflow (a subclass of BaseStore)
Parameters:
  • name (str | None, optional) – An optional name for the workflow. If not provided, the class name is used.

  • graph (Graph) – The graph of nodes and edges that defines the workflow’s structure and execution flow.

  • store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the workflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh state for the workflow’s specific execution.

  • max_iterations (int, optional) – The maximum number of times any single node can be executed within one workflow run. This helps prevent infinite loops. Defaults to 100.

  • hook_manager (HookManager | None, optional) – An optional HookManager for handling workflow lifecycle events and telemetry. Defaults to None.

class ExampleSubflow(Subflow[SubflowState, SubflowStore, ParentState, ParentStore]):
    async def pre_run_actions(self, parent_store):
        parent_state = await parent_store.get_state()
        await self.store.set_parameter({
            "parameter": parent_state.parameter
        })

    async def post_run_actions(self, parent_store):
        sub_flow_state = await self.get_state()
        await parent_store.set_subflow_result(self, sub_flow_state.result)

# Instantiate the subflow
example_subflow = ExampleSubflow(
    graph=example_subflow_graph,
    store_factory=lambda: ExampleSubflowStore(
        initial_state=ExampleSubflowState()
    ),
)

abstractmethod async post_run_actions(parent_store: ParentStoreT) → None

This method is called after the workflow has run.

This is where you can update the parent store with the results of the workflow. This is useful for subflows that need to update the parent workflow store with their results.

Parameters:

parent_store – The parent store to update.

abstractmethod async pre_run_actions(parent_store: ParentStoreT) → None

This method is called before the workflow has run.

This is where you can pass initial state values from the parent workflow to the subflow state.

Parameters:

parent_store – The parent store to interact with.

In the ExampleSubflow example above, pre_run_actions passes a parameter from the parent store to the subflow store using set_parameter, a method defined on the subflow’s store.
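The order in which the two hooks surround a subflow's run can be sketched with stand-in classes. ParentStoreSketch, SubflowSketch, and run_graph are hypothetical names, and plain attributes replace real stores; the sketch only shows the sequence "copy in, run in isolation, copy out".

```python
import asyncio


class ParentStoreSketch:
    """Hypothetical parent store holding plain attributes."""

    def __init__(self) -> None:
        self.parameter = 21
        self.subflow_result = None


class SubflowSketch:
    """Hypothetical subflow with its own isolated state."""

    def __init__(self) -> None:
        self.state = {"parameter": None, "result": None}

    async def pre_run_actions(self, parent_store: ParentStoreSketch) -> None:
        # Copy an input from the parent into the subflow's own state.
        self.state["parameter"] = parent_store.parameter

    async def run_graph(self) -> None:
        # Stand-in for executing the subflow's graph against its own state.
        self.state["result"] = self.state["parameter"] * 2

    async def post_run_actions(self, parent_store: ParentStoreSketch) -> None:
        # Hand the result back to the parent once the run is complete.
        parent_store.subflow_result = self.state["result"]

    async def execute(self, parent_store: ParentStoreSketch) -> None:
        await self.pre_run_actions(parent_store)
        await self.run_graph()
        await self.post_run_actions(parent_store)


parent = ParentStoreSketch()
asyncio.run(SubflowSketch().execute(parent))
```

The parent is only touched in the two hooks, which keeps the subflow's internal state isolated during the run itself.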

class junjo.Node

Bases: Generic[StoreT], ABC

Nodes are the building blocks of a workflow. They represent a single unit of work that can be executed within the context of a workflow.

Place business logic to be executed by the node in the service method. The service method is where the main logic of the node resides. It will be wrapped and annotated with OpenTelemetry tracing.

The Node is meant to remain decoupled from your business logic. While you can place business logic directly in the service method, it is recommended that you call a service function located in a separate module. This allows for better separation of concerns and makes it easier to test and maintain your code.

Type Parameters:

StoreT: The workflow store type that will be passed into this node during execution.

Responsibilities:
  • The Workflow passes the store to the node’s execute function.

  • The service function implements side effects using that store.

Example implementation:

class SaveMessageNode(Node[MessageWorkflowStore]):
    async def service(self, store) -> None:
        state = await store.get_state()  # Get the current state

        # Perform some business logic
        sentiment = await get_message_sentiment(state.message)

        # Perform a state update
        await store.set_message_sentiment(sentiment)

add_patch(patch: JsonPatch) → None

Adds a patch to the list of patches.

async execute(store: StoreT, parent_id: str) → None

Execute the Node’s service function with OpenTelemetry tracing.

This method is responsible for tracing and error handling. It will acquire a tracer, start a new span, and call the service method. The service method should contain the side effects that this node will perform.

Parameters:
  • store (StoreT) – The store that will be passed to the node’s service function.

  • parent_id (str) – The ID of the parent span. This is used to create a child span for this node’s execution.

property id: str

Returns the unique identifier for the node.

property name: str

Returns the name of the node class instance.

property patches: list[JsonPatch]

Returns the list of patches that have been applied to the state by this node.

abstractmethod async service(store: StoreT) → None

This is the main logic of the node. The concrete implementation of this method should contain the side effects that this node will perform.

This method is called by the execute method of the node. The execute method is responsible for tracing and error handling.

The service method should not be called directly. Instead, it should be called by the execute method of the node.

DO NOT EXECUTE node.service() DIRECTLY! Use node.execute() instead.

Parameters:

store (StoreT) – The store that will be passed to the node’s service function.
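The split between execute and service is a template-method pattern: the wrapper owns instrumentation and error handling, while subclasses supply only the work. The sketch below uses a simple in-memory log in place of OpenTelemetry spans; NodeSketch, Greet, and Boom are hypothetical names, and nothing here reflects Junjo's actual tracing code.

```python
import asyncio
from abc import ABC, abstractmethod


class NodeSketch(ABC):
    """Hypothetical node: execute() wraps service() with bookkeeping."""

    def __init__(self) -> None:
        self.log = []

    @abstractmethod
    async def service(self, store: dict) -> None:
        """Subclasses put their side effects here."""

    async def execute(self, store: dict) -> None:
        self.log.append("start")  # stands in for opening a span
        try:
            await self.service(store)
        except Exception as exc:
            self.log.append(f"error:{type(exc).__name__}")  # record, then re-raise
            raise
        finally:
            self.log.append("end")  # stands in for closing the span


class Greet(NodeSketch):
    async def service(self, store: dict) -> None:
        store["greeting"] = "hello"


class Boom(NodeSketch):
    async def service(self, store: dict) -> None:
        raise ValueError("service failed")


store = {}
greet = Greet()
asyncio.run(greet.execute(store))
```

Calling service directly would skip the "start", "error", and "end" bookkeeping, which is the reason for the warning above.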

class junjo.RunConcurrent(name: str, items: list[Node | Subflow])

Bases: Node

Execute a list of nodes or subflows concurrently. Under the hood, this uses asyncio.gather to run all items concurrently.

An instance of RunConcurrent can be added to a workflow’s graph the same way as any other node.

Parameters:
  • name – The name of this collection of concurrently executed nodes.

  • items – A list of nodes or subflows to execute with asyncio.gather.

node_1 = NodeOne()
node_2 = NodeTwo()
node_3 = NodeThree()

run_concurrent = RunConcurrent(
    name="Concurrent Execution",
    items=[node_1, node_2, node_3]
)

async execute(store: BaseStore, parent_id: str) → None

Execute the RunConcurrent node’s service function with OpenTelemetry tracing. This method is responsible for tracing and error handling.

Parameters:
  • store – The store to use for the items.

  • parent_id – The parent id of the workflow.

property id: str

Returns the unique identifier for the node or subflow.

property name: str

Returns the name of the node class instance.

async service(store: BaseStore) → None

Execute the provided nodes and subflows concurrently using asyncio.gather.
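For readers unfamiliar with asyncio.gather, the standalone sketch below shows the behaviour that concurrent items rely on: awaitables overlap in time, yet results come back in the order the awaitables were passed. The work coroutine and its delays are hypothetical stand-ins for node execution.

```python
import asyncio


async def work(name: str, delay: float, finished: list) -> str:
    """Hypothetical unit of work that records when it completes."""
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def run_concurrently() -> tuple:
    finished = []
    # Both coroutines are scheduled together; gather waits for all of them.
    returned = await asyncio.gather(
        work("slow", 0.1, finished),
        work("fast", 0.01, finished),
    )
    return list(returned), finished


returned, finished = asyncio.run(run_concurrently())
```

Here returned follows the argument order while finished follows completion order, so items that write to a shared store should not depend on one another's timing.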

class junjo.BaseState

Bases: BaseModel

Common base for states, with no unknown fields allowed.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class junjo.BaseStore(initial_state: StateT)

Bases: Generic[StateT]

BaseStore represents a generic store for managing the state of a workflow. It is designed to be subclassed with a specific state type (Pydantic model).

The store is responsible for:
- Managing the state of the workflow.
- Making immutable updates to the state safely in a concurrent environment.
- Validating state updates against the Pydantic model.
- Providing methods to subscribe to state changes.
- Notifying subscribers when the state changes.

The store uses an asyncio.Lock to ensure that state updates are safe across concurrent coroutines and that subscribers are notified in a safe manner. This is important in an async environment where multiple coroutines may be trying to update the state or subscribe to changes at the same time.

Parameters:

initial_state – The initial state of the store, based on the Pydantic model.

async get_state() → StateT

Return a shallow copy of the current state. (Follows immutability principle)

async get_state_json() → str

Return the current state as a JSON string.

property id: str

Returns the unique identifier of a given store’s implementation.

async set_state(update: dict) → None

Update the store’s state with a dictionary of changes.

  • Immutable update with a deep state copy.

  • Merges the current state with updates using model_copy(update=…).

  • Validates that each updated field is valid for StateT.

  • If there’s a change, notifies subscribers outside the lock.

Parameters:

update – A dictionary of updates to apply to the state.

class MessageWorkflowState(BaseState): # A pydantic model to represent the state
    received_message: Message

class MessageWorkflowStore(BaseStore[MessageWorkflowState]): # A concrete store for MessageWorkflowState
    async def set_received_message(self, payload: Message) -> None:
        await self.set_state({"received_message": payload})

payload = Message(...)
await store.set_received_message(payload) # Utilizes the set_state method to update a particular field

async subscribe(listener: Callable[[StateT], None] | Callable[[StateT], Awaitable[None]]) → Callable[[], Awaitable[None]]

Register a listener (sync or async callable) to be called whenever the state changes. Returns an async unsubscribe function that, when awaited, removes this listener.
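The interplay of locked updates, change detection, and notification outside the lock can be sketched using only the standard library. This is an illustration under stated assumptions rather than Junjo's code: StoreSketch and CounterState are hypothetical, and a frozen dataclass with dataclasses.replace stands in for a Pydantic model and model_copy.

```python
import asyncio
import inspect
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class CounterState:
    """Hypothetical immutable state standing in for a Pydantic model."""
    count: int = 0


class StoreSketch:
    def __init__(self, initial_state: CounterState) -> None:
        self._state = initial_state
        self._lock = asyncio.Lock()
        self._listeners = []

    async def get_state(self) -> CounterState:
        async with self._lock:
            return self._state

    async def subscribe(self, listener):
        async with self._lock:
            self._listeners.append(listener)

        async def unsubscribe() -> None:
            async with self._lock:
                if listener in self._listeners:
                    self._listeners.remove(listener)

        return unsubscribe

    async def set_state(self, update: dict) -> None:
        async with self._lock:
            # replace() builds a new object and raises TypeError on unknown fields.
            new_state = replace(self._state, **update)
            changed = new_state != self._state
            self._state = new_state
            listeners = list(self._listeners)
        if not changed:
            return
        # Notify outside the lock so a listener may call back into the store.
        for listener in listeners:
            result = listener(new_state)
            if inspect.isawaitable(result):
                await result


async def demo() -> tuple:
    store = StoreSketch(CounterState())  # created inside the running loop
    seen = []
    unsubscribe = await store.subscribe(lambda s: seen.append(s.count))
    await store.set_state({"count": 1})
    await store.set_state({"count": 1})  # no change, so no notification
    await unsubscribe()
    await store.set_state({"count": 2})  # listener already removed
    return seen, (await store.get_state()).count
```

Releasing the lock before notifying is the design point: asyncio.Lock is not reentrant, so a listener that called set_state while the lock was still held would deadlock.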

class junjo.Edge(tail: Node | _NestableWorkflow, head: Node | _NestableWorkflow, condition: Condition[StateT] | None = None)

Bases: object

Represents a directed edge in the workflow graph.

An edge connects a tail node to a head node, optionally with a condition that determines whether the transition from tail to head should occur.

Parameters:
  • tail – The source node of the edge (where the transition originates).

  • head – The destination node of the edge (where the transition leads).

  • condition (Condition[StateT] | None, optional) – An optional Condition instance that determines whether the transition from tail to head should occur. If None, the transition is always valid.

async next_node(store: BaseStore) → Node | _NestableWorkflow | None

Determines the next node in the workflow based on the edge’s condition (Condition[StateT]).

Parameters:

store (BaseStore) – The store instance to use for resolving the next node.

Returns:

The next node if the transition is valid, otherwise None.