API Reference

Junjo: A Python library for building and managing complex Graph Workflows.

This library provides the building blocks and tools for wrapping Python functions into nodes, edges, and graphs that can be executed by a workflow.

This library also produces annotated OpenTelemetry spans to help make sense of execution telemetry.

class junjo.Condition

Bases: Generic[StateT], ABC

Abstract base class for edge conditions in a workflow graph.

Implement a concrete condition that determines whether a transition along an edge should occur based only on the current state.

This is designed to be used with the Edge class, which represents a directed edge in the workflow graph. The condition is evaluated when determining whether to transition from the tail node to the head node.

Type Parameters:
StateT: The type of the state that the condition will evaluate against.

This should be a subclass of BaseState.

Responsibilities:
  • The condition should be stateless and only depend on the current state.

  • DO NOT perform side effects in the condition (e.g., network calls, database queries).

  • The condition should be a pure function of the state.

class MyCondition(Condition[MyState]):
    def evaluate(self, state: MyState) -> bool: # implement the abstract method
        return state.some_property == "some_value"

my_condition = MyCondition()
edges = [
    Edge(tail=node_1, head=node_2, condition=my_condition),
    Edge(tail=node_2, head=node_3),  # No condition (or None) means the transition is always valid
]

abstractmethod evaluate(state: StateT) -> bool

Evaluates whether the transition should occur based on the current state.

Parameters:

state – The current workflow state.

Returns:

True if the transition should occur, False otherwise.
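
The purity requirement can be illustrated with a minimal, self-contained sketch. It uses stand-in classes rather than junjo imports: `SketchCondition`, `OrderState`, and `IsPaid` are hypothetical names, and only the `evaluate(state) -> bool` shape is taken from the reference above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

StateT = TypeVar("StateT")


class SketchCondition(Generic[StateT], ABC):
    """Stand-in for junjo.Condition (assumption: not the library class)."""

    @abstractmethod
    def evaluate(self, state: StateT) -> bool: ...


@dataclass(frozen=True)
class OrderState:
    """Hypothetical state; frozen so a condition cannot mutate it."""
    status: str


class IsPaid(SketchCondition[OrderState]):
    def evaluate(self, state: OrderState) -> bool:
        # Pure: the result depends only on the state passed in.
        return state.status == "paid"


is_paid = IsPaid()
paid_result = is_paid.evaluate(OrderState(status="paid"))
pending_result = is_paid.evaluate(OrderState(status="pending"))
```

Because the condition holds no fields of its own and performs no I/O, evaluating it twice against the same state always gives the same answer.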

class junjo.Graph(source: Node | _NestableWorkflow, sink: Node | _NestableWorkflow, edges: list[Edge])

Bases: object

Represents a directed graph of nodes and edges.

async get_next_node(store: BaseStore, current_node: Node | _NestableWorkflow) -> Node | _NestableWorkflow

Retrieves the next node (or workflow / subflow) in the graph for the given current node. This method checks the edges connected to the current node and resolves the next node based on the conditions defined in the edges.

Parameters:
  • store (BaseStore) – The store instance to use for resolving the next node.

  • current_node (Node | _NestableWorkflow) – The current node or subflow in the graph.

Returns:

The next node or subflow in the graph.

Return type:

Node | _NestableWorkflow
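
A rough sketch of how edge-based resolution might work, assuming the first edge in list order whose condition passes wins and that having no valid edge is an error. Both of those are assumptions made for illustration, not documented junjo behaviour. `SketchEdge` and `resolve_next` are hypothetical, and plain strings and dicts stand in for nodes and the store.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class SketchEdge:
    """Stand-in edge: plain strings replace Node objects (assumption)."""
    tail: str
    head: str
    condition: Optional[Callable[[dict], bool]] = None


async def resolve_next(edges: list[SketchEdge], state: dict, current: str) -> str:
    """Return the head of the first edge leaving `current` whose condition passes."""
    for edge in edges:
        if edge.tail != current:
            continue
        # A missing condition is treated as always valid.
        if edge.condition is None or edge.condition(state):
            return edge.head
    raise ValueError(f"no valid transition from {current!r}")


edges = [
    SketchEdge("start", "retry", condition=lambda s: s["attempts"] < 3),
    SketchEdge("start", "finish"),  # unconditional fallback
]

first = asyncio.run(resolve_next(edges, {"attempts": 1}, "start"))
second = asyncio.run(resolve_next(edges, {"attempts": 3}, "start"))
```

Under this assumed ordering, placing the unconditional edge last makes it act as a fallback.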

serialize_to_json_string() -> str

Converts the graph to a neutral serialized JSON string, representing RunConcurrent instances as subgraphs and including Subflow graphs as well.

Returns:

A JSON string containing the graph structure.

Return type:

str

to_dot_notation() -> str

Currently Broken: Converts the graph to DOT notation.

to_graphviz() -> str

Converts the graph to Graphviz format. This is a placeholder for future implementation.

to_mermaid() -> str

Currently Broken: Generates a Mermaid diagram string from the graph.

The junjo-server telemetry server will produce a proper mermaid diagram for the workflow executions.

class junjo.Workflow(graph: Graph, store: StoreT, max_iterations: int = 100, hook_manager: HookManager | None = None, name: str | None = None)

Bases: _NestableWorkflow[StateT, StoreT, None, None]

Represents a top level workflow that can be executed.

Generic Type Parameters:
StateT: The type of state managed by this workflow
StoreT: The type of store used by this workflow

A workflow is a collection of nodes and edges as a graph that can be executed.

workflow = Workflow[MyGraphState, MyGraphStore](
    name="demo_base_workflow",
    graph=graph,
    store=graph_store,
    hook_manager=HookManager(verbose_logging=False, open_telemetry=True),
)
await workflow.execute()
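
The role of max_iterations can be pictured with a small, self-contained sketch. It is not junjo's execution loop: `run_sketch` and `IterationLimitError` are hypothetical, a dict of transitions replaces the graph, and the loop is synchronous for brevity. It assumes the limit is counted per node.

```python
from collections import Counter


class IterationLimitError(RuntimeError):
    """Hypothetical error type for this sketch."""


def run_sketch(transitions: dict[str, str], source: str, sink: str,
               max_iterations: int = 100) -> list[str]:
    """Walk from source to sink, counting how often each node runs."""
    visits: Counter = Counter()
    order: list[str] = []
    current = source
    while True:
        visits[current] += 1
        if visits[current] > max_iterations:
            raise IterationLimitError(f"{current!r} exceeded {max_iterations} runs")
        order.append(current)  # stands in for executing the node
        if current == sink:
            return order
        current = transitions[current]


linear = run_sketch({"a": "b", "b": "c"}, "a", "c")

try:
    # "a" and "b" point at each other, so the sink "c" is never reached.
    run_sketch({"a": "b", "b": "a"}, "a", "c", max_iterations=2)
    looped = False
except IterationLimitError:
    looped = True
```

The guard turns an accidental cycle into an explicit error instead of an endless run.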

class junjo.Subflow(graph: Graph, store: StoreT, max_iterations: int = 100)

Bases: _NestableWorkflow[StateT, StoreT, ParentStateT, ParentStoreT], ABC

Represents a subflow execution that can interact with a parent workflow.

Generic Type Parameters:
StateT: The type of state managed by this subflow
StoreT: The type of store used by this subflow
ParentStateT: The type of state managed by the parent workflow
ParentStoreT: The type of store used by the parent workflow

A subflow is a workflow that:
  1. Executes within a parent workflow.
  2. Has its own isolated state and store.
  3. Can interact with the parent workflow’s state before and after execution.

class ExampleSubFlow(Subflow[SubflowState, SubflowStore, ParentState, ParentStore]):
    async def pre_run_actions(self, parent_store):
        parent_state = await parent_store.get_state()
        await self.store.set_parameter({
            "parameter": parent_state.parameter
        })

    async def post_run_actions(self, parent_store):
        sub_flow_state = await self.get_state()
        await parent_store.set_subflow_result(self, sub_flow_state.result)

Initializes the Subflow.

Parameters:
  • graph – The workflow graph.

  • store – The store instance for this subflow.

  • max_iterations – The maximum number of times a node can be executed before an exception is raised (defaults to 100).

abstractmethod async post_run_actions(parent_store: ParentStoreT) -> None

This method is called after the workflow has run.

This is where you can update the parent store with the results of the workflow. This is useful for subflows that need to update the parent workflow store with their results.

Parameters:

parent_store – The parent store to update.

abstractmethod async pre_run_actions(parent_store: ParentStoreT) -> None

This method is called before the workflow has run.

This is where you can pass initial state values from the parent workflow to the subflow state.

Parameters:

parent_store – The parent store to interact with.

In the ExampleSubFlow class example above, pre_run_actions passes a parameter from the parent store to the subflow store using set_parameter, a method defined on the subflow’s store.
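
The order of the two hooks around a subflow run can be sketched as follows. This is a stand-in, not junjo code: `SketchSubflow`, its `run` method, and the dict-based stores are hypothetical, and only the pre_run_actions / post_run_actions hook names come from the reference above.

```python
import asyncio


class SketchSubflow:
    """Stand-in for a subflow; dicts replace real stores (assumption)."""

    def __init__(self) -> None:
        self.store: dict = {}  # isolated from the parent store

    async def pre_run_actions(self, parent_store: dict) -> None:
        # Copy the input the subflow needs from the parent.
        self.store["parameter"] = parent_store["parameter"]

    async def run(self) -> None:
        # Placeholder for executing the subflow's own graph.
        self.store["result"] = self.store["parameter"].upper()

    async def post_run_actions(self, parent_store: dict) -> None:
        # Publish the subflow's result back to the parent.
        parent_store["subflow_result"] = self.store["result"]

    async def execute(self, parent_store: dict) -> None:
        await self.pre_run_actions(parent_store)
        await self.run()
        await self.post_run_actions(parent_store)


parent = {"parameter": "hello"}
subflow = SketchSubflow()
asyncio.run(subflow.execute(parent))
```

The parent only ever sees what post_run_actions chooses to write back; the subflow's intermediate state stays in its own store.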

class junjo.Node

Bases: Generic[StoreT], ABC

Nodes are the building blocks of a workflow. They represent a single unit of work that can be executed within the context of a workflow.

Place business logic to be executed by the node in the service method. The service method is where the main logic of the node resides. It will be wrapped and annotated with OpenTelemetry tracing.

The Node is meant to remain decoupled from your business logic. While you can place business logic directly in the service method, it is recommended that you call a service function located in a separate module. This allows for better separation of concerns and makes it easier to test and maintain your code.

Type Parameters:

StoreT: The workflow store type that will be passed into this node during execution.

Responsibilities:
  • The Workflow passes the store to the node’s execute function.

  • The service function implements side effects using that store.

Example implementation:

class SaveMessageNode(Node[MessageWorkflowStore]):
    async def service(self, store) -> None:
        state = await store.get_state()  # Get the current state

        # Perform some business logic
        sentiment = await get_message_sentiment(state.message)

        # Perform a state update
        await store.set_message_sentiment(sentiment)

add_patch(patch: JsonPatch) -> None

Adds a patch to the list of patches.

async execute(store: StoreT, parent_id: str) -> None

Execute the Node’s service function with OpenTelemetry tracing.

This method is responsible for tracing and error handling. It will acquire a tracer, start a new span, and call the service method. The service method should contain the side effects that this node will perform.

Parameters:
  • store (StoreT) – The store that will be passed to the node’s service function.

  • parent_id (str) – The ID of the parent span. This is used to create a child span for this node’s execution.
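
The split between execute (tracing and error handling) and service (side effects) can be sketched without OpenTelemetry. In this illustration `fake_span` is a hypothetical stand-in for a real tracing span that records events in a list, and `SketchNode` and `CountNode` are not junjo classes.

```python
import asyncio
from abc import ABC, abstractmethod
from contextlib import contextmanager

span_log: list[str] = []


@contextmanager
def fake_span(name: str, parent_id: str):
    """Hypothetical stand-in for a tracing span; records events in a list."""
    span_log.append(f"start {name} (parent={parent_id})")
    try:
        yield
    except Exception as exc:
        span_log.append(f"error {name}: {exc}")
        raise
    finally:
        span_log.append(f"end {name}")


class SketchNode(ABC):
    @abstractmethod
    async def service(self, store: dict) -> None: ...

    async def execute(self, store: dict, parent_id: str) -> None:
        # Tracing and error handling live here, not in service().
        with fake_span(type(self).__name__, parent_id):
            await self.service(store)


class CountNode(SketchNode):
    async def service(self, store: dict) -> None:
        store["count"] = store.get("count", 0) + 1


store: dict = {}
asyncio.run(CountNode().execute(store, parent_id="wf-1"))
```

Calling service directly would skip the span entirely, which is why the reference asks callers to go through execute.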

property id: str

Returns the unique identifier for the node.

property name: str

Returns the name of the node class instance.

property patches: list[JsonPatch]

Returns the list of patches that have been applied to the state by this node.

abstractmethod async service(store: StoreT) -> None

This is the main logic of the node. The concrete implementation of this method should contain the side effects that this node will perform.

This method is called by the execute method of the node. The execute method is responsible for tracing and error handling.

The service method should not be called directly. Instead, it should be called by the execute method of the node.

DO NOT EXECUTE node.service() DIRECTLY! Use node.execute() instead.

Parameters:

store (StoreT) – The store that will be passed to the node’s service function.

class junjo.RunConcurrent(name: str, items: list[Node | Subflow])

Bases: Node

Execute a list of nodes or subflows concurrently. Under the hood, this uses asyncio.gather to run all items concurrently.

An instance of RunConcurrent can be added to a workflow’s graph the same way as any other node.

Parameters:
  • name – The name of this collection of concurrently executed nodes.

  • items – A list of nodes or subflows to execute with asyncio.gather.

node_1 = NodeOne()
node_2 = NodeTwo()
node_3 = NodeThree()

run_concurrent = RunConcurrent(
    name="Concurrent Execution",
    items=[node_1, node_2, node_3]
)

async execute(store: BaseStore, parent_id: str) -> None

Execute the RunConcurrent node’s service function with OpenTelemetry tracing. This method is responsible for tracing and error handling.

Parameters:
  • store – The store to use for the items.

  • parent_id – The parent id of the workflow.

property id: str

Returns the unique identifier for the node or subflow.

property name: str

Returns the name of the node class instance.

async service(store: BaseStore) -> None

Execute the provided nodes and subflows concurrently using asyncio.gather.
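
For readers unfamiliar with asyncio.gather, the standalone sketch below shows the behaviour RunConcurrent relies on: all awaitables are scheduled together, and results come back in argument order regardless of which finishes first. The `work` coroutine and its delays are hypothetical and unrelated to junjo.

```python
import asyncio


async def work(name: str, delay: float, log: list[str]) -> str:
    await asyncio.sleep(delay)
    log.append(name)  # records completion order
    return name


async def main() -> tuple[list[str], list[str]]:
    log: list[str] = []
    # gather schedules all three at once and returns results in argument order.
    results = await asyncio.gather(
        work("slow", 0.03, log),
        work("fast", 0.01, log),
        work("medium", 0.02, log),
    )
    return list(results), log


results, completion_order = asyncio.run(main())
```

Items run this way should not depend on each other's state updates landing in any particular order.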

class junjo.BaseState

Bases: BaseModel

Common base for states, with no unknown fields allowed.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class junjo.BaseStore(initial_state: StateT)

Bases: Generic[StateT]

BaseStore represents a generic store for managing the state of a workflow. It is designed to be subclassed with a specific state type (Pydantic model).

The store is responsible for:
- Managing the state of the workflow.
- Making immutable updates to the state safely in a concurrent environment.
- Validating state updates against the Pydantic model.
- Providing methods to subscribe to state changes.
- Notifying subscribers when the state changes.

The store uses an asyncio.Lock to ensure that state updates are safe across concurrent coroutines and that subscribers are notified in a safe manner. This is important in an async environment where multiple coroutines may be trying to update the state or subscribe to changes at the same time. Note that asyncio.Lock coordinates coroutines on one event loop; it is not a thread lock.

Parameters:

initial_state – The initial state of the store, based on the Pydantic model.

async get_state() -> StateT

Return a shallow copy of the current state. (Follows immutability principle)

async get_state_json() -> str

Return the current state as a JSON string.

property id: str

Returns the unique identifier of a given store’s implementation.

async set_state(update: dict) -> None

Update the store’s state with a dictionary of changes.

- Performs an immutable update with a deep state copy.
- Merges the current state with updates using model_copy(update=…).
- Validates that each updated field is valid for StateT.
- If there’s a change, notifies subscribers outside the lock.

Parameters:

update – A dictionary of updates to apply to the state.

class MessageWorkflowState(BaseState): # A pydantic model to represent the state
    received_message: Message

class MessageWorkflowStore(BaseStore[MessageWorkflowState]): # A concrete store for MessageWorkflowState
    async def set_received_message(self, payload: Message) -> None:
        await self.set_state({"received_message": payload})

payload = Message(...)
await store.set_received_message(payload) # Utilizes the set_state method to update a particular field
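
The update steps listed for set_state can be approximated with the standard library alone. This is a sketch under stated assumptions, not the BaseStore implementation: a frozen dataclass and dataclasses.replace stand in for the Pydantic model and model_copy (replace makes a shallow copy, not a deep one), and `SketchStore` and `CounterState` are hypothetical.

```python
import asyncio
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class CounterState:
    """Hypothetical state; a frozen dataclass replaces the Pydantic model."""
    count: int = 0
    label: str = ""


class SketchStore:
    def __init__(self, initial_state: CounterState) -> None:
        self._state = initial_state
        self._lock = asyncio.Lock()
        self._listeners: list = []

    async def set_state(self, update: dict) -> None:
        async with self._lock:
            # replace() builds a new object and rejects unknown field names.
            new_state = replace(self._state, **update)
            changed = new_state != self._state
            self._state = new_state
            listeners = list(self._listeners)
        if changed:
            # Notify after releasing the lock so listeners cannot block updates.
            for listener in listeners:
                listener(new_state)


async def main():
    store = SketchStore(CounterState())
    seen: list = []
    store._listeners.append(seen.append)
    before = store._state
    await store.set_state({"count": 1})
    await store.set_state({"count": 1})  # no change, so no notification
    try:
        await store.set_state({"unknown": 1})
        rejected = False
    except TypeError:
        rejected = True
    return before, store._state, seen, rejected


before, after, seen, rejected = asyncio.run(main())
```

The original state object is never modified; each successful update swaps in a new one.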

async subscribe(listener: Callable[[StateT], None] | Callable[[StateT], Awaitable[None]]) -> Callable[[], Awaitable[None]]

Register a listener (sync or async callable) to be called whenever the state changes. Returns an async unsubscribe function that, when awaited, removes this listener.
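
One way to support both sync and async listeners, and to hand back an awaitable unsubscribe function, is sketched below. `SketchNotifier` and its `notify` method are hypothetical and show only the subscription mechanics; they are not taken from junjo's source.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


class SketchNotifier:
    """Stand-in showing only the subscribe/notify part (assumption)."""

    def __init__(self) -> None:
        self._listeners: list[Callable[[Any], Any]] = []

    async def subscribe(self, listener: Callable[[Any], Any]) -> Callable[[], Awaitable[None]]:
        self._listeners.append(listener)

        async def unsubscribe() -> None:
            if listener in self._listeners:
                self._listeners.remove(listener)

        return unsubscribe

    async def notify(self, state: Any) -> None:
        for listener in list(self._listeners):
            result = listener(state)
            # Async listeners return an awaitable that must be awaited.
            if inspect.isawaitable(result):
                await result


async def main() -> list[str]:
    events: list[str] = []

    def sync_listener(state: Any) -> None:
        events.append(f"sync:{state}")

    async def async_listener(state: Any) -> None:
        events.append(f"async:{state}")

    notifier = SketchNotifier()
    unsubscribe_sync = await notifier.subscribe(sync_listener)
    await notifier.subscribe(async_listener)
    await notifier.notify(1)
    await unsubscribe_sync()
    await notifier.notify(2)
    return events


events = asyncio.run(main())
```

After the sync listener unsubscribes, only the async listener receives the second notification.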

class junjo.Edge(tail: Node | _NestableWorkflow, head: Node | _NestableWorkflow, condition: Condition[StateT] | None = None)

Bases: object

Represents a directed edge in the workflow graph.

An edge connects a tail node to a head node, optionally with a condition that determines whether the transition from tail to head should occur.

Parameters:
  • tail – The source node of the edge (where the transition originates).

  • head – The destination node of the edge (where the transition leads).

  • condition (Condition[StateT] | None) – An optional Condition instance that determines whether the transition from tail to head should occur. If None, the transition is always valid.

async next_node(store: BaseStore) -> Node | _NestableWorkflow | None

Determines the next node in the workflow based on the edge’s condition (Condition[StateT]).

Parameters:

store (BaseStore) – The store instance to use for resolving the next node.

Returns:

The next node if the transition is valid, otherwise None.