Skip to content

asl

ASLAutoma

Bases: GraphAutoma

An automa that builds agent structures from ASL (Agent Structured Language) definitions.

This class extends GraphAutoma and uses a declarative syntax to define workflows. It automatically builds the graph structure from definitions during initialization, handling both static and dynamic worker registration.

Examples:

>>> from bridgic.asl import graph, ASLAutoma
>>> 
>>> def add_one(x: int):
...     return x + 1
>>> 
>>> def add_two(x: int):
...     return x + 2
>>> 
>>> class MyGraph(ASLAutoma):
...     with graph as g:
...         a = add_one
...         b = add_two
...         +a >> b  # a is the start worker, b depends on a and is the output worker.
>>> 
>>> graph = MyGraph()
>>> result = await graph.arun(x=1)  # result: 4 (1+1+2)
Source code in bridgic/asl/_asl_automa.py
class ASLAutoma(GraphAutoma, metaclass=ASLAutomaMeta):
    """
    An automa that builds agent structures from ASL (Agent Structured Language) definitions.

    This class extends `GraphAutoma` and uses a declarative syntax to define workflows. It
    automatically builds the graph structure from definitions during initialization, handling
    both static and dynamic worker registration.

    The graph built from the top-level canvas is stored in `self.automa`; `arun` delegates to it.
    When no canvas has been declared, `self.automa` is None and the instance falls back to the
    behavior inherited from `GraphAutoma`.

    Examples
    --------
    >>> from bridgic.asl import graph, ASLAutoma
    >>> 
    >>> def add_one(x: int):
    ...     return x + 1
    >>> 
    >>> def add_two(x: int):
    ...     return x + 2
    >>> 
    >>> class MyGraph(ASLAutoma):
    ...     with graph as g:
    ...         a = add_one
    ...         b = add_two
    ...         +a >> b  # a is the start worker, b depends on a and is the output worker.
    >>> 
    >>> graph = MyGraph()
    >>> result = await graph.arun(x=1)  # result: 4 (1+1+2)
    """
    # The canvases of the automa (stored in bottom-up order).
    # NOTE(review): annotated as a single `_Canvas`, but `__init__` indexes it with `[-1]`, so it
    # appears to hold a sequence of canvases (presumably filled in by `ASLAutomaMeta`) — confirm.
    _top_canvas: _Canvas = None

    def __init__(
        self,
        name: str = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None
    ):
        """
        Initialize the ASLAutoma instance.

        Parameters
        ----------
        name : str, optional
            The name of the automa. If None, a default name will be assigned.
        thread_pool : ThreadPoolExecutor, optional
            The thread pool for parallel running of I/O-bound tasks. If None, a default thread pool will be used.
        running_options : RunningOptions, optional
            The running options for the automa. If None, default running options will be used.
        """
        # Keep a non-None copy of the options: graph building below reads its fields directly.
        self.running_options = running_options or RunningOptions()
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)
        # Maps parent canvas key -> child canvas key -> dynamic (lambda) elements delegated upward.
        self._dynamic_workers = {}
        if not self._top_canvas:
            self.automa = None
        else:
            # The last entry of the bottom-up ordered canvases is the top-level one.
            top_canvas = self._top_canvas[-1]
            self.automa: GraphAutoma = self._build_graph(top_canvas)

    def _build_graph(self, canvas: _Canvas) -> GraphAutoma:
        """
        Build the graph structure for one canvas.

        The elements of the canvas are split into static ones (non-lambda elements and nested
        canvases) and dynamic ones (lambda elements), and the actual building is delegated to
        `_inner_build_graph`. Nested canvases are built recursively from there.

        Parameters
        ----------
        canvas : _Canvas
            The canvas to build the graph for.

        Returns
        -------
        GraphAutoma
            The automa built for the canvas.
        """
        static_elements = {
            key: value
            for key, value in canvas.elements.items()
            if (isinstance(value, _Element) and not value.is_lambda) or isinstance(value, _Canvas)
        }
        dynamic_elements = {
            key: value
            for key, value in canvas.elements.items()
            if (isinstance(value, _Element) and value.is_lambda)
        }
        return self._inner_build_graph(canvas, static_elements, dynamic_elements)

    def _inner_build_graph(
        self,
        canvas: _Canvas,
        static_elements: Dict[str, "_Element"],
        dynamic_elements: Dict[str, "_Element"]
    ) -> GraphAutoma:
        """
        Build the graph structure for a specific canvas.

        This method handles the construction of both dynamic and static logic flows. For dynamic
        elements (lambda functions), it registers callback builders: directly in the running
        options when the canvas is top level, otherwise by recording the element in
        `self._dynamic_workers` so that the parent canvas attaches the callback when it adds
        this canvas as a worker. For static elements, it immediately adds them to the graph
        with their dependencies and settings.

        Parameters
        ----------
        canvas : _Canvas
            The canvas to build the graph for.
        static_elements : Dict[str, "_Element"]
            Dictionary of static elements (non-lambda workers) to add to the graph.
        dynamic_elements : Dict[str, "_Element"]
            Dictionary of dynamic elements (lambda functions) that will generate workers at runtime.

        Returns
        -------
        GraphAutoma
            The automa created for `canvas`, populated with its static elements.

        Raises
        ------
        ValueError
            If a static element is neither a `_Canvas` nor an `_Element`, or if the automa made
            by the canvas is neither a `ConcurrentAutoma` nor a `GraphAutoma`.
        """
        current_canvas_key = canvas.key

        # Update the signature in Data to set the __cached_param_names_of_arun or __cached_param_names_of_run.
        # Note: Python's name mangling mechanism
        # In Python, attributes that start with double underscores `__` but don't end with `__`
        # are subject to name mangling. The actual attribute name becomes `_ClassName__attribute_name`:
        # - __cached_param_names_of_arun in Worker class → _Worker__cached_param_names_of_arun
        # - __cached_param_names_of_run in Worker class → _Worker__cached_param_names_of_run
        # - __cached_param_names_of_callable in CallableWorker class → _CallableWorker__cached_param_names_of_callable
        # If we directly write `worker_material.__cached_param_names_of_xxx`, Python will interpret it
        # as a private attribute of the current module or class, not as an attribute of Worker or
        # CallableWorker. Therefore, we must use setattr() with the correct mangled attribute name.
        # Both helpers are loop-invariant, so they are defined once here instead of per element.
        def set_cached_param_names(worker_material: Worker, override_params: Dict):
            if isinstance(worker_material, CallableWorker):
                setattr(worker_material, '_CallableWorker__cached_param_names_of_callable', override_params)
            elif worker_material._is_arun_overridden():
                setattr(worker_material, '_Worker__cached_param_names_of_arun', override_params)
            else:
                setattr(worker_material, '_Worker__cached_param_names_of_run', override_params)

        def get_param_names_dict(sig: inspect.Signature, exclude_default: bool = False) -> Dict:
            # Group `(name, default)` pairs by parameter kind. A parameter without a default
            # carries `inspect.Parameter.empty` (the same object as `inspect._empty`).
            param_names_dict = {}
            for name, param in sig.parameters.items():
                if exclude_default and param.default is not inspect.Parameter.empty:
                    continue
                param_names_dict.setdefault(param.kind, []).append((name, param.default))
            return param_names_dict

        ###############################
        # build the dynamic logic flow
        ###############################
        running_options_callback = []
        for element in dynamic_elements.values():
            worker_material = element.worker_material
            params_names = element.cached_param_names

            # If the canvas is top level, use `RunningOptions` to add callback.
            if canvas.is_top_level():
                running_options_callback.append(
                    WorkerCallbackBuilder(
                        AsTopLevelDynamicCallback,
                        init_kwargs={"__dynamic_lambda_func__": worker_material, "__param_names__": simplify_param_names(params_names)}
                    )
                )

            # Otherwise, delegate parent automa to add callback during building graph.
            else:
                parent_key = canvas.parent_canvas.key
                delegated = self._dynamic_workers.setdefault(parent_key, {})
                delegated.setdefault(current_canvas_key, []).append(element)

        # Make the automa.
        canvas.make_automa(running_options=RunningOptions(
            debug=self.running_options.debug,
            verbose=self.running_options.verbose,
            callback_builders=self.running_options.callback_builders + running_options_callback,
            model_config=self.running_options.model_config
        ))
        automa = canvas.worker_material
        if canvas.is_top_level():
            # Expose the input parameters of the top-level automa on this instance's `arun`.
            params_data = canvas.worker_material.get_input_param_names()
            set_method_signature(self.arun, params_data)

        ###############################
        # build the static logic flow
        ###############################
        for element in static_elements.values():
            key = element.key
            parent_key = element.parent_canvas.key
            worker_material = element.worker_material
            is_start = element.is_start
            is_output = element.is_output
            dependencies = [item.key for item in element.dependencies]
            args_mapping_rule = element.args_mapping_rule
            result_dispatching_rule = element.result_dispatching_rule

            # If the object is an instance of an object instance, it must be ensured that each time
            # an instance of the current ASLAutoma is created, it is an independent one of this object
            # instance. Here, the object has these forms:
            #   1. Canvas:
            #     a. graph (exactly is GraphAutoma) or concurrent (exactly is ConcurrentAutoma) etc.
            #   2. Element:
            #     a. Callable
            #     b. ASLAutoma
            #     c. GraphAutoma etc.
            #     d. Worker
            if isinstance(element, _Canvas):
                worker_material = self._build_graph(element)
            elif isinstance(element, _Element):
                if isinstance(worker_material, GraphAutoma):
                    # Also covers ASLAutoma, which subclasses GraphAutoma: re-instantiate the
                    # same class so this instance owns an independent copy, combining the
                    # material's own callback builders with the ones of this automa.
                    material_options = getattr(worker_material, "running_options", None)
                    inherited_callbacks = (
                        material_options.callback_builders if material_options else []
                    ) + self.running_options.callback_builders
                    worker_material = type(worker_material)(
                        name=getattr(worker_material, "name", None),
                        thread_pool=getattr(worker_material, "thread_pool", None),
                        running_options=RunningOptions(
                            debug=self.running_options.debug,
                            verbose=self.running_options.verbose,
                            callback_builders=inherited_callbacks,
                            model_config=self.running_options.model_config
                        )
                    )
                elif isinstance(worker_material, Worker):
                    worker_material = _copy_worker_safely(worker_material)
                # Any other material (e.g. a plain callable) is used as-is.
            else:
                raise ValueError(f"Invalid worker material type: {type(worker_material)}.")

            # Prepare the callback builders.
            # If current element delegated dynamic workers to be added in current canvas.
            callback_builders = []
            if current_canvas_key in self._dynamic_workers and key in self._dynamic_workers[current_canvas_key]:
                delegated_dynamic_workers = self._dynamic_workers[current_canvas_key][key]
                for delegated_dynamic_element in delegated_dynamic_workers:
                    delegated_dynamic_func = delegated_dynamic_element.worker_material
                    delegated_dynamic_params_names = delegated_dynamic_element.cached_param_names
                    callback_builders.append(WorkerCallbackBuilder(
                        AsWorkerDynamicCallback,
                        init_kwargs={"__dynamic_lambda_func__": delegated_dynamic_func, "__param_names__": simplify_param_names(delegated_dynamic_params_names)}
                    ))

            # Apply a signature override registered for this element, if any. Overrides are
            # looked up by "<parent canvas key>.<element key>" ("__TOP__" when there is no parent key).
            signature_name = f"{parent_key}.{key}" if parent_key else f"__TOP__.{key}"
            override_signature = getattr(worker_material, "__signature_overrides__", {}).get(signature_name, None)
            if override_signature:
                override_params = get_param_names_dict(override_signature)
                if isinstance(worker_material, Callable):
                    worker_material = CallableWorker(func_or_method=worker_material)
                set_cached_param_names(worker_material, override_params)

            # Build the automa.
            if isinstance(automa, ConcurrentAutoma):
                build_concurrent(
                    automa=automa,
                    key=key,
                    worker_material=worker_material,
                    callback_builders=callback_builders
                )
            elif isinstance(automa, GraphAutoma):
                build_graph(
                    automa=automa,
                    key=key,
                    worker_material=worker_material,
                    is_start=is_start,
                    is_output=is_output,
                    dependencies=dependencies,
                    args_mapping_rule=args_mapping_rule,
                    result_dispatching_rule=result_dispatching_rule,
                    callback_builders=callback_builders
                )
            else:
                raise ValueError(f"Invalid automa type: {type(automa)}.")
        return automa

    def dump_to_dict(self) -> Dict[str, Any]:
        """
        Dump the ASLAutoma instance to a dictionary.

        Returns
        -------
        Dict[str, Any]
            A dictionary containing the serialized state of the ASLAutoma instance. The
            "automa" entry holds the dumped inner automa, or None when no inner automa exists.
        """
        state_dict = super().dump_to_dict()
        # `__init__` leaves `self.automa` as None when no canvas was declared; dumping must not
        # fail with an AttributeError in that case.
        state_dict["automa"] = self.automa.dump_to_dict() if self.automa is not None else None
        return state_dict

    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        """
        Load the ASLAutoma instance from a dictionary.

        Parameters
        ----------
        state_dict : Dict[str, Any]
            A dictionary containing the serialized state of the ASLAutoma instance.
        """
        super().load_from_dict(state_dict)
        # NOTE(review): `dump_to_dict` stores the result of `self.automa.dump_to_dict()` under
        # "automa", and that value is assigned back here unchanged. Confirm that the
        # serialization layer turns it back into an automa before `arun` is called.
        self.automa = state_dict["automa"]

    async def arun(
        self,
        *args: Tuple[Any, ...],
        feedback_data = None,
        **kwargs: Dict[str, Any]
    ) -> Any:
        """
        Run the automa asynchronously.

        Delegates to the automa built from the top-level canvas. When no such automa exists,
        the `arun` implementation inherited from the parent class is awaited instead.

        Parameters
        ----------
        *args : Tuple[Any, ...]
            Positional arguments to pass to the automa.
        feedback_data : Any, optional
            Feedback data for the execution (default: None).
        **kwargs : Dict[str, Any]
            Keyword arguments to pass to the automa.

        Returns
        -------
        Any
            The result of the automa execution.
        """
        if not self.automa:
            # Await the parent coroutine: returning it un-awaited would hand the caller a
            # coroutine object instead of the execution result.
            return await super().arun(*args, feedback_data=feedback_data, **kwargs)

        res = await self.automa.arun(*args, feedback_data=feedback_data, **kwargs)
        return res

    def __str__(self) -> str:
        return f"ASLAutoma(automa={self.automa})"

    def __repr__(self) -> str:
        return self.__str__()

dump_to_dict

dump_to_dict() -> Dict[str, Any]

Dump the ASLAutoma instance to a dictionary.

Returns:

Type Description
Dict[str, Any]

A dictionary containing the serialized state of the ASLAutoma instance.

Source code in bridgic/asl/_asl_automa.py
def dump_to_dict(self) -> Dict[str, Any]:
    """
    Dump the ASLAutoma instance to a dictionary.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the serialized state of the ASLAutoma instance. The
        "automa" entry holds the dumped inner automa, or None when no inner automa exists.
    """
    state_dict = super().dump_to_dict()
    # `self.automa` is None when the class declares no canvas; dumping must not fail with an
    # AttributeError in that case.
    state_dict["automa"] = self.automa.dump_to_dict() if self.automa is not None else None
    return state_dict

load_from_dict

load_from_dict(state_dict: Dict[str, Any]) -> None

Load the ASLAutoma instance from a dictionary.

Parameters:

Name Type Description Default
state_dict Dict[str, Any]

A dictionary containing the serialized state of the ASLAutoma instance.

required
Source code in bridgic/asl/_asl_automa.py
def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
    """
    Restore the state of this ASLAutoma instance from a dictionary.

    The parent class restores its own state first; the "automa" entry is then taken from
    the dictionary and stored as the inner automa.

    Parameters
    ----------
    state_dict : Dict[str, Any]
        A dictionary containing the serialized state of the ASLAutoma instance.
    """
    super().load_from_dict(state_dict)
    # NOTE(review): the entry is stored exactly as found in `state_dict`; confirm that the
    # serialization layer has already turned it back into an automa at this point.
    restored_automa = state_dict["automa"]
    self.automa = restored_automa

arun

async
arun(
    *args: Tuple[Any, ...],
    feedback_data=None,
    **kwargs: Dict[str, Any]
) -> Any

Run the automa asynchronously.

Parameters:

Name Type Description Default
*args Tuple[Any, ...]

Positional arguments to pass to the automa.

()
feedback_data Any

Feedback data for the execution (default: None).

None
**kwargs Dict[str, Any]

Keyword arguments to pass to the automa.

{}

Returns:

Type Description
Any

The result of the automa execution.

Source code in bridgic/asl/_asl_automa.py
async def arun(
    self,
    *args: Tuple[Any, ...],
    feedback_data = None,
    **kwargs: Dict[str, Any]
) -> Any:
    """
    Run the automa asynchronously.

    Delegates to the inner automa stored in `self.automa`. When there is no inner automa,
    the `arun` implementation inherited from the parent class is awaited instead.

    Parameters
    ----------
    *args : Tuple[Any, ...]
        Positional arguments to pass to the automa.
    feedback_data : Any, optional
        Feedback data for the execution (default: None).
    **kwargs : Dict[str, Any]
        Keyword arguments to pass to the automa.

    Returns
    -------
    Any
        The result of the automa execution.
    """
    if not self.automa:
        # Await the parent coroutine: returning it un-awaited would hand the caller a
        # coroutine object instead of the execution result.
        return await super().arun(*args, feedback_data=feedback_data, **kwargs)

    res = await self.automa.arun(*args, feedback_data=feedback_data, **kwargs)
    return res

Data

Container for parameter data configuration.

This class stores type and default value information for function parameters, allowing dynamic modification of function signatures at declaration time. It can be attached to workers or callables with the `*` operator, with the `Data` instance as the right-hand operand (for example `worker * Data(x=1)`).

Source code in bridgic/asl/_canvas_object.py
class Data:
    """
    Holder for per-parameter configuration of a worker or callable.

    The keyword arguments given at construction are recorded as `(name, value)` pairs, which
    allows function signatures to be adjusted at declaration time. An instance is attached
    to a worker or callable with the `*` operator, e.g. `func * Data(x=1)`.
    """
    def __init__(self, **kwargs: Any) -> None:
        """
        Record the given parameter configurations.

        Parameters
        ----------
        **kwargs : Any
            Keyword arguments where each key is a parameter name and each value is the
            value stored for that parameter. The pairs are kept, in the order given, under
            `Parameter.POSITIONAL_OR_KEYWORD` in `self.data`.
        """
        self.data = {Parameter.POSITIONAL_OR_KEYWORD: list(kwargs.items())}

    def __rmul__(self, other: Union[Callable, Worker]):
        """
        Attach this Data configuration to the left-hand operand of `*`.

        Invoked for expressions such as `other * Data(param1=value1)`; the configuration is
        stored on `other` as its `__data__` attribute.

        Parameters
        ----------
        other : Union[Callable, Worker]
            The worker or callable to attach the data configuration to.

        Returns
        -------
        Union[Callable, Worker]
            The same object with the `__data__` attribute set.
        """
        other.__data__ = self
        return other

Settings dataclass

Configuration settings for canvas objects (workers, elements, and canvases).

This dataclass stores metadata about how a canvas object should be configured in the graph: its key, its argument mapping rule, and its result dispatching rule.

Attributes:

Name Type Description
key str

The unique identifier for the canvas object. Defaults to KeyUnDifined if not set.

args_mapping_rule ArgsMappingRule

The rule for mapping arguments to this object. Defaults to ArgsMappingRule.AS_IS.

result_dispatching_rule ResultDispatchingRule

The rule for dispatching the result of this object to the next object. Defaults to ResultDispatchingRule.AS_IS.

Source code in bridgic/asl/_canvas_object.py
@dataclass
class Settings:
    """
    Configuration settings for canvas objects (workers, elements, and canvases).

    This dataclass holds the key of a canvas object together with the rules that control how
    arguments are mapped to it and how its result is dispatched.

    Attributes
    ----------
    key : str
        The unique identifier for the canvas object. Defaults to KeyUnDifined if not set.
    args_mapping_rule : ArgsMappingRule
        The rule for mapping arguments to this object. Defaults to ArgsMappingRule.AS_IS.
    result_dispatching_rule : ResultDispatchingRule
        The rule for dispatching the result of this object to the next object. Defaults to ResultDispatchingRule.AS_IS.
    """
    key: str = None
    args_mapping_rule: ArgsMappingRule = None
    result_dispatching_rule: ResultDispatchingRule = None

    def __post_init__(self):
        """
        Fill in defaults for attributes that were left unset.

        Called automatically after the dataclass-generated `__init__`. Any attribute holding
        a falsy value is replaced by its default; other values are kept unchanged.
        """
        self.key = self.key or KeyUnDifined()
        self.args_mapping_rule = self.args_mapping_rule or ArgsMappingRule.AS_IS
        self.result_dispatching_rule = self.result_dispatching_rule or ResultDispatchingRule.AS_IS

    def __rmul__(self, other: Union[Callable, Worker]):
        """
        Attach this Settings configuration to the left-hand operand of `*`.

        Invoked for expressions such as
        `other * Settings(key="worker1", result_dispatching_rule=ResultDispatchingRule.IN_ORDER)`;
        the settings are stored on `other` as its `__settings__` attribute.

        Parameters
        ----------
        other : Union[Callable, Worker]
            The worker or callable to attach the settings to.

        Returns
        -------
        Union[Callable, Worker]
            The same object with the `__settings__` attribute set.
        """
        other.__settings__ = self
        return other

ASLField

Bases: FieldInfo

A custom Field class that extends Pydantic's FieldInfo with support for storing default type information.

The `type` parameter is stored as metadata and does not automatically generate default values. You must explicitly provide a `default` value if you want a default.

Source code in bridgic/asl/_canvas_object.py
class ASLField(FieldInfo):
    """
    A Pydantic `FieldInfo` subclass that additionally records a type and a dispatching rule.

    The `type` argument is only stored on the instance as metadata; it does not generate a
    default value. Pass `default` explicitly if the field should have one.
    """

    def __init__(
        self,
        type: Type[Any] = Any,
        *,
        default: Any = ...,
        dispatching_rule: ResultDispatchingRule = ResultDispatchingRule.AS_IS,
        **kwargs: Any
    ):
        """
        Initialize the field and store the ASL-specific metadata.

        Parameters
        ----------
        type : Type[Any]
            Type information stored as metadata. Does not automatically generate default values.
        default : Any
            Explicit default value. Must be provided if you want a default value.
        dispatching_rule : ResultDispatchingRule
            The rule for dispatching the data to multiple workers.
        **kwargs : Any
            Other Field parameters (description, ge, le, etc.)
        """
        # `default` and the remaining keyword arguments go to FieldInfo; `type` and
        # `dispatching_rule` are kept on this instance only.
        field_kwargs = {"default": default, **kwargs}
        super().__init__(**field_kwargs)
        self.type, self.dispatching_rule = type, dispatching_rule

ASLCompilationError

Bases: Exception

ASL code compilation error.

Source code in bridgic/asl/_error.py
5
6
7
8
9
class ASLCompilationError(Exception):
    """Error raised for a failure while compiling ASL code."""