[docs]classRunConcurrent(Node):""" Execute a list of nodes or subflows concurrently. Under the hood, this uses asyncio.gather to run all items concurrently. An instance of RunConcurrent can be added to a workflow's graph the same was as any other node. """def__init__(self,name:str,items:list[Node|Subflow]):""" Args: name: The name of this collection of concurrently executed nodes. items: A list of nodes or subflows to execute with asyncio.gather. .. code-block:: python node_1 = NodeOne() node_2 = NodeTwo() node_3 = NodeThree() run_concurrent = RunConcurrent( name="Concurrent Execution", items=[node_1, node_2, node_3] ) """super().__init__()self.items=itemsself._id=generate_safe_id()self._name=namedef__repr__(self):"""Returns a string representation of the node or subflow."""returnf"<{type(self).__name__} id={self.id}>"@propertydefid(self)->str:"""Returns the unique identifier for the node or subflow."""returnself._id@propertydefname(self)->str:returnself._name
[docs]asyncdefservice(self,store:BaseStore)->None:""" Execute the provided nodes and subflows concurrently using asyncio.gather. """print(f"Executing concurrent items within {self.name} ({self.id})")ifnotself.items:return# Execute all items concurrently# Using asyncio.gather to run all items concurrentlytasks=[item.execute(store,self.id)foriteminself.items]awaitasyncio.gather(*tasks)print(f"Finished concurrent items within {self.name} ({self.id})")
[docs]asyncdefexecute(self,store:BaseStore,parent_id:str)->None:""" Execute the RunConcurrent node's service function with OpenTelemetry tracing. This method is responsible for tracing and error handling. Args: store: The store to use for the items. parent_id: The parent id of the workflow. """# Acquire a tracer (will be a real tracer if configured, otherwise no-op)tracer=trace.get_tracer(JUNJO_OTEL_MODULE_NAME)# Start a new span and keep a reference to the span objectwithtracer.start_as_current_span(self.name)asspan:try:# Set an attribute on the spanspan.set_attribute("junjo.span_type",JunjoOtelSpanTypes.RUN_CONCURRENT)span.set_attribute("junjo.parent_id",parent_id)span.set_attribute("junjo.id",self.id)# Perform your async operationawaitself.service(store)exceptExceptionase:print(f"Error executing node service: {e}")span.set_status(trace.StatusCode.ERROR,str(e))span.record_exception(e)raise