Skip to content

agentic

The Agentic module provides core components for building intelligent agent systems.

This module contains various Automa implementations for orchestrating and executing LLM-based workflows or agents. These Automa implementations are typically composed together to build complex intelligent agents with advanced capabilities.

ConcurrentAutoma

Bases: GraphAutoma

This class provides concurrent execution of multiple workers.

In accordance with the defined "Concurrency Model of Worker", each worker within a ConcurrentAutoma can be configured to operate in one of two concurrency modes:

  1. Async Mode: Workers execute concurrently in an asynchronous fashion, driven by the event loop of the main thread. This execution mode corresponds to the arun() method of the Worker.
  2. Parallel Mode: Workers execute synchronously, each running in a dedicated thread within a thread pool managed by the ConcurrentAutoma. This execution mode corresponds to the run() method of the Worker.

Upon completion of all worker tasks, the concurrent automa instance aggregates the result outputs from each worker into a single list, which is then returned to the caller.

Source code in bridgic/core/agentic/_concurrent_automa.py
class ConcurrentAutoma(GraphAutoma):
    """
    This class provides concurrent execution of multiple workers.

    In accordance with the defined "Concurrency Model of Worker", each worker within 
    a ConcurrentAutoma can be configured to operate in one of two concurrency modes:

    1. **Async Mode**: Workers execute concurrently in an asynchronous fashion, driven 
    by the event loop of the main thread. This execution mode corresponds to the `arun()` 
    method of the Worker.
    2. **Parallel Mode**: Workers execute synchronously, each running in a dedicated 
    thread within a thread pool managed by the ConcurrentAutoma. This execution mode 
    corresponds to the `run()` method of the Worker.

    Upon completion of all worker tasks, the concurrent automa instance aggregates 
    the result outputs from each worker into a single list, which is then returned 
    to the caller.
    """

    # Automa type.
    AUTOMA_TYPE: ClassVar[AutomaType] = AutomaType.Concurrent

    # Reserved key of the hidden worker that gathers the concurrent workers' results.
    _MERGER_WORKER_KEY: Final[str] = "__merger__"

    def __init__(
        self,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
    ):
        """
        Initialize the concurrent automa and register its hidden merger worker.

        Parameters
        ----------
        name : Optional[str]
            Forwarded unchanged to the base class initializer.
        thread_pool : Optional[ThreadPoolExecutor]
            Forwarded unchanged to the base class initializer.
        running_options : Optional[RunningOptions]
            Forwarded unchanged to the base class initializer.
        """
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)

        # Implementation notes:
        # There are two types of workers in the concurrent automa:
        # 1. Concurrent workers: These workers will be concurrently executed with each other.
        # 2. The Merger worker: This worker will merge the results of all the concurrent workers.

        cls = type(self)
        if cls.AUTOMA_TYPE == AutomaType.Concurrent:
            # The _registered_worker_funcs data are from @worker decorators.
            # Initialize the decorated concurrent workers.
            for worker_key, worker_func in self._registered_worker_funcs.items():
                super().add_func_as_worker(
                    key=worker_key,
                    func=worker_func,
                    is_start=True,
                )

        # Add a hidden worker as the merger worker, which will merge the results of all the start workers.
        # At this point the merger itself is not registered yet, so the unfiltered
        # base-class listing contains only the concurrent workers added above.
        super().add_func_as_worker(
            key=self._MERGER_WORKER_KEY,
            func=self._merge_workers_results,
            dependencies=super().all_workers(),
            is_output=True,
            args_mapping_rule=ArgsMappingRule.MERGE,
        )

    def _merge_workers_results(self, results: List[Any]) -> List[Any]:
        """Return the merged list of worker results unchanged (body of the merger worker)."""
        return results

    @override
    def add_worker(
        self,
        key: str,
        worker: Worker,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
        callback_builders: List[WorkerCallbackBuilder] = [],
    ) -> None:
        """
        Add a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

        Parameters
        ----------
        key : str
            The key of the worker.
        worker : Worker
            The worker instance to be registered.
        args_mapping_rule : ArgsMappingRule, default ArgsMappingRule.AS_IS
            The rule for mapping input arguments to the worker.
        callback_builders : List[WorkerCallbackBuilder], default []
            The list of callback builders to be registered for the worker.

        Raises
        ------
        AutomaRuntimeError
            If `key` is the reserved key of the merger worker.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")
        # Implementation notes:
        # Concurrent workers are implemented as start workers in the underlying graph automa.
        super().add_worker(key=key, worker=worker, is_start=True, args_mapping_rule=args_mapping_rule, callback_builders=callback_builders)
        # The merger worker must depend on every concurrent worker.
        super().add_dependency(self._MERGER_WORKER_KEY, key)

    @override
    def add_func_as_worker(
        self,
        key: str,
        func: Callable,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
        callback_builders: List[WorkerCallbackBuilder] = [],
    ) -> None:
        """
        Add a function or method as a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

        Parameters
        ----------
        key : str
            The key of the function worker.
        func : Callable
            The function to be added as a concurrent worker to the automa.
        args_mapping_rule : ArgsMappingRule, default ArgsMappingRule.AS_IS
            The rule for mapping input arguments to the worker.
        callback_builders : List[WorkerCallbackBuilder], default []
            The list of callback builders to be registered for the worker.

        Raises
        ------
        AutomaRuntimeError
            If `key` is the reserved key of the merger worker.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")
        # Implementation notes:
        # Concurrent workers are implemented as start workers in the underlying graph automa.
        super().add_func_as_worker(key=key, func=func, is_start=True, args_mapping_rule=args_mapping_rule, callback_builders=callback_builders)
        # The merger worker must depend on every concurrent worker.
        super().add_dependency(self._MERGER_WORKER_KEY, key)

    @override
    def worker(
        self,
        *,
        key: Optional[str] = None,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
        callback_builders: List[WorkerCallbackBuilder] = [],
    ) -> Callable:
        """
        This is a decorator to mark a function or method as a concurrent worker of the concurrent automa. This worker will be concurrently executed with other concurrent workers.

        Parameters
        ----------
        key : Optional[str]
            The key of the worker. If not provided, the name of the decorated callable will be used.
        args_mapping_rule : ArgsMappingRule, default ArgsMappingRule.AS_IS
            The rule for mapping input arguments to the worker.
        callback_builders : List[WorkerCallbackBuilder], default []
            The list of callback builders to be registered for the worker.

        Returns
        -------
        Callable
            A decorator that registers the decorated callable and returns it unchanged.

        Raises
        ------
        AutomaRuntimeError
            If the explicit key, or the name of the decorated callable used as the key,
            is the reserved key of the merger worker.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

        # Zero-argument super() must be evaluated here, in the method itself,
        # so the proxy is captured for use inside the nested function.
        super_automa = super()
        def wrapper(func: Callable):
            # Resolve the key up front so that the registration and the merger
            # dependency below are guaranteed to refer to the same worker key.
            worker_key = key if key is not None else func.__name__
            if worker_key == self._MERGER_WORKER_KEY:
                raise AutomaRuntimeError(f"the reserved key `{worker_key}` is not allowed to be used by `automa.worker()`")
            super_automa.add_func_as_worker(key=worker_key, func=func, is_start=True, args_mapping_rule=args_mapping_rule, callback_builders=callback_builders)
            super_automa.add_dependency(self._MERGER_WORKER_KEY, worker_key)
            # Hand the callable back so the decorated name is not rebound to None.
            return func

        return wrapper

    @override
    def remove_worker(self, key: str) -> None:
        """
        Remove a concurrent worker from the concurrent automa.

        Parameters
        ----------
        key : str
            The key of the worker to be removed.

        Raises
        ------
        AutomaRuntimeError
            If `key` is the reserved key of the merger worker.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError("the merge worker is not allowed to be removed from the concurrent automa")
        super().remove_worker(key=key)

    @override
    def add_dependency(
        self,
        key: str,
        dependency: str,
    ) -> None:
        """Always raise `AutomaRuntimeError`: this operation is disabled on a concurrent automa."""
        raise AutomaRuntimeError("add_dependency() is not allowed to be called on a concurrent automa")

    def all_workers(self) -> List[str]:
        """
        Gets a list containing the keys of all concurrent workers registered in this concurrent automa.

        Returns
        -------
        List[str]
            A list of concurrent worker keys.
        """
        # Implementation notes:
        # Hide the merger worker from the list of concurrent workers.
        return [key for key in super().all_workers() if key != self._MERGER_WORKER_KEY]

    def ferry_to(self, worker_key: str, /, *args, **kwargs):
        """Always raise `AutomaRuntimeError`: this operation is disabled on a concurrent automa."""
        raise AutomaRuntimeError("ferry_to() is not allowed to be called on a concurrent automa")

    async def arun(
        self, 
        *args: Tuple[Any, ...],
        feedback_data: Optional[Union[InteractionFeedback, List[InteractionFeedback]]] = None,
        **kwargs: Dict[str, Any]
    ) -> List[Any]:
        """
        Delegate to the base class `arun()` and return its result typed as a list.

        All arguments are forwarded unchanged; the `cast` only narrows the static
        type and performs no runtime conversion.
        """
        result = await super().arun(
            *args,
            feedback_data=feedback_data,
            **kwargs
        )
        return cast(List[Any], result)

add_worker

add_worker(
    key: str,
    worker: Worker,
    args_mapping_rule: ArgsMappingRule = AS_IS,
    callback_builders: List[WorkerCallbackBuilder] = [],
) -> None

Add a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

Parameters:

Name Type Description Default
key str

The key of the worker.

required
worker Worker

The worker instance to be registered.

required
args_mapping_rule ArgsMappingRule

The rule for mapping input arguments to the worker.

ArgsMappingRule.AS_IS
callback_builders List[WorkerCallbackBuilder]

The list of callback builders to be registered for the worker.

[]
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def add_worker(
    self,
    key: str,
    worker: Worker,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    callback_builders: List[WorkerCallbackBuilder] = [],
) -> None:
    """
    Register a worker instance that runs concurrently with the other concurrent workers.

    Parameters
    ----------
    key : str
        The key of the worker.
    worker : Worker
        The worker instance to be registered.
    args_mapping_rule : ArgsMappingRule, default ArgsMappingRule.AS_IS
        The rule for mapping input arguments to the worker.
    callback_builders : List[WorkerCallbackBuilder], default []
        The list of callback builders to be registered for the worker.

    Raises
    ------
    AutomaRuntimeError
        If `key` is the reserved key of the merger worker.
    """
    merger_key = self._MERGER_WORKER_KEY
    if key == merger_key:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")

    # Every concurrent worker is registered as a start worker of the underlying
    # graph automa, and the hidden merger worker is made to depend on it.
    graph = super()
    graph.add_worker(
        key=key,
        worker=worker,
        is_start=True,
        args_mapping_rule=args_mapping_rule,
        callback_builders=callback_builders,
    )
    graph.add_dependency(merger_key, key)

add_func_as_worker

add_func_as_worker(
    key: str,
    func: Callable,
    args_mapping_rule: ArgsMappingRule = AS_IS,
    callback_builders: List[WorkerCallbackBuilder] = [],
) -> None

Add a function or method as a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

Parameters:

Name Type Description Default
key str

The key of the function worker.

required
func Callable

The function to be added as a concurrent worker to the automa.

required
args_mapping_rule ArgsMappingRule

The rule for mapping input arguments to the worker.

ArgsMappingRule.AS_IS
callback_builders List[WorkerCallbackBuilder]

The list of callback builders to be registered for the worker.

[]
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def add_func_as_worker(
    self,
    key: str,
    func: Callable,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    callback_builders: List[WorkerCallbackBuilder] = [],
) -> None:
    """
    Register a function or method that runs concurrently with the other concurrent workers.

    Parameters
    ----------
    key : str
        The key of the function worker.
    func : Callable
        The function to be added as a concurrent worker to the automa.
    args_mapping_rule : ArgsMappingRule, default ArgsMappingRule.AS_IS
        The rule for mapping input arguments to the worker.
    callback_builders : List[WorkerCallbackBuilder], default []
        The list of callback builders to be registered for the worker.

    Raises
    ------
    AutomaRuntimeError
        If `key` is the reserved key of the merger worker.
    """
    merger_key = self._MERGER_WORKER_KEY
    if key == merger_key:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")

    # Every concurrent worker is registered as a start worker of the underlying
    # graph automa, and the hidden merger worker is made to depend on it.
    graph = super()
    graph.add_func_as_worker(
        key=key,
        func=func,
        is_start=True,
        args_mapping_rule=args_mapping_rule,
        callback_builders=callback_builders,
    )
    graph.add_dependency(merger_key, key)

worker

worker(
    *,
    key: Optional[str] = None,
    args_mapping_rule: ArgsMappingRule = AS_IS,
    callback_builders: List[WorkerCallbackBuilder] = []
) -> Callable

This is a decorator to mark a function or method as a concurrent worker of the concurrent automa. This worker will be concurrently executed with other concurrent workers.

Parameters:

Name Type Description Default
key str

The key of the worker. If not provided, the name of the decorated callable will be used.

None
args_mapping_rule ArgsMappingRule

The rule for mapping input arguments to the worker.

ArgsMappingRule.AS_IS
callback_builders List[WorkerCallbackBuilder]

The list of callback builders to be registered for the worker.

[]
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def worker(
    self,
    *,
    key: Optional[str] = None,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    callback_builders: List[WorkerCallbackBuilder] = [],
) -> Callable:
    """
    This is a decorator to mark a function or method as a concurrent worker of the concurrent automa. This worker will be concurrently executed with other concurrent workers.

    Parameters
    ----------
    key : Optional[str]
        The key of the worker. If not provided, the name of the decorated callable will be used.
    args_mapping_rule : ArgsMappingRule, default ArgsMappingRule.AS_IS
        The rule for mapping input arguments to the worker.
    callback_builders : List[WorkerCallbackBuilder], default []
        The list of callback builders to be registered for the worker.

    Returns
    -------
    Callable
        A decorator that registers the decorated callable and returns it unchanged.

    Raises
    ------
    AutomaRuntimeError
        If the explicit key, or the name of the decorated callable used as the key,
        is the reserved key of the merger worker.
    """
    if key == self._MERGER_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

    # Zero-argument super() must be evaluated here, in the method itself,
    # so the proxy is captured for use inside the nested function.
    super_automa = super()
    def wrapper(func: Callable):
        # Resolve the key up front so that the registration and the merger
        # dependency below are guaranteed to refer to the same worker key.
        worker_key = key if key is not None else func.__name__
        if worker_key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{worker_key}` is not allowed to be used by `automa.worker()`")
        super_automa.add_func_as_worker(key=worker_key, func=func, is_start=True, args_mapping_rule=args_mapping_rule, callback_builders=callback_builders)
        super_automa.add_dependency(self._MERGER_WORKER_KEY, worker_key)
        # Hand the callable back so the decorated name is not rebound to None.
        return func

    return wrapper

remove_worker

remove_worker(key: str) -> None

Remove a concurrent worker from the concurrent automa.

Parameters:

Name Type Description Default
key str

The key of the worker to be removed.

required
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def remove_worker(self, key: str) -> None:
    """
    Unregister a concurrent worker; the hidden merger worker cannot be removed.

    Parameters
    ----------
    key : str
        The key of the worker to be removed.

    Raises
    ------
    AutomaRuntimeError
        If `key` is the reserved key of the merger worker.
    """
    is_merger = key == self._MERGER_WORKER_KEY
    if is_merger:
        raise AutomaRuntimeError("the merge worker is not allowed to be removed from the concurrent automa")

    # Any other key is handed to the base class for removal.
    super().remove_worker(key=key)

all_workers

all_workers() -> List[str]

Gets a list containing the keys of all concurrent workers registered in this concurrent automa.

Returns:

Type Description
List[str]

A list of concurrent worker keys.

Source code in bridgic/core/agentic/_concurrent_automa.py
def all_workers(self) -> List[str]:
    """
    Return the keys of the concurrent workers, excluding the hidden merger worker.

    Returns
    -------
    List[str]
        A list of concurrent worker keys.
    """
    merger_key = self._MERGER_WORKER_KEY
    # The base class reports every registered worker; drop the merger key so
    # that only the concurrent workers are listed.
    return [worker_key for worker_key in super().all_workers() if worker_key != merger_key]

SequentialAutoma

Bases: GraphAutoma

This class provides an easy way to orchestrate workers in a strictly sequential manner.

Each worker within the SequentialAutoma is invoked in the precise order determined by their positional index, ensuring a linear workflow where the output of one worker can serve as the input to the next.

Upon the completion of all registered workers, the SequentialAutoma returns the output produced by the final worker in the sequence as the overall result to the caller. This design enforces ordered, step-wise processing, making the SequentialAutoma particularly suitable for use cases that require strict procedural dependencies among constituent tasks.

Source code in bridgic/core/agentic/_sequential_automa.py
class SequentialAutoma(GraphAutoma):
    """
    This class provides an easy way to orchestrate workers in a strictly 
    sequential manner.

    Each worker within the SequentialAutoma is invoked in the precise order determined 
    by their positional index, ensuring a linear workflow where the output of one worker 
    can serve as the input to the next.

    Upon the completion of all registered workers, the SequentialAutoma returns the output 
    produced by the final worker in the sequence as the overall result to the caller. This 
    design enforces ordered, step-wise processing, making the SequentialAutoma particularly 
    suitable for use cases that require strict procedural dependencies among constituent tasks.
    """

    # Automa type.
    AUTOMA_TYPE: ClassVar[AutomaType] = AutomaType.Sequential

    # Reserved key of the hidden worker that is always attached after the last worker.
    _TAIL_WORKER_KEY: Final[str] = "__tail__"
    # Key of the most recently appended worker; None while the sequence is empty.
    _last_worker_key: Optional[str]

    def __init__(
        self,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
    ):
        """
        Initialize the sequential automa and chain any decorator-registered workers.

        Parameters
        ----------
        name : Optional[str]
            Forwarded unchanged to the base class initializer.
        thread_pool : Optional[ThreadPoolExecutor]
            Forwarded unchanged to the base class initializer.
        running_options : Optional[RunningOptions]
            Forwarded unchanged to the base class initializer.
        """
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)

        cls = type(self)
        self._last_worker_key = None
        if cls.AUTOMA_TYPE == AutomaType.Sequential:
            # The _registered_worker_funcs data are from @worker decorators.
            # Initialize the decorated sequential workers.
            for worker_key, worker_func in self._registered_worker_funcs.items():
                # The first worker is the start worker; each later one depends
                # on the worker registered immediately before it.
                is_start = self._last_worker_key is None
                dependencies = [] if self._last_worker_key is None else [self._last_worker_key]
                super().add_func_as_worker(
                    key=worker_key,
                    func=worker_func,
                    dependencies=dependencies,
                    is_start=is_start,
                    args_mapping_rule=worker_func.__args_mapping_rule__,
                )
                self._last_worker_key = worker_key

        if self._last_worker_key is not None:
            # Add a hidden worker as the tail worker.
            super().add_func_as_worker(
                key=self._TAIL_WORKER_KEY,
                func=self._tail_worker,
                dependencies=[self._last_worker_key],
                is_output=True,
                args_mapping_rule=ArgsMappingRule.AS_IS,
            )

    def _tail_worker(self, result: Any) -> Any:
        """Body of the hidden tail worker: pass the last worker's result through."""
        # Return the result of the last worker without any modification.
        return result

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        """Extend the base class state dict with the key of the last worker."""
        state_dict = super().dump_to_dict()
        state_dict["last_worker_key"] = self._last_worker_key
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        """Restore the base class state, then the key of the last worker."""
        super().load_from_dict(state_dict)
        self._last_worker_key = state_dict["last_worker_key"]

    def __add_worker_internal(
        self,
        key: str,
        func_or_worker: Union[Callable, Worker],
        *,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> None:
        """
        Append a worker to the end of the sequence and re-attach the hidden tail worker.

        Parameters
        ----------
        key : str
            The key of the worker to append.
        func_or_worker : Union[Callable, Worker]
            A callable (registered as a function worker) or a worker instance.
        args_mapping_rule : ArgsMappingRule
            The rule of arguments mapping.
        """
        # The first worker is the start worker; each later one depends on the
        # worker appended immediately before it.
        is_start = self._last_worker_key is None
        dependencies = [] if self._last_worker_key is None else [self._last_worker_key]
        if callable(func_or_worker):
            super().add_func_as_worker(
                key=key, 
                func=func_or_worker,
                dependencies=dependencies,
                is_start=is_start,
                args_mapping_rule=args_mapping_rule,
            )
        else:
            super().add_worker(
                key=key, 
                worker=func_or_worker,
                dependencies=dependencies,
                is_start=is_start,
                args_mapping_rule=args_mapping_rule,
            )
        if self._last_worker_key is not None:
            # Remove the old hidden tail worker.
            super().remove_worker(self._TAIL_WORKER_KEY)

        # Add a new hidden tail worker.
        self._last_worker_key = key
        super().add_func_as_worker(
            key=self._TAIL_WORKER_KEY,
            func=self._tail_worker,
            dependencies=[self._last_worker_key],
            is_output=True,
            args_mapping_rule=ArgsMappingRule.AS_IS,
        )

    @override
    def add_worker(
        self,
        key: str,
        worker: Worker,
        *,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> None:
        """
        Add a sequential worker to the sequential automa at the end of the automa.

        Parameters
        ----------
        key : str
            The key of the worker.
        worker : Worker
            The worker instance to be registered.
        args_mapping_rule : ArgsMappingRule
            The rule of arguments mapping.

        Raises
        ------
        AutomaRuntimeError
            If `key` is the reserved key of the tail worker.
        """
        if key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")

        self.__add_worker_internal(
            key, 
            worker, 
            args_mapping_rule=args_mapping_rule
        )

    @override
    def add_func_as_worker(
        self,
        key: str,
        func: Callable,
        *,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> None:
        """
        Add a function or method as a sequential worker to the sequential automa at the end of the automa.

        Parameters
        ----------
        key : str
            The key of the worker.
        func : Callable
            The function to be added as a sequential worker to the automa.
        args_mapping_rule : ArgsMappingRule
            The rule of arguments mapping.

        Raises
        ------
        AutomaRuntimeError
            If `key` is the reserved key of the tail worker.
        """
        if key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")

        self.__add_worker_internal(
            key, 
            func, 
            args_mapping_rule=args_mapping_rule
        )

    @override
    def worker(
        self,
        *,
        key: Optional[str] = None,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> Callable:
        """
        This is a decorator to mark a function or method as a sequential worker of the sequential automa, at the end of the automa.

        Parameters
        ----------
        key : Optional[str]
            The key of the worker. If not provided, the name of the decorated callable will be used.
        args_mapping_rule: ArgsMappingRule
            The rule of arguments mapping.

        Returns
        -------
        Callable
            A decorator that registers the decorated callable and returns it unchanged.

        Raises
        ------
        AutomaRuntimeError
            If the explicit key, or the name of the decorated callable used as the key,
            is the reserved key of the tail worker.
        """
        if key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

        def wrapper(func: Callable):
            # Resolve the key before appending: the resolved key is stored as the
            # last worker key and becomes the dependency of the tail worker, so
            # it must never be None.
            worker_key = key if key is not None else func.__name__
            if worker_key == self._TAIL_WORKER_KEY:
                raise AutomaRuntimeError(f"the reserved key `{worker_key}` is not allowed to be used by `automa.worker()`")
            self.__add_worker_internal(
                worker_key, 
                func, 
                args_mapping_rule=args_mapping_rule
            )
            # Hand the callable back so the decorated name is not rebound to None.
            return func

        return wrapper

    @override
    def remove_worker(self, key: str) -> None:
        """Always raise `AutomaRuntimeError`: this operation is disabled on a sequential automa."""
        raise AutomaRuntimeError("remove_worker() is not allowed to be called on a sequential automa")

    @override
    def add_dependency(
        self,
        key: str,
        depends: str,
    ) -> None:
        """Always raise `AutomaRuntimeError`: this operation is disabled on a sequential automa."""
        raise AutomaRuntimeError("add_dependency() is not allowed to be called on a sequential automa")

    def ferry_to(self, worker_key: str, /, *args, **kwargs):
        """Always raise `AutomaRuntimeError`: this operation is disabled on a sequential automa."""
        raise AutomaRuntimeError("ferry_to() is not allowed to be called on a sequential automa")

add_worker

add_worker(
    key: str,
    worker: Worker,
    *,
    args_mapping_rule: ArgsMappingRule = AS_IS
) -> None

Add a sequential worker to the sequential automa at the end of the automa.

Parameters:

Name Type Description Default
key str

The key of the worker.

required
worker Worker

The worker instance to be registered.

required
args_mapping_rule ArgsMappingRule

The rule of arguments mapping.

AS_IS
Source code in bridgic/core/agentic/_sequential_automa.py
@override
def add_worker(
    self,
    key: str,
    worker: Worker,
    *,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
) -> None:
    """
    Append a worker instance to the end of the sequential automa.

    Parameters
    ----------
    key : str
        The key of the worker.
    worker : Worker
        The worker instance to be registered.
    args_mapping_rule : ArgsMappingRule
        The rule of arguments mapping.

    Raises
    ------
    AutomaRuntimeError
        If `key` is the reserved key of the tail worker.
    """
    key_is_reserved = key == self._TAIL_WORKER_KEY
    if key_is_reserved:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")

    # The shared internal helper performs the actual append.
    self.__add_worker_internal(key, worker, args_mapping_rule=args_mapping_rule)

add_func_as_worker

add_func_as_worker(
    key: str,
    func: Callable,
    *,
    args_mapping_rule: ArgsMappingRule = AS_IS
) -> None

Add a function or method as a sequential worker to the sequential automa at the end of the automa.

Parameters:

Name Type Description Default
key str

The key of the worker.

required
func Callable

The function to be added as a sequential worker to the automa.

required
args_mapping_rule ArgsMappingRule

The rule of arguments mapping.

AS_IS
Source code in bridgic/core/agentic/_sequential_automa.py
@override
def add_func_as_worker(
    self,
    key: str,
    func: Callable,
    *,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
) -> None:
    """
    Append a function or method as a worker to the end of the sequential automa.

    Parameters
    ----------
    key : str
        The key of the worker.
    func : Callable
        The function to be added as a sequential worker to the automa.
    args_mapping_rule : ArgsMappingRule
        The rule of arguments mapping.

    Raises
    ------
    AutomaRuntimeError
        If `key` is the reserved key of the tail worker.
    """
    key_is_reserved = key == self._TAIL_WORKER_KEY
    if key_is_reserved:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")

    # The shared internal helper performs the actual append.
    self.__add_worker_internal(key, func, args_mapping_rule=args_mapping_rule)

worker

worker(
    *,
    key: Optional[str] = None,
    args_mapping_rule: ArgsMappingRule = AS_IS
) -> Callable

This is a decorator to mark a function or method as a sequential worker of the sequential automa, at the end of the automa.

Parameters:

Name Type Description Default
key str

The key of the worker. If not provided, the name of the decorated callable will be used.

None
args_mapping_rule ArgsMappingRule

The rule of arguments mapping.

AS_IS
Source code in bridgic/core/agentic/_sequential_automa.py
@override
def worker(
    self,
    *,
    key: Optional[str] = None,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
) -> Callable:
    """
    This is a decorator to mark a function or method as a sequential worker of the sequential automa, at the end of the automa.

    Parameters
    ----------
    key : Optional[str]
        The key of the worker. If not provided, the name of the decorated callable will be used.
    args_mapping_rule: ArgsMappingRule
        The rule of arguments mapping.

    Returns
    -------
    Callable
        A decorator that registers the decorated callable and returns it unchanged.

    Raises
    ------
    AutomaRuntimeError
        If the explicit key, or the name of the decorated callable used as the key,
        is the reserved key of the tail worker.
    """
    if key == self._TAIL_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

    def wrapper(func: Callable):
        # Resolve the key before appending so that a missing key falls back to
        # the callable's name instead of being forwarded as None.
        worker_key = key if key is not None else func.__name__
        if worker_key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{worker_key}` is not allowed to be used by `automa.worker()`")
        self.__add_worker_internal(
            worker_key, 
            func, 
            args_mapping_rule=args_mapping_rule
        )
        # Hand the callable back so the decorated name is not rebound to None.
        return func

    return wrapper