[docs]classEdge:""" Represents a directed edge in the workflow graph. An edge connects a tail node to a head node, optionally with a condition that determines whether the transition from tail to head should occur. """def__init__(self,tail:Node|_NestableWorkflow,head:Node|_NestableWorkflow,condition:Condition[StateT]|None=None,):""" Args: tail: The source node of the edge (where the transition originates). head: The destination node of the edge (where the transition leads). condition (Condition[StateT]): An optional function that determines whether the transition from tail to head should occur. If None, the transition is always valid. """self.tail=tailself.head=headself.condition=condition
[docs]asyncdefnext_node(self,store:BaseStore)->Node|_NestableWorkflow|None:""" Determines the next node in the workflow based on the edge's condition (Condition[StateT]). Args: store (BaseStore): The store instance to use for resolving the next node. Returns: The next node if the transition is valid, otherwise None. """ifself.conditionisNone:returnself.headelse:state=awaitstore.get_state()ifstateisNone:raiseValueError("State is not available in the store.")returnself.headifself.condition.evaluate(state)elseNone