API Reference
Junjo: A Python library for building and managing complex graph workflows.
This library provides the building blocks and tools for wrapping Python functions into nodes, edges, and graphs that can be executed by a workflow.
It also produces annotated OpenTelemetry spans to help make sense of execution telemetry.
- class junjo.Condition
Abstract base class for edge conditions in a workflow graph.
Implement a concrete condition that determines whether a transition along an edge should occur based only on the current state.
This is designed to be used with the Edge class, which represents a directed edge in the workflow graph. The condition is evaluated when determining whether to transition from the tail node to the head node.
- Type Parameters:
- StateT: The type of the state that the condition will evaluate against.
This should be a subclass of BaseState.
- Responsibilities:
The condition should be stateless and only depend on the current state.
DO NOT use any side effects in the condition (e.g., network calls, database queries).
The condition should be a pure function of the state.
    class MyCondition(Condition[MyState]):
        def evaluate(self, state: MyState) -> bool:
            # Implement the abstract method
            return state.some_property == "some_value"

    my_condition = MyCondition()

    edges = [
        Edge(tail=node_1, head=node_2, condition=my_condition),
        Edge(tail=node_2, head=node_3),  # No condition (or None) means the transition is always valid
    ]
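The purity requirement above can be illustrated without Junjo itself. The following is a minimal standard-library sketch; `SketchCondition`, `OrderState`, and `IsPaid` are hypothetical names used only for illustration and are not part of the library.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

StateT = TypeVar("StateT")


class SketchCondition(ABC, Generic[StateT]):
    """Hypothetical stand-in for an edge condition (not junjo.Condition)."""

    @abstractmethod
    def evaluate(self, state: StateT) -> bool:
        """Return True when the transition should occur."""


@dataclass(frozen=True)
class OrderState:
    """Hypothetical immutable state."""
    status: str


class IsPaid(SketchCondition[OrderState]):
    def evaluate(self, state: OrderState) -> bool:
        # Reads only the state it is given: no I/O, no mutation, no hidden inputs.
        return state.status == "paid"


condition = IsPaid()
paid = OrderState(status="paid")

# A pure condition gives the same answer every time for the same state.
results = [condition.evaluate(paid) for _ in range(3)]
pending_result = condition.evaluate(OrderState(status="pending"))
```

Because the condition holds no state of its own, a single instance could plausibly be shared across several edges.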
- class junjo.Graph(source: Node | _NestableWorkflow, sink: Node | _NestableWorkflow, edges: list[Edge])
Bases: object
Represents a directed graph of nodes and edges, defining the structure and flow of a workflow.
The Graph class is a fundamental component in Junjo, responsible for encapsulating the relationships between different processing units (Nodes or Subflows) and the conditions under which transitions between them occur.
It holds references to the entry point (source) and exit point (sink) of the graph, as well as a list of all edges that connect the nodes.
- Parameters:
source (Node | _NestableWorkflow) – The starting node or subflow of the graph. Execution of the workflow begins here.
sink (Node | _NestableWorkflow) – The terminal node or subflow of the graph. Reaching this node signifies the completion of the workflow.
edges (list[Edge]) – A list of Edge instances that define the connections and transition logic between nodes in the graph.
Example:
    from junjo import Node, Edge, Graph, BaseStore, Condition, BaseState

    # Define a simple state (can be more complex in real scenarios)
    class MyWorkflowState(BaseState):
        count: int | None = None

    # Define a simple store
    class MyWorkflowStore(BaseStore[MyWorkflowState]):
        async def set_count(self, payload: int) -> None:
            await self.set_state({"count": payload})

    # Define some simple nodes
    class FirstNode(Node[MyWorkflowStore]):
        async def service(self, store: MyWorkflowStore) -> None:
            print("First Node Executed")

    class CountItemsNode(Node[MyWorkflowStore]):
        async def service(self, store: MyWorkflowStore) -> None:
            # In a real scenario, you might get items from state and count them
            await store.set_count(5)  # Example count
            print("Counted items")

    class EvenItemsNode(Node[MyWorkflowStore]):
        async def service(self, store: MyWorkflowStore) -> None:
            print("Path taken for even items count.")

    class OddItemsNode(Node[MyWorkflowStore]):
        async def service(self, store: MyWorkflowStore) -> None:
            print("Path taken for odd items count.")

    class FinalNode(Node[MyWorkflowStore]):
        async def service(self, store: MyWorkflowStore) -> None:
            print("Final Node Executed")

    # Define a condition
    class CountIsEven(Condition[MyWorkflowState]):
        def evaluate(self, state: MyWorkflowState) -> bool:
            if state.count is None:
                return False
            return state.count % 2 == 0

    # Instantiate the nodes
    first_node = FirstNode()
    count_items_node = CountItemsNode()
    even_items_node = EvenItemsNode()
    odd_items_node = OddItemsNode()
    final_node = FinalNode()

    # Create the workflow graph
    workflow_graph = Graph(
        source=first_node,
        sink=final_node,
        edges=[
            Edge(tail=first_node, head=count_items_node),
            Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()),
            Edge(tail=count_items_node, head=odd_items_node),  # Fallback
            Edge(tail=even_items_node, head=final_node),
            Edge(tail=odd_items_node, head=final_node),
        ],
    )
- export_graphviz_assets(out_dir: str | Path = 'graphviz_out', fmt: str = 'svg', dot_cmd: str = 'dot', open_html: bool = False, clean: bool = True) → dict[str, Path]
Render every digraph produced by to_dot_notation() and build a gallery HTML page whose headings use the human labels (e.g. “SampleSubflow”) instead of raw digraph identifiers.
- Return type:
Ordered mapping digraph_name → rendered file path, **in encounter order**.
- async get_next_node(store: BaseStore, current_node: Node | _NestableWorkflow) → Node | _NestableWorkflow
Retrieves the next node (or workflow / subflow) in the graph for the given current node. This method checks the edges connected to the current node and resolves the next node based on the conditions defined in the edges.
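The resolution step described above can be sketched with plain Python. This is an illustration under stated assumptions, not Junjo's implementation: it assumes edges are checked in list order and that the first edge whose condition passes wins, which is what the "Fallback" comment in the Graph example suggests. `SketchEdge` and `resolve_next` are hypothetical names, and what the real method does when no edge is valid is not covered here.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class SketchEdge:
    """Hypothetical edge: node names plus an optional predicate on the state."""
    tail: str
    head: str
    condition: Optional[Callable[[dict], bool]] = None


def resolve_next(edges: list, current: str, state: dict) -> Optional[str]:
    # Assumption: edges are checked in list order and the first match wins.
    for edge in edges:
        if edge.tail != current:
            continue  # edge does not start at the current node
        if edge.condition is None or edge.condition(state):
            return edge.head
    return None  # no outgoing edge is valid for this state


edges = [
    SketchEdge("count", "even", condition=lambda s: s["count"] % 2 == 0),
    SketchEdge("count", "odd"),  # unconditional fallback
    SketchEdge("even", "final"),
    SketchEdge("odd", "final"),
]

next_for_four = resolve_next(edges, "count", {"count": 4})
next_for_five = resolve_next(edges, "count", {"count": 5})
```

Under this ordering assumption, a conditional edge must be listed before the unconditional fallback that shares its tail, otherwise the fallback would always be taken.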
- serialize_to_json_string() → str
Converts the graph to a neutral serialized JSON string, representing RunConcurrent instances as subgraphs and including Subflow graphs as well.
- Returns:
A JSON string containing the graph structure.
- Return type:
str
- to_dot_notation() → str
Render the Junjo graph as a main overview digraph plus one additional digraph for each Subflow.
Strategy¶
In the overview we treat every Subflow node as an atomic component (shape=component, fillcolour light-yellow).
Any RunConcurrent node is rendered as a cluster, exactly like before.
For every Subflow we emit a second digraph subflow_<id> that expands its internal graph, again treating nested Subflows as atomic macro nodes (so the drill-down is recursive).
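To make the overview strategy concrete, here is a small sketch that emits Graphviz DOT text in which a subflow is drawn as a single component-shaped node. It is not the output of to_dot_notation(): the function `sketch_dot`, the node names, and the exact attribute choices are assumptions made for illustration.

```python
def sketch_dot(name: str, nodes: list, subflows: set, edges: list) -> str:
    """Build a DOT digraph string; subflows become atomic 'component' nodes."""
    lines = [f"digraph {name} {{"]
    for node in nodes:
        if node in subflows:
            # Subflows are not expanded in the overview, only marked visually.
            lines.append(
                f"  {node} [shape=component, style=filled, fillcolor=lightyellow];"
            )
        else:
            lines.append(f"  {node} [shape=box];")
    for tail, head in edges:
        lines.append(f"  {tail} -> {head};")
    lines.append("}")
    return "\n".join(lines)


dot = sketch_dot(
    "overview",
    nodes=["start", "enrich", "finish"],
    subflows={"enrich"},
    edges=[("start", "enrich"), ("enrich", "finish")],
)
print(dot)
```

A drill-down digraph for the subflow could be produced by calling the same function again with the subflow's own nodes and edges.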
- class junjo.Workflow(graph: Graph, store_factory: StoreFactory[StoreT], max_iterations: int = 100, hook_manager: HookManager | None = None, name: str | None = None)
Bases: _NestableWorkflow[StateT, StoreT, None, None]
A Workflow is a top-level, executable collection of nodes and edges arranged as a graph. It manages its own state and store, distinct from any parent or sub-workflows.
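This class is generic and requires two type parameters for a convenient and type-safe developer experience:
- Generic Type Parameters:
- StateT: The type of state managed by this workflow (subclass of BaseState).
- StoreT: The type of store used by this workflow (subclass of BaseStore).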
- Parameters:
name (str | None, optional) – An optional name for the workflow. If not provided, the class name is used.
graph (Graph) – The graph of nodes and edges that defines the workflow’s structure and execution flow.
store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the workflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh state for the workflow’s specific execution.
max_iterations (int, optional) – The maximum number of times any single node can be executed within one workflow run. This helps prevent infinite loops. Defaults to 100.
hook_manager (HookManager | None, optional) – An optional HookManager for handling workflow lifecycle events and telemetry. Defaults to None.
    workflow = Workflow[MyGraphState, MyGraphStore](
        name="demo_base_workflow",
        graph=graph,
        store_factory=lambda: MyGraphStore(initial_state=MyGraphState()),
        hook_manager=HookManager(verbose_logging=False, open_telemetry=True),
    )
    await workflow.execute()
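The roles of store_factory and max_iterations can be sketched with a toy execution loop. This is only an illustration of the two ideas (a fresh store per run, and a per-node execution cap); the names `SketchStore` and `run`, the dictionary-based graph, and the choice to raise RuntimeError when the cap is exceeded are all assumptions, not Junjo's behaviour.

```python
from collections import Counter
from typing import Callable, Optional


class SketchStore:
    """Hypothetical store that records which nodes ran."""

    def __init__(self) -> None:
        self.visited = []


def run(next_of: dict, source: str,
        store_factory: Callable[[], SketchStore],
        max_iterations: int = 100) -> SketchStore:
    store = store_factory()  # a new store for every run
    executions = Counter()   # how many times each node has executed
    current: Optional[str] = source
    while current is not None:
        executions[current] += 1
        if executions[current] > max_iterations:
            # Assumed failure mode for the sketch: stop a runaway loop.
            raise RuntimeError(
                f"node {current!r} exceeded {max_iterations} executions"
            )
        store.visited.append(current)
        current = next_of[current]
    return store


linear = {"a": "b", "b": None}
first = run(linear, "a", SketchStore)
second = run(linear, "a", SketchStore)

try:
    run({"a": "a"}, "a", SketchStore, max_iterations=3)  # self-loop
    loop_stopped = False
except RuntimeError:
    loop_stopped = True
```

Because the factory is called inside `run`, the two runs above end with separate store objects, so state from one execution cannot leak into the next.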
- class junjo.Subflow(graph: Graph, store_factory: StoreFactory[StoreT], max_iterations: int = 100, hook_manager: HookManager | None = None, name: str | None = None)
Bases: _NestableWorkflow[StateT, StoreT, ParentStateT, ParentStoreT], ABC
- A Subflow is a workflow that:
1. Executes within a parent workflow or parent subflow.
2. Has its own isolated state and store.
3. Can interact with its parent workflow state before and after execution via pre_run_actions() and post_run_actions().
This class is generic and requires four type parameters for a convenient and type safe developer experience:
- Generic Type Parameters:
- StateT: The type of state managed by this subflow (subclass of BaseState).
- StoreT: The type of store used by this subflow (subclass of BaseStore).
- ParentStateT: The type of state managed by the parent workflow (subclass of BaseState).
- ParentStoreT: The type of store used by the parent workflow (subclass of BaseStore).
- Parameters:
name (str | None, optional) – An optional name for the workflow. If not provided, the class name is used.
graph (Graph) – The graph of nodes and edges that defines the workflow’s structure and execution flow.
store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the workflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh state for the workflow’s specific execution.
max_iterations (int, optional) – The maximum number of times any single node can be executed within one workflow run. This helps prevent infinite loops. Defaults to 100.
hook_manager (HookManager | None, optional) – An optional HookManager for handling workflow lifecycle events and telemetry. Defaults to None.
    class ExampleSubflow(Subflow[SubflowState, SubflowStore, ParentState, ParentStore]):
        async def pre_run_actions(self, parent_store):
            # Copy the needed input from the parent state into the subflow store
            parent_state = await parent_store.get_state()
            await self.store.set_parameter({
                "parameter": parent_state.parameter
            })

        async def post_run_actions(self, parent_store):
            # Publish the subflow result back to the parent store
            sub_flow_state = await self.get_state()
            await parent_store.set_subflow_result(self, sub_flow_state.result)

    # Instantiate the subflow
    example_subflow = ExampleSubflow(
        graph=example_subflow_graph,
        store_factory=lambda: ExampleSubflowStore(
            initial_state=ExampleSubflowState()
        ),
    )
- abstractmethod async post_run_actions(parent_store: ParentStoreT) → None
This method is called after the workflow has run.
This is where you can update the parent store with the results of the workflow. This is useful for subflows that need to update the parent workflow store with their results.
- Parameters:
parent_store – The parent store to update.
- abstractmethod async pre_run_actions(parent_store: ParentStoreT) → None
This method is called before the workflow has run.
This is where you can pass initial state values from the parent workflow to the subflow state.
- Parameters:
parent_store – The parent store to interact with.
In this example, we are passing a parameter from the parent store to the subflow store, using the subflow’s set_parameter method, defined in the subflow’s store.
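The order in which the two hooks run around a subflow's own graph can be sketched as follows. This is a standard-library illustration, not Junjo code: `ParentStore`, `ChildStore`, `WordCountSubflow`, and `run_graph` are hypothetical names, and the stores are plain dataclasses rather than BaseStore subclasses.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ParentStore:
    """Hypothetical parent store."""
    text: str
    results: dict = field(default_factory=dict)


@dataclass
class ChildStore:
    """Hypothetical store owned by the subflow."""
    text: str = ""
    word_count: int = 0


class WordCountSubflow:
    """Hypothetical stand-in showing the pre-run / run / post-run order."""

    def __init__(self, store_factory):
        self._store_factory = store_factory

    async def pre_run_actions(self, parent_store: ParentStore) -> None:
        # Copy the input the subflow needs out of the parent.
        self.store.text = parent_store.text

    async def run_graph(self) -> None:
        # Stands in for executing the subflow's own graph on its own store.
        self.store.word_count = len(self.store.text.split())

    async def post_run_actions(self, parent_store: ParentStore) -> None:
        # Publish the result back to the parent.
        parent_store.results["word_count"] = self.store.word_count

    async def execute(self, parent_store: ParentStore) -> None:
        self.store = self._store_factory()  # isolated, fresh store per run
        await self.pre_run_actions(parent_store)
        await self.run_graph()
        await self.post_run_actions(parent_store)


parent = ParentStore(text="graphs of small nodes")
asyncio.run(WordCountSubflow(ChildStore).execute(parent))
```

The parent only ever sees what post_run_actions chooses to publish; everything else stays inside the subflow's own store.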
- class junjo.Node
Nodes are the building blocks of a workflow. They represent a single unit of work that can be executed within the context of a workflow.
Place business logic to be executed by the node in the service method. The service method is where the main logic of the node resides. It will be wrapped and annotated with OpenTelemetry tracing.
The Node is meant to remain decoupled from your business logic. While you can place business logic directly in the service method, it is recommended that you call a service function located in a separate module. This allows for better separation of concerns and makes it easier to test and maintain your code.
- Type Parameters:
StoreT: The workflow store type that will be passed into this node during execution.
- Responsibilities:
The Workflow passes the store to the node’s execute function.
The service function implements side effects using that store.
Example implementation:
    class SaveMessageNode(Node[MessageWorkflowStore]):
        async def service(self, store) -> None:
            state = await store.get_state()  # Get the current state

            # Perform some business logic
            sentiment = await get_message_sentiment(state.message)

            # Perform a state update
            await store.set_message_sentiment(sentiment)
- async execute(store: StoreT, parent_id: str) → None
Execute the Node’s service function with OpenTelemetry tracing.
This method is responsible for tracing and error handling. It will acquire a tracer, start a new span, and call the service method. The service method should contain the side effects that this node will perform.
- Parameters:
store (StoreT) – The store that will be passed to the node’s service function.
parent_id (str) – The ID of the parent span. This is used to create a child span for this node’s execution.
- property patches: list[JsonPatch]
Returns the list of patches that have been applied to the state by this node.
- abstractmethod async service(store: StoreT) → None
This is the main logic of the node. The concrete implementation of this method should contain the side effects that this node will perform.
This method is called by the execute method of the node. The execute method is responsible for tracing and error handling.
The service method should not be called directly. Instead, it should be called by the execute method of the node.
DO NOT EXECUTE node.service() DIRECTLY! Use node.execute() instead.
- Parameters:
store (StoreT) – The store that will be passed to the node’s service function.
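The split between execute() and service() is a template-method arrangement: the base class owns the wrapper and the subclass supplies the work. The sketch below shows that shape using only the standard library, with a simple log and timer standing in for OpenTelemetry spans. `SketchNode` and `Greet` are hypothetical names, and the wrapper's exact behaviour is an assumption rather than Junjo's implementation.

```python
import asyncio
import time
from abc import ABC, abstractmethod


class SketchNode(ABC):
    """Hypothetical node: execute() wraps service() with timing and error capture."""

    def __init__(self) -> None:
        self.log = []
        self.elapsed = 0.0

    @abstractmethod
    async def service(self, store: dict) -> None:
        """Business logic supplied by the subclass."""

    async def execute(self, store: dict) -> None:
        started = time.perf_counter()
        self.log.append("start")  # stands in for opening a span
        try:
            await self.service(store)
            self.log.append("ok")
        except Exception as exc:
            self.log.append(f"error: {exc}")  # stands in for recording the error
            raise
        finally:
            self.elapsed = time.perf_counter() - started


class Greet(SketchNode):
    async def service(self, store: dict) -> None:
        store["greeting"] = f"hello {store['name']}"


store = {"name": "junjo"}
node = Greet()
asyncio.run(node.execute(store))
```

Calling `service()` directly in this sketch would skip the log and timing entirely, which mirrors why the documentation asks callers to go through `execute()`.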
- class junjo.RunConcurrent(name: str, items: list[Node | Subflow])
Bases: Node
Execute a list of nodes or subflows concurrently. Under the hood, this uses asyncio.gather to run all items concurrently.
An instance of RunConcurrent can be added to a workflow’s graph the same way as any other node.
- Parameters:
name – The name of this collection of concurrently executed nodes.
items – A list of nodes or subflows to execute with asyncio.gather.
    node_1 = NodeOne()
    node_2 = NodeTwo()
    node_3 = NodeThree()

    run_concurrent = RunConcurrent(
        name="Concurrent Execution",
        items=[node_1, node_2, node_3],
    )
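For readers unfamiliar with asyncio.gather, the sketch below shows the concurrency behaviour it provides, independent of Junjo. The coroutine `work` and its delays are hypothetical; the point is that items finish in order of their own duration, not in the order they were listed.

```python
import asyncio


async def work(name: str, delay: float, finished: list) -> None:
    # Simulates an I/O-bound node; sleeping yields control to the other items.
    await asyncio.sleep(delay)
    finished.append(name)


async def main() -> list:
    finished = []
    # All three coroutines are started together and awaited as a group.
    await asyncio.gather(
        work("node_1", 0.06, finished),
        work("node_2", 0.02, finished),
        work("node_3", 0.04, finished),
    )
    return finished


finished = asyncio.run(main())
print(finished)
```

Since the items overlap in time, any state they share should be updated through a mechanism that tolerates interleaving, such as the lock-guarded store described under BaseStore.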
- class junjo.BaseState
Bases: BaseModel
Common base for states, with no unknown fields allowed.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class junjo.BaseStore(initial_state: StateT)
Bases: Generic[StateT]
BaseStore represents a generic store for managing the state of a workflow. It is designed to be subclassed with a specific state type (Pydantic model).
- The store is responsible for:
- Managing the state of the workflow.
- Making immutable updates to the state safely in a concurrent environment.
- Validating state updates against the Pydantic model.
- Providing methods to subscribe to state changes.
- Notifying subscribers when the state changes.
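The store uses an asyncio.Lock to ensure that state updates are safe across concurrent coroutines and that subscribers are notified in a safe manner. This is important in an async environment where multiple coroutines may be trying to update the state or subscribe to changes at the same time. Note that an asyncio.Lock coordinates coroutines on one event loop; it does not by itself protect against access from multiple OS threads.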
- Parameters:
initial_state – The initial state of the store, based on the Pydantic model.
- async get_state() → StateT
Return a shallow copy of the current state. (Follows immutability principle)
- async set_state(update: dict) → None
Update the store’s state with a dictionary of changes.
- Immutable update with a deep state copy.
- Merges the current state with updates using model_copy(update=…).
- Validates that each updated field is valid for StateT.
- If there’s a change, notifies subscribers outside the lock.
- Parameters:
update – A dictionary of updates to apply to the state.
    class MessageWorkflowState(BaseState):
        # A Pydantic model to represent the state
        received_message: Message

    class MessageWorkflowStore(BaseStore[MessageWorkflowState]):
        # A concrete store for MessageWorkflowState
        async def set_received_message(self, payload: Message) -> None:
            await self.set_state({"received_message": payload})

    payload = Message(...)
    await store.set_received_message(payload)  # Uses set_state to update a particular field
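The update pattern described for the store (take the lock, build a new state object instead of mutating the old one, then notify subscribers after releasing the lock) can be sketched with the standard library. This is not BaseStore: `SketchStore`, `CounterState`, and `increment` are hypothetical, and a frozen dataclass with `dataclasses.replace` stands in for the Pydantic model and `model_copy`.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class CounterState:
    """Hypothetical immutable state."""
    count: int = 0


class SketchStore:
    """Hypothetical store: lock-guarded, replace-not-mutate updates."""

    def __init__(self, initial_state: CounterState) -> None:
        self._state = initial_state
        self._lock = asyncio.Lock()
        self._subscribers = []

    def subscribe(self, callback) -> None:
        self._subscribers.append(callback)

    async def get_state(self) -> CounterState:
        async with self._lock:
            return self._state  # frozen, so safe to hand out

    async def increment(self) -> None:
        async with self._lock:
            old = self._state
            await asyncio.sleep(0)  # yield mid-update; the lock keeps this safe
            self._state = replace(old, count=old.count + 1)
            new = self._state
        # Notify outside the lock so a slow subscriber cannot block writers.
        for callback in self._subscribers:
            callback(new)


async def main():
    store = SketchStore(CounterState())
    seen = []
    store.subscribe(lambda state: seen.append(state.count))
    await asyncio.gather(*(store.increment() for _ in range(50)))
    return (await store.get_state()).count, sorted(seen)


final_count, seen = asyncio.run(main())
```

Without the lock, every coroutine in this sketch would read the same old state before any of them wrote, and most increments would be lost.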
- class junjo.Edge(tail: Node | _NestableWorkflow, head: Node | _NestableWorkflow, condition: Condition[StateT] | None = None)
Bases: object
Represents a directed edge in the workflow graph.
An edge connects a tail node to a head node, optionally with a condition that determines whether the transition from tail to head should occur.
- Parameters:
tail – The source node of the edge (where the transition originates).
head – The destination node of the edge (where the transition leads).
condition (Condition[StateT] | None) – An optional Condition that determines whether the transition from tail to head should occur. If None, the transition is always valid.
- async next_node(store: BaseStore) → Node | _NestableWorkflow | None
Determines the next node in the workflow based on the edge’s condition (Condition[StateT]).
- Parameters:
store (BaseStore) – The store instance to use for resolving the next node.
- Returns:
The next node if the transition is valid, otherwise None.