class ASLAutoma(GraphAutoma, metaclass=ASLAutomaMeta):
    """
    An automa that builds agent structures from ASL (Agent Structured Language) definitions.

    This class extends `GraphAutoma` and uses a declarative syntax to define workflows. It
    automatically builds the graph structure from definitions during initialization, handling
    both static and dynamic worker registration.

    Examples
    --------
    >>> from bridgic.asl import graph, ASLAutoma
    >>>
    >>> def add_one(x: int):
    ...     return x + 1
    >>>
    >>> def add_two(x: int):
    ...     return x + 2
    >>>
    >>> class MyGraph(ASLAutoma):
    ...     with graph as g:
    ...         a = add_one
    ...         b = add_two
    ...         +a >> b  # a is the start worker, b depends on a and is the output worker.
    >>>
    >>> graph = MyGraph()
    >>> result = await graph.arun(x=1)  # result: 4 (1+1+2)
    """

    # The canvases of the automa (stored in bottom-up order).
    # NOTE(review): __init__ indexes this as `self._top_canvas[-1]`, so despite the
    # `_Canvas` annotation it appears to hold a *sequence* of canvases (presumably
    # populated by ASLAutomaMeta) — confirm against the metaclass implementation.
    _top_canvas: _Canvas = None

    def __init__(
        self,
        name: str = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None
    ):
        """
        Initialize the ASLAutoma instance.

        Parameters
        ----------
        name : str, optional
            The name of the automa. If None, a default name will be assigned.
        thread_pool : ThreadPoolExecutor, optional
            The thread pool for parallel running of I/O-bound tasks. If None, a default
            thread pool will be used.
        running_options : RunningOptions, optional
            The running options for the automa. If None, a default running options
            object will be created.
        """
        self.running_options = running_options or RunningOptions()
        # Forward the *normalized* options so this instance and the base class share a
        # single RunningOptions object. (Previously a None could be forwarded, letting
        # the base class silently create its own, different default.)
        super().__init__(name=name, thread_pool=thread_pool, running_options=self.running_options)
        # Maps parent-canvas key -> {child key -> [dynamic _Element, ...]}; filled while
        # building nested canvases, consumed when the parent canvas is built.
        self._dynamic_workers = {}
        if not self._top_canvas:
            # No ASL definitions were declared on the class body; behave as a plain
            # GraphAutoma (see `arun` fallback).
            self.automa = None
        else:
            # Canvases are stored bottom-up, so the last entry is the top-level canvas.
            top_canvas = self._top_canvas[-1]
            self.automa: GraphAutoma = self._build_graph(top_canvas)

    def _build_graph(self, canvas: _Canvas) -> GraphAutoma:
        """
        Build the graph structure for the given canvas.

        Splits the canvas elements into static ones (regular workers and nested
        canvases) and dynamic ones (lambda-generated workers), then delegates the
        actual construction to `_inner_build_graph`.

        Parameters
        ----------
        canvas : _Canvas
            The canvas whose elements should be turned into a graph.

        Returns
        -------
        GraphAutoma
            The automa built from the canvas.
        """
        static_elements = {
            key: value
            for key, value in canvas.elements.items()
            if (isinstance(value, _Element) and not value.is_lambda) or isinstance(value, _Canvas)
        }
        dynamic_elements = {
            key: value
            for key, value in canvas.elements.items()
            if (isinstance(value, _Element) and value.is_lambda)
        }
        return self._inner_build_graph(canvas, static_elements, dynamic_elements)

    def _inner_build_graph(
        self,
        canvas: _Canvas,
        static_elements: Dict[str, "_Element"],
        dynamic_elements: Dict[str, "_Element"]
    ) -> GraphAutoma:
        """
        Build the graph structure for a specific canvas.

        This method handles the construction of both dynamic and static logic flows. For dynamic
        elements (lambda functions), it sets up callbacks that will add workers at runtime and remove
        them when the execution completes. For static elements, it immediately adds them to the
        graph with their dependencies and settings.

        Parameters
        ----------
        canvas : _Canvas
            The canvas to build the graph for.
        static_elements : Dict[str, "_Element"]
            Dictionary of static elements (non-lambda workers) to add to the graph.
        dynamic_elements : Dict[str, "_Element"]
            Dictionary of dynamic elements (lambda functions) that will generate workers at runtime.

        Returns
        -------
        GraphAutoma
            The automa built for this canvas.
        """
        automa = None
        current_canvas_key = canvas.key
        ###############################
        # build the dynamic logic flow
        ###############################
        running_options_callback = []
        for _, element in dynamic_elements.items():
            worker_material = element.worker_material
            params_names = element.cached_param_names
            # If the canvas is top level, use `RunningOptions` to add callback.
            if canvas.is_top_level():
                running_options_callback.append(
                    WorkerCallbackBuilder(
                        AsTopLevelDynamicCallback,
                        init_kwargs={"__dynamic_lambda_func__": worker_material, "__param_names__": simplify_param_names(params_names)}
                    )
                )
            # Otherwise, delegate parent automa to add callback during building graph.
            else:
                parent_key = canvas.parent_canvas.key
                if parent_key not in self._dynamic_workers:
                    self._dynamic_workers[parent_key] = {}
                if current_canvas_key not in self._dynamic_workers[parent_key]:
                    self._dynamic_workers[parent_key][current_canvas_key] = []
                self._dynamic_workers[parent_key][current_canvas_key].append(element)
        # Make the automa.
        canvas.make_automa(running_options=RunningOptions(
            debug=self.running_options.debug,
            verbose=self.running_options.verbose,
            callback_builders=self.running_options.callback_builders + running_options_callback,
            model_config=self.running_options.model_config
        ))
        automa = canvas.worker_material
        if canvas.is_top_level():
            # Expose the top-level automa's input parameters on this instance's
            # `arun`, so callers get a meaningful signature.
            params_data = canvas.worker_material.get_input_param_names()
            set_method_signature(self.arun, params_data)
        ###############################
        # build the static logic flow
        ###############################
        for _, element in static_elements.items():
            key = element.key
            parent_key = element.parent_canvas.key
            worker_material = element.worker_material
            is_start = element.is_start
            is_output = element.is_output
            dependencies = [item.key for item in element.dependencies]
            args_mapping_rule = element.args_mapping_rule
            result_dispatching_rule = element.result_dispatching_rule
            # If the object is an instance of an object instance, it must be ensured that each time
            # an instance of the current ASLAutoma is created, it is an independent one of this object
            # instance. Here, the object has these forms:
            #   1. Canvas:
            #      a. graph (exactly is GraphAutoma) or concurrent (exactly is ConcurrentAutoma) etc.
            #   2. Element:
            #      a. Callable
            #      b. ASLAutoma
            #      c. GraphAutoma etc.
            #      d. Worker
            if isinstance(element, _Canvas):
                # Nested canvas: recurse to build its sub-graph.
                worker_material = self._build_graph(element)
            elif isinstance(element, _Element):
                if isinstance(worker_material, ASLAutoma):
                    # Re-instantiate so each ASLAutoma instance owns an independent
                    # copy of the nested automa; merge its callbacks with ours.
                    asl_automa_class = type(worker_material)
                    running_options_callback = (
                        getattr(worker_material, "running_options", None).callback_builders
                        if getattr(worker_material, "running_options", None)
                        else []
                    ) + self.running_options.callback_builders
                    worker_material = asl_automa_class(
                        name=getattr(worker_material, "name", None),
                        thread_pool=getattr(worker_material, "thread_pool", None),
                        running_options=RunningOptions(
                            debug=self.running_options.debug,
                            verbose=self.running_options.verbose,
                            callback_builders=running_options_callback,
                            model_config=self.running_options.model_config
                        )
                    )
                elif isinstance(worker_material, GraphAutoma):
                    # Same independence guarantee for plain GraphAutoma instances.
                    graph_automa_class = type(worker_material)
                    running_options_callback = (
                        getattr(worker_material, "running_options", None).callback_builders
                        if getattr(worker_material, "running_options", None)
                        else []
                    ) + self.running_options.callback_builders
                    worker_material = graph_automa_class(
                        name=getattr(worker_material, "name", None),
                        thread_pool=getattr(worker_material, "thread_pool", None),
                        running_options=RunningOptions(
                            debug=self.running_options.debug,
                            verbose=self.running_options.verbose,
                            callback_builders=running_options_callback,
                            model_config=self.running_options.model_config
                        )
                    )
                elif isinstance(worker_material, Worker):
                    worker_material = _copy_worker_safely(worker_material)
                elif isinstance(worker_material, Callable):
                    # Plain callables are stateless from our point of view; wrapped
                    # later only if a signature override applies.
                    pass
                else:
                    raise ValueError(f"Invalid worker material type: {type(worker_material)}.")
            # Prepare the callback builders.
            # If current element delegated dynamic workers to be added in current canvas.
            callback_builders = []
            if current_canvas_key in self._dynamic_workers and key in self._dynamic_workers[current_canvas_key]:
                delegated_dynamic_workers = self._dynamic_workers[current_canvas_key][key]
                for delegated_dynamic_element in delegated_dynamic_workers:
                    delegated_dynamic_func = delegated_dynamic_element.worker_material
                    delegated_dynamic_params_names = delegated_dynamic_element.cached_param_names
                    callback_builders.append(WorkerCallbackBuilder(
                        AsWorkerDynamicCallback,
                        init_kwargs={"__dynamic_lambda_func__": delegated_dynamic_func, "__param_names__": simplify_param_names(delegated_dynamic_params_names)}
                    ))
            # Update the signature in Data to set the __cached_param_names_of_arun or __cached_param_names_of_run.
            # Note: Python's name mangling mechanism
            # In Python, attributes that start with double underscores `__` but don't end with `__`
            # are subject to name mangling. The actual attribute name becomes `_ClassName__attribute_name`:
            #   - __cached_param_names_of_arun in Worker class → _Worker__cached_param_names_of_arun
            #   - __cached_param_names_of_run in Worker class → _Worker__cached_param_names_of_run
            #   - __cached_param_names_of_callable in CallableWorker class → _CallableWorker__cached_param_names_of_callable
            # If we directly write `worker_material.__cached_param_names_of_xxx`, Python will interpret it
            # as a private attribute of the current module or class, not as an attribute of Worker or
            # CallableWorker. Therefore, we must use setattr() with the correct mangled attribute name.
            def set_cached_param_names(worker_material: Worker, override_params: Dict):
                # Write the override into the correctly-mangled private attribute.
                if isinstance(worker_material, CallableWorker):
                    setattr(worker_material, '_CallableWorker__cached_param_names_of_callable', override_params)
                else:
                    if worker_material._is_arun_overridden():
                        setattr(worker_material, '_Worker__cached_param_names_of_arun', override_params)
                    else:
                        setattr(worker_material, '_Worker__cached_param_names_of_run', override_params)

            def get_param_names_dict(sig: inspect.Signature, exclude_default: bool = False) -> Dict:
                # Group (name, default) pairs by parameter kind, matching the cached
                # param-names layout the Worker classes expect.
                param_names_dict = {}
                for name, param in sig.parameters.items():
                    if exclude_default and param.default is not inspect.Parameter.empty:
                        continue
                    if param.kind not in param_names_dict:
                        param_names_dict[param.kind] = []
                    if param.default is inspect.Parameter.empty:
                        param_names_dict[param.kind].append((name, inspect._empty))
                    else:
                        param_names_dict[param.kind].append((name, param.default))
                return param_names_dict

            signature_name = f"{parent_key}.{key}" if parent_key else f"__TOP__.{key}"
            override_signature = getattr(worker_material, "__signature_overrides__", {}).get(signature_name, None)
            if override_signature:
                override_params = get_param_names_dict(override_signature)
                if isinstance(worker_material, Callable):
                    worker_material = CallableWorker(func_or_method=worker_material)
                set_cached_param_names(worker_material, override_params)
            # Build the automa.
            if isinstance(automa, ConcurrentAutoma):
                build_concurrent(
                    automa=automa,
                    key=key,
                    worker_material=worker_material,
                    callback_builders=callback_builders
                )
            elif isinstance(automa, GraphAutoma):
                build_graph(
                    automa=automa,
                    key=key,
                    worker_material=worker_material,
                    is_start=is_start,
                    is_output=is_output,
                    dependencies=dependencies,
                    args_mapping_rule=args_mapping_rule,
                    result_dispatching_rule=result_dispatching_rule,
                    callback_builders=callback_builders
                )
            else:
                raise ValueError(f"Invalid automa type: {type(automa)}.")
        return automa

    def dump_to_dict(self) -> Dict[str, Any]:
        """
        Dump the ASLAutoma instance to a dictionary.

        Returns
        -------
        Dict[str, Any]
            A dictionary containing the serialized state of the ASLAutoma instance.
        """
        state_dict = super().dump_to_dict()
        state_dict["automa"] = self.automa.dump_to_dict()
        return state_dict

    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        """
        Load the ASLAutoma instance from a dictionary.

        Parameters
        ----------
        state_dict : Dict[str, Any]
            A dictionary containing the serialized state of the ASLAutoma instance.
        """
        super().load_from_dict(state_dict)
        # NOTE(review): `dump_to_dict` stores `self.automa.dump_to_dict()` (a plain
        # dict), but this assigns that dict back without rebuilding an automa from it.
        # Round-tripping therefore leaves `self.automa` as a dict unless the base
        # class deserializes it first — confirm against GraphAutoma.load_from_dict.
        self.automa = state_dict["automa"]

    async def arun(
        self,
        *args: Tuple[Any, ...],
        feedback_data = None,
        **kwargs: Dict[str, Any]
    ) -> Any:
        """
        Run the automa asynchronously.

        Parameters
        ----------
        *args : Tuple[Any, ...]
            Positional arguments to pass to the automa.
        feedback_data : Any, optional
            Feedback data for the execution (default: None).
        **kwargs : Dict[str, Any]
            Keyword arguments to pass to the automa.

        Returns
        -------
        Any
            The result of the automa execution.
        """
        if not self.automa:
            # No ASL-built graph: fall back to plain GraphAutoma behavior. The
            # `await` is required — without it the un-awaited coroutine object
            # (not the execution result) would be returned to the caller.
            return await super().arun(*args, feedback_data=feedback_data, **kwargs)
        res = await self.automa.arun(*args, feedback_data=feedback_data, **kwargs)
        return res

    def __str__(self) -> str:
        return f"ASLAutoma(automa={self.automa})"

    def __repr__(self) -> str:
        return self.__str__()