
agentic

The Agentic module provides core components for building intelligent agent systems.

This module contains various Automa implementations for orchestrating and executing LLM-based workflows or agents. These Automa implementations are typically composed together to build complex intelligent agents with advanced capabilities.

ConcurrentAutoma

Bases: GraphAutoma

This class provides concurrent execution of multiple workers.

In accordance with the defined "Concurrency Model of Worker", each worker within a ConcurrentAutoma can be configured to operate in one of two concurrency modes:

  1. Async Mode: Workers execute concurrently in an asynchronous fashion, driven by the event loop of the main thread. This execution mode corresponds to the arun() method of the Worker.
  2. Parallel Mode: Workers execute synchronously, each running in a dedicated thread within a thread pool managed by the ConcurrentAutoma. This execution mode corresponds to the run() method of the Worker.

Upon completion of all worker tasks, the concurrent automa instance aggregates the result outputs from each worker into a single list, which is then returned to the caller.
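
For orientation, a minimal usage sketch follows. It assumes the import path shown below, that start workers receive the arguments passed to arun(), and that a plain async def function runs in Async Mode while a plain def function runs in Parallel Mode; verify these against your installed version.

import asyncio

from bridgic.core.agentic import ConcurrentAutoma  # assumed import path

async def fetch_news(topic: str) -> str:
    # Async Mode: awaited on the main thread's event loop.
    return f"news about {topic}"

def fetch_weather(topic: str) -> str:
    # Parallel Mode: executed in the automa-managed thread pool.
    return f"weather for {topic}"

async def main():
    automa = ConcurrentAutoma(name="fanout")
    automa.add_func_as_worker(key="news", func=fetch_news)
    automa.add_func_as_worker(key="weather", func=fetch_weather)
    # The hidden merger worker gathers every worker's output into one list.
    results = await automa.arun("python")
    print(results)  # e.g. ["news about python", "weather for python"]

asyncio.run(main())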

Source code in bridgic/core/agentic/_concurrent_automa.py
class ConcurrentAutoma(GraphAutoma):
    """
    This class provides concurrent execution of multiple workers.

    In accordance with the defined "Concurrency Model of Worker", each worker within 
    a ConcurrentAutoma can be configured to operate in one of two concurrency modes:

    1. **Async Mode**: Workers execute concurrently in an asynchronous fashion, driven 
    by the event loop of the main thread. This execution mode corresponds to the `arun()` 
    method of the Worker.
    2. **Parallel Mode**: Workers execute synchronously, each running in a dedicated 
    thread within a thread pool managed by the ConcurrentAutoma. This execution mode 
    corresponds to the `run()` method of the Worker.

    Upon completion of all worker tasks, the concurrent automa instance aggregates 
    the result outputs from each worker into a single list, which is then returned 
    to the caller.
    """

    # Automa type.
    AUTOMA_TYPE: ClassVar[AutomaType] = AutomaType.Concurrent

    _MERGER_WORKER_KEY: Final[str] = "__merger__"

    def __init__(
        self,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
    ):
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)

        # Implementation notes:
        # There are two types of workers in the concurrent automa:
        # 1. Concurrent workers: These workers will be concurrently executed with each other.
        # 2. The Merger worker: This worker will merge the results of all the concurrent workers.

        cls = type(self)
        if cls.AUTOMA_TYPE == AutomaType.Concurrent:
            # The _registered_worker_funcs data are from @worker decorators.
            # Initialize the decorated concurrent workers.
            for worker_key, worker_func in self._registered_worker_funcs.items():
                super().add_func_as_worker(
                    key=worker_key,
                    func=worker_func,
                    is_start=True,
                )

        # Add a hidden worker as the merger worker, which will merge the results of all the start workers.
        super().add_func_as_worker(
            key=self._MERGER_WORKER_KEY,
            func=self._merge_workers_results,
            dependencies=super().all_workers(),
            is_output=True,
            args_mapping_rule=ArgsMappingRule.MERGE,
        )

    def _merge_workers_results(self, results: List[Any]) -> List[Any]:
        return results

    @override
    def add_worker(
        self,
        key: str,
        worker: Worker,
    ) -> None:
        """
        Add a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

        Parameters
        ----------
        key : str
            The key of the worker.
        worker : Worker
            The worker instance to be registered.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")
        # Implementation notes:
        # Concurrent workers are implemented as start workers in the underlying graph automa.
        super().add_worker(key=key, worker=worker, is_start=True)
        super().add_dependency(self._MERGER_WORKER_KEY, key)

    @override
    def add_func_as_worker(
        self,
        key: str,
        func: Callable,
    ) -> None:
        """
        Add a function or method as a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

        Parameters
        ----------
        key : str
            The key of the function worker.
        func : Callable
            The function to be added as a concurrent worker to the automa.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")
        # Implementation notes:
        # Concurrent workers are implemented as start workers in the underlying graph automa.
        super().add_func_as_worker(key=key, func=func, is_start=True)
        super().add_dependency(self._MERGER_WORKER_KEY, key)

    @override
    def worker(
        self,
        *,
        key: Optional[str] = None,
    ) -> Callable:
        """
        This is a decorator to mark a function or method as a concurrent worker of the concurrent automa. This worker will be concurrently executed with other concurrent workers.

        Parameters
        ----------
        key : str
            The key of the worker. If not provided, the name of the decorated callable will be used.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

        super_automa = super()
        def wrapper(func: Callable):
            # Fall back to the callable's name when no key is provided.
            worker_key = key if key is not None else func.__name__
            super_automa.add_func_as_worker(key=worker_key, func=func, is_start=True)
            super_automa.add_dependency(self._MERGER_WORKER_KEY, worker_key)
            # Return the original callable so the decorated name remains usable.
            return func

        return wrapper

    @override
    def remove_worker(self, key: str) -> None:
        """
        Remove a concurrent worker from the concurrent automa.

        Parameters
        ----------
        key : str
            The key of the worker to be removed.
        """
        if key == self._MERGER_WORKER_KEY:
            raise AutomaRuntimeError(f"the merge worker is not allowed to be removed from the concurrent automa")
        super().remove_worker(key=key)

    @override
    def add_dependency(
        self,
        key: str,
        dependency: str,
    ) -> None:
        raise AutomaRuntimeError(f"add_dependency() is not allowed to be called on a concurrent automa")

    def all_workers(self) -> List[str]:
        """
        Gets a list containing the keys of all concurrent workers registered in this concurrent automa.

        Returns
        -------
        List[str]
            A list of concurrent worker keys.
        """
        keys_list = super().all_workers()
        # Implementation notes:
        # Hide the merger worker from the list of concurrent workers.
        return list(filter(lambda key: key != self._MERGER_WORKER_KEY, keys_list))

    def ferry_to(self, worker_key: str, /, *args, **kwargs):
        raise AutomaRuntimeError(f"ferry_to() is not allowed to be called on a concurrent automa")

    async def arun(
        self, 
        *args: Tuple[Any, ...],
        feedback_data: Optional[Union[InteractionFeedback, List[InteractionFeedback]]] = None,
        **kwargs: Dict[str, Any]
    ) -> List[Any]:
        result = await super().arun(
            *args,
            feedback_data=feedback_data,
            **kwargs
        )
        return cast(List[Any], result)

add_worker

add_worker(key: str, worker: Worker) -> None

Add a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

Parameters:

key : str (required)
    The key of the worker.
worker : Worker (required)
    The worker instance to be registered.
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def add_worker(
    self,
    key: str,
    worker: Worker,
) -> None:
    """
    Add a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

    Parameters
    ----------
    key : str
        The key of the worker.
    worker : Worker
        The worker instance to be registered.
    """
    if key == self._MERGER_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")
    # Implementation notes:
    # Concurrent workers are implemented as start workers in the underlying graph automa.
    super().add_worker(key=key, worker=worker, is_start=True)
    super().add_dependency(self._MERGER_WORKER_KEY, key)

add_func_as_worker

add_func_as_worker(key: str, func: Callable) -> None

Add a function or method as a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

Parameters:

key : str (required)
    The key of the function worker.
func : Callable (required)
    The function to be added as a concurrent worker to the automa.
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def add_func_as_worker(
    self,
    key: str,
    func: Callable,
) -> None:
    """
    Add a function or method as a concurrent worker to the concurrent automa. This worker will be concurrently executed with other concurrent workers.

    Parameters
    ----------
    key : str
        The key of the function worker.
    func : Callable
        The function to be added as a concurrent worker to the automa.
    """
    if key == self._MERGER_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")
    # Implementation notes:
    # Concurrent workers are implemented as start workers in the underlying graph automa.
    super().add_func_as_worker(key=key, func=func, is_start=True)
    super().add_dependency(self._MERGER_WORKER_KEY, key)

worker

worker(*, key: Optional[str] = None) -> Callable

This is a decorator to mark a function or method as a concurrent worker of the concurrent automa. This worker will be concurrently executed with other concurrent workers.

Parameters:

key : Optional[str] (default: None)
    The key of the worker. If not provided, the name of the decorated callable will be used.
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def worker(
    self,
    *,
    key: Optional[str] = None,
) -> Callable:
    """
    This is a decorator to mark a function or method as a concurrent worker of the concurrent automa. This worker will be concurrently executed with other concurrent workers.

    Parameters
    ----------
    key : str
        The key of the worker. If not provided, the name of the decorated callable will be used.
    """
    if key == self._MERGER_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

    super_automa = super()
    def wrapper(func: Callable):
        # Fall back to the callable's name when no key is provided.
        worker_key = key if key is not None else func.__name__
        super_automa.add_func_as_worker(key=worker_key, func=func, is_start=True)
        super_automa.add_dependency(self._MERGER_WORKER_KEY, worker_key)
        # Return the original callable so the decorated name remains usable.
        return func

    return wrapper
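
A sketch of the decorator form (worker names are illustrative; the `automa` instance is assumed to be constructed as in the earlier example):

automa = ConcurrentAutoma(name="fanout")

@automa.worker(key="shout")
def shout(text: str) -> str:
    return text.upper()

@automa.worker()  # key defaults to the function name, "whisper"
def whisper(text: str) -> str:
    return text.lower()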

remove_worker

remove_worker(key: str) -> None

Remove a concurrent worker from the concurrent automa.

Parameters:

key : str (required)
    The key of the worker to be removed.
Source code in bridgic/core/agentic/_concurrent_automa.py
@override
def remove_worker(self, key: str) -> None:
    """
    Remove a concurrent worker from the concurrent automa.

    Parameters
    ----------
    key : str
        The key of the worker to be removed.
    """
    if key == self._MERGER_WORKER_KEY:
        raise AutomaRuntimeError(f"the merge worker is not allowed to be removed from the concurrent automa")
    super().remove_worker(key=key)

all_workers

all_workers() -> List[str]

Gets a list containing the keys of all concurrent workers registered in this concurrent automa.

Returns:

List[str]
    A list of concurrent worker keys.

Source code in bridgic/core/agentic/_concurrent_automa.py
def all_workers(self) -> List[str]:
    """
    Gets a list containing the keys of all concurrent workers registered in this concurrent automa.

    Returns
    -------
    List[str]
        A list of concurrent worker keys.
    """
    keys_list = super().all_workers()
    # Implementation notes:
    # Hide the merger worker from the list of concurrent workers.
    return list(filter(lambda key: key != self._MERGER_WORKER_KEY, keys_list))

SequentialAutoma

Bases: GraphAutoma

This class provides an easy way to orchestrate workers in a strictly sequential manner.

Each worker within the SequentialAutoma is invoked in the precise order determined by their positional index, ensuring a linear workflow where the output of one worker can serve as the input to the next.

Upon the completion of all registered workers, the SequentialAutoma returns the output produced by the final worker in the sequence as the overall result to the caller. This design enforces ordered, step-wise processing, making the SequentialAutoma particularly suitable for use cases that require strict procedural dependencies among constituent tasks.
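
A minimal pipeline sketch (same assumptions as the concurrent example above: the import path, and that the first worker receives the arun() arguments):

import asyncio

from bridgic.core.agentic import SequentialAutoma  # assumed import path

def normalize(text: str) -> str:
    return text.strip().lower()

def tokenize(text: str) -> list:
    return text.split()

async def main():
    automa = SequentialAutoma(name="pipeline")
    automa.add_func_as_worker(key="normalize", func=normalize)
    automa.add_func_as_worker(key="tokenize", func=tokenize)
    # Steps run in insertion order; the hidden tail worker returns the
    # last step's output as the overall result.
    tokens = await automa.arun("  Hello World  ")
    print(tokens)  # e.g. ["hello", "world"]

asyncio.run(main())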

Source code in bridgic/core/agentic/_sequential_automa.py
class SequentialAutoma(GraphAutoma):
    """
    This class provides an easy way to orchestrate workers in a strictly
    sequential manner.

    Each worker within the SequentialAutoma is invoked in the precise order determined 
    by their positional index, ensuring a linear workflow where the output of one worker 
    can serve as the input to the next.

    Upon the completion of all registered workers, the SequentialAutoma returns the output 
    produced by the final worker in the sequence as the overall result to the caller. This 
    design enforces ordered, step-wise processing, making the SequentialAutoma particularly 
    suitable for use cases that require strict procedural dependencies among constituent tasks.
    """

    # Automa type.
    AUTOMA_TYPE: ClassVar[AutomaType] = AutomaType.Sequential

    _TAIL_WORKER_KEY: Final[str] = "__tail__"
    _last_worker_key: Optional[str]

    def __init__(
        self,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
    ):
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)

        cls = type(self)
        self._last_worker_key = None
        if cls.AUTOMA_TYPE == AutomaType.Sequential:
            # The _registered_worker_funcs data are from @worker decorators.
            # Initialize the decorated sequential workers.
            for worker_key, worker_func in self._registered_worker_funcs.items():
                is_start = self._last_worker_key is None
                dependencies = [] if self._last_worker_key is None else [self._last_worker_key]
                super().add_func_as_worker(
                    key=worker_key,
                    func=worker_func,
                    dependencies=dependencies,
                    is_start=is_start,
                    args_mapping_rule=worker_func.__args_mapping_rule__,
                )
                self._last_worker_key = worker_key

        if self._last_worker_key is not None:
            # Add a hidden worker as the tail worker.
            super().add_func_as_worker(
                key=self._TAIL_WORKER_KEY,
                func=self._tail_worker,
                dependencies=[self._last_worker_key],
                is_output=True,
                args_mapping_rule=ArgsMappingRule.AS_IS,
            )

    def _tail_worker(self, result: Any) -> Any:
        # Return the result of the last worker without any modification.
        return result

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = super().dump_to_dict()
        state_dict["last_worker_key"] = self._last_worker_key
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self._last_worker_key = state_dict["last_worker_key"]

    def __add_worker_internal(
        self,
        key: str,
        func_or_worker: Union[Callable, Worker],
        *,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> None:
        is_start = self._last_worker_key is None
        dependencies = [] if self._last_worker_key is None else [self._last_worker_key]
        if isinstance(func_or_worker, Callable):
            super().add_func_as_worker(
                key=key, 
                func=func_or_worker,
                dependencies=dependencies,
                is_start=is_start,
                args_mapping_rule=args_mapping_rule,
            )
        else:
            super().add_worker(
                key=key, 
                worker=func_or_worker,
                dependencies=dependencies,
                is_start=is_start,
                args_mapping_rule=args_mapping_rule,
            )
        if self._last_worker_key is not None:
            # Remove the old hidden tail worker.
            super().remove_worker(self._TAIL_WORKER_KEY)

        # Add a new hidden tail worker.
        self._last_worker_key = key
        super().add_func_as_worker(
            key=self._TAIL_WORKER_KEY,
            func=self._tail_worker,
            dependencies=[self._last_worker_key],
            is_output=True,
            args_mapping_rule=ArgsMappingRule.AS_IS,
        )

    @override
    def add_worker(
        self,
        key: str,
        worker: Worker,
        *,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> None:
        """
        Add a sequential worker to the sequential automa at the end of the automa.

        Parameters
        ----------
        key : str
            The key of the worker.
        worker : Worker
            The worker instance to be registered.
        args_mapping_rule : ArgsMappingRule
            The rule of arguments mapping.
        """
        if key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")

        self.__add_worker_internal(
            key, 
            worker, 
            args_mapping_rule=args_mapping_rule
        )

    @override
    def add_func_as_worker(
        self,
        key: str,
        func: Callable,
        *,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> None:
        """
        Add a function or method as a sequential worker to the sequential automa at the end of the automa.

        Parameters
        ----------
        key : str
            The key of the worker.
        func : Callable
            The function to be added as a sequential worker to the automa.
        args_mapping_rule : ArgsMappingRule
            The rule of arguments mapping.
        """
        if key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")

        self.__add_worker_internal(
            key, 
            func, 
            args_mapping_rule=args_mapping_rule
        )

    @override
    def worker(
        self,
        *,
        key: Optional[str] = None,
        args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
    ) -> Callable:
        """
        This is a decorator to mark a function or method as a sequential worker of the sequential automa, at the end of the automa.

        Parameters
        ----------
        key : str
            The key of the worker. If not provided, the name of the decorated callable will be used.
        args_mapping_rule: ArgsMappingRule
            The rule of arguments mapping.
        """
        if key == self._TAIL_WORKER_KEY:
            raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

        def wrapper(func: Callable):
            # Fall back to the callable's name when no key is provided.
            worker_key = key if key is not None else func.__name__
            self.__add_worker_internal(
                worker_key,
                func,
                args_mapping_rule=args_mapping_rule
            )
            # Return the original callable so the decorated name remains usable.
            return func

        return wrapper

    @override
    def remove_worker(self, key: str) -> None:
        raise AutomaRuntimeError(f"remove_worker() is not allowed to be called on a sequential automa")

    @override
    def add_dependency(
        self,
        key: str,
        depends: str,
    ) -> None:
        raise AutomaRuntimeError(f"add_dependency() is not allowed to be called on a sequential automa")

    def ferry_to(self, worker_key: str, /, *args, **kwargs):
        raise AutomaRuntimeError(f"ferry_to() is not allowed to be called on a sequential automa")

add_worker

add_worker(
    key: str,
    worker: Worker,
    *,
    args_mapping_rule: ArgsMappingRule = AS_IS
) -> None

Add a sequential worker to the sequential automa at the end of the automa.

Parameters:

key : str (required)
    The key of the worker.
worker : Worker (required)
    The worker instance to be registered.
args_mapping_rule : ArgsMappingRule (default: AS_IS)
    The rule of arguments mapping.
Source code in bridgic/core/agentic/_sequential_automa.py
@override
def add_worker(
    self,
    key: str,
    worker: Worker,
    *,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
) -> None:
    """
    Add a sequential worker to the sequential automa at the end of the automa.

    Parameters
    ----------
    key : str
        The key of the worker.
    worker : Worker
        The worker instance to be registered.
    args_mapping_rule : ArgsMappingRule
        The rule of arguments mapping.
    """
    if key == self._TAIL_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_worker()`")

    self.__add_worker_internal(
        key, 
        worker, 
        args_mapping_rule=args_mapping_rule
    )

add_func_as_worker

add_func_as_worker(
    key: str,
    func: Callable,
    *,
    args_mapping_rule: ArgsMappingRule = AS_IS
) -> None

Add a function or method as a sequential worker to the sequential automa at the end of the automa.

Parameters:

key : str (required)
    The key of the worker.
func : Callable (required)
    The function to be added as a sequential worker to the automa.
args_mapping_rule : ArgsMappingRule (default: AS_IS)
    The rule of arguments mapping.
Source code in bridgic/core/agentic/_sequential_automa.py
@override
def add_func_as_worker(
    self,
    key: str,
    func: Callable,
    *,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
) -> None:
    """
    Add a function or method as a sequential worker to the sequential automa at the end of the automa.

    Parameters
    ----------
    key : str
        The key of the worker.
    func : Callable
        The function to be added as a sequential worker to the automa.
    args_mapping_rule : ArgsMappingRule
        The rule of arguments mapping.
    """
    if key == self._TAIL_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `add_func_as_worker()`")

    self.__add_worker_internal(
        key, 
        func, 
        args_mapping_rule=args_mapping_rule
    )

worker

worker(
    *,
    key: Optional[str] = None,
    args_mapping_rule: ArgsMappingRule = AS_IS
) -> Callable

This is a decorator to mark a function or method as a sequential worker of the sequential automa, at the end of the automa.

Parameters:

key : Optional[str] (default: None)
    The key of the worker. If not provided, the name of the decorated callable will be used.
args_mapping_rule : ArgsMappingRule (default: AS_IS)
    The rule of arguments mapping.
Source code in bridgic/core/agentic/_sequential_automa.py
@override
def worker(
    self,
    *,
    key: Optional[str] = None,
    args_mapping_rule: ArgsMappingRule = ArgsMappingRule.AS_IS,
) -> Callable:
    """
    This is a decorator to mark a function or method as a sequential worker of the sequential automa, at the end of the automa.

    Parameters
    ----------
    key : str
        The key of the worker. If not provided, the name of the decorated callable will be used.
    args_mapping_rule: ArgsMappingRule
        The rule of arguments mapping.
    """
    if key == self._TAIL_WORKER_KEY:
        raise AutomaRuntimeError(f"the reserved key `{key}` is not allowed to be used by `automa.worker()`")

    def wrapper(func: Callable):
        # Fall back to the callable's name when no key is provided.
        worker_key = key if key is not None else func.__name__
        self.__add_worker_internal(
            worker_key,
            func,
            args_mapping_rule=args_mapping_rule
        )
        # Return the original callable so the decorated name remains usable.
        return func

    return wrapper
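
As with the concurrent variant, the decorator form appends each decorated callable to the end of the sequence. A sketch with illustrative step names:

automa = SequentialAutoma(name="pipeline")

@automa.worker(key="double")
def double(x: int) -> int:
    return x * 2

@automa.worker()  # key defaults to the function name, "describe"
def describe(x: int) -> str:
    return f"result: {x}"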

ReActAutoma

Bases: GraphAutoma

A react automa is a subclass of graph automa that implements the ReAct prompting framework.

Parameters:

llm : ToolSelection (required)
    The LLM instance used by ReAct internal planning (i.e., used for tool selection).
system_prompt : Optional[Union[str, SystemMessage]] (default: None)
    The system prompt used by ReAct. This argument can also be specified at runtime, i.e., when calling arun.
tools : Optional[List[Union[Callable, Automa, ToolSpec]]] (default: None)
    The tools used by ReAct. A tool can be a function, an automa instance, or a ToolSpec instance. This argument can also be specified at runtime, i.e., when calling arun.
name : Optional[str] (default: None)
    The name of the automa.
thread_pool : Optional[ThreadPoolExecutor] (default: None)
    The thread pool for parallel running of I/O-bound or CPU-bound tasks.
running_options : Optional[RunningOptions] (default: None)
    The running options for an automa instance (if needed).
max_iterations : int (default: DEFAULT_MAX_ITERATIONS)
    The maximum number of iterations to be executed.
prompt_template : str (default: DEFAULT_TEMPLATE_FILE)
    The template file for the react automa.
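
A construction sketch. `my_tool_selection_llm` is a hypothetical stand-in for any concrete `ToolSelection` implementation, `search_web` is an illustrative tool function, and the import path is assumed:

from bridgic.core.agentic.react import ReActAutoma  # assumed import path

def search_web(query: str) -> str:
    """Search the web and return a short summary (illustrative stub)."""
    return f"results for {query}"

react = ReActAutoma(
    llm=my_tool_selection_llm,  # hypothetical ToolSelection instance
    system_prompt="You are a helpful research assistant.",
    tools=[search_web],
    max_iterations=5,
)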
Source code in bridgic/core/agentic/react/_react_automa.py
class ReActAutoma(GraphAutoma):
    """
    A react automa is a subclass of graph automa that implements the [ReAct](https://arxiv.org/abs/2210.03629) prompting framework.

    Parameters
    ----------
    llm : ToolSelection
        The LLM instance used by ReAct internal planning (i.e., used for tool selection).
    system_prompt : Optional[Union[str, SystemMessage]]
        The system prompt used by ReAct. This argument can also be specified at runtime, i.e., when calling `arun`.
    tools : Optional[List[Union[Callable, Automa, ToolSpec]]]
        The tools used by ReAct. A tool can be a function, an automa instance, or a `ToolSpec` instance. This argument can also be specified at runtime, i.e., when calling `arun`.
    name : Optional[str]
        The name of the automa.
    thread_pool : Optional[ThreadPoolExecutor]
        The thread pool for parallel running of I/O-bound or CPU-bound tasks.
    running_options : Optional[RunningOptions]
        The running options for an automa instance (if needed).
    max_iterations : int
        The maximum number of iterations to be executed.
    """

    _llm: ToolSelection
    """ The LLM to be used by the react automa. """
    _tools: Optional[List[ToolSpec]]
    """ The candidate tools to be used by the react automa. """
    _system_prompt: Optional[SystemMessage]
    """ The system prompt to be used by the react automa. """
    _max_iterations: int
    """ The maximum number of iterations for the react automa. """
    _prompt_template: str
    """ The template file for the react automa. """
    _jinja_env: Environment
    """ The Jinja environment to be used by the react automa. """
    _jinja_template: Template
    """ The Jinja template to be used by the react automa. """

    def __init__(
        self,
        llm: ToolSelection,
        system_prompt: Optional[Union[str, SystemMessage]] = None,
        tools: Optional[List[Union[Callable, Automa, ToolSpec]]] = None,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        prompt_template: str = DEFAULT_TEMPLATE_FILE,
    ):
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)

        self._llm = llm
        if system_prompt:
            # Validate SystemMessage...
            if isinstance(system_prompt, str):
                system_prompt = SystemMessage(role="system", content=system_prompt)
            elif ("role" not in system_prompt) or (system_prompt["role"] != "system"):
                raise ValueError(f"Invalid `system_prompt` value received: {system_prompt}. It should contain `role`=`system`.")

        self._system_prompt = system_prompt
        if tools:
            self._tools = [self._ensure_tool_spec(tool) for tool in tools]
        else:
            self._tools = None
        self._max_iterations = max_iterations
        self._prompt_template = prompt_template
        self._jinja_env = Environment(loader=PackageLoader("bridgic.core.agentic.react"))
        self._jinja_template = self._jinja_env.get_template(prompt_template)

        self.add_worker(
            key="tool_selector",
            worker=ToolSelectionWorker(tool_selection_llm=llm),
            dependencies=["assemble_context"],
            args_mapping_rule=ArgsMappingRule.UNPACK,
        )

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = super().dump_to_dict()
        state_dict["tools"] = self._tools
        state_dict["system_prompt"] = self._system_prompt
        state_dict["llm"] = self._llm
        state_dict["max_iterations"] = self._max_iterations
        state_dict["prompt_template"] = self._prompt_template
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self._max_iterations = state_dict["max_iterations"]
        self._prompt_template = state_dict["prompt_template"]
        self._tools = state_dict["tools"]
        self._system_prompt = state_dict["system_prompt"]
        self._llm = state_dict["llm"]
        self._jinja_env = Environment(loader=PackageLoader("bridgic.core.agentic.react"))
        self._jinja_template = self._jinja_env.get_template(self._prompt_template)

    @property
    def max_iterations(self) -> int:
        return self._max_iterations

    @max_iterations.setter
    def max_iterations(self, max_iterations: int) -> None:
        self._max_iterations = max_iterations

    @property
    def prompt_template(self) -> str:
        return self._prompt_template

    @prompt_template.setter
    def prompt_template(self, prompt_template: str) -> None:
        self._prompt_template = prompt_template

    @override
    async def arun(
        self,
        user_msg: Optional[Union[str, UserTextMessage]] = None,
        *,
        chat_history: Optional[List[Union[UserTextMessage, AssistantTextMessage, ToolMessage]]] = None,
        messages: Optional[List[ChatMessage]] = None,
        tools: Optional[List[Union[Callable, Automa, ToolSpec]]] = None,
        feedback_data: Optional[Union[InteractionFeedback, List[InteractionFeedback]]] = None,
    ) -> Any:
        """
        The entry point for a `ReActAutoma` instance.

        Parameters
        ----------
        user_msg : Optional[Union[str, UserTextMessage]]
            The input message from the user. If `user_msg` is provided and `messages` is NOT provided, the final prompt given to the LLM will be composed of three parts: `system_prompt` + `chat_history` + `user_msg`.
        chat_history : Optional[List[Union[UserTextMessage, AssistantTextMessage, ToolMessage]]]
            The chat history.
        messages : Optional[List[ChatMessage]]
            The whole message list sent to the LLM. If this `messages` argument is provided, the final prompt given to the LLM will use this argument instead of the `user_msg` argument.
        tools : Optional[List[Union[Callable, Automa, ToolSpec]]]
            The tools used by ReAct. A tool can be a function, an automa instance, or a `ToolSpec` instance. This argument can also be specified during the initialization of a `ReActAutoma` instance.
        feedback_data : Optional[Union[InteractionFeedback, List[InteractionFeedback]]]
            Feedback received from one or more human interactions that occurred before the
            Automa was paused. This argument may be of type `InteractionFeedback` or
            `List[InteractionFeedback]`. If only one interaction occurred, `feedback_data` should be
            of type `InteractionFeedback`. If multiple interactions occurred simultaneously,
            `feedback_data` should be of type `List[InteractionFeedback]`.

        Returns
        -------
        Any
            The execution result of the output-worker that has the setting `is_output=True`,
            otherwise None.
        """
        return await super().arun(
            user_msg=user_msg,
            chat_history=chat_history,
            messages=messages,
            tools=tools,
            feedback_data=feedback_data,
        )

    @worker(is_start=True)
    async def validate_and_transform(
        self,
        user_msg: Optional[Union[str, UserTextMessage]] = None,
        *,
        chat_history: Optional[List[Union[UserTextMessage, AssistantTextMessage, ToolMessage]]] = None,
        messages: Optional[List[ChatMessage]] = None,
        tools: Optional[List[Union[Callable, Automa, ToolSpec]]] = None,
    ) -> Dict[str, Any]:

        # Part One: validate and transform the input messages.
        # Unify input messages of various types to the `ChatMessage` format.
        chat_messages: List[ChatMessage] = []
        if messages:
            # If `messages` is provided, use it directly.
            chat_messages = messages
        elif user_msg:
            # Since `messages` is not provided, join the system prompt + `chat_history` + `user_msg`
            # First, append the `system_prompt`
            if self._system_prompt:
                chat_messages.append(self._system_prompt)

            # Second, append the `chat_history`
            if chat_history:
                for history_msg in chat_history:
                    # Validate the history messages...
                    role = history_msg["role"]
                    if role == "user" or role == "assistant" or role == "tool":
                        chat_messages.append(history_msg)
                    else:
                        raise ValueError(f"Invalid role: `{role}` received in history message: `{history_msg}`, expected `user`, `assistant`, or `tool`.")

            # Third, append the `user_msg`
            if isinstance(user_msg, str):
                chat_messages.append(UserTextMessage(role="user", content=user_msg))
            elif isinstance(user_msg, dict):
                if "role" in user_msg and user_msg["role"] == "user":
                    chat_messages.append(user_msg)
                else:
                    raise ValueError(f"`role` must be `user` in user message: `{user_msg}`.")
        else:
            raise ValueError(f"Either `messages` or `user_msg` must be provided.")

        # Part Two: validate and transform the intput tools.
        # Unify input tools of various types to the `ToolSpec` format.
        if self._tools:
            tool_spec_list = self._tools
        elif tools:
            tool_spec_list = [self._ensure_tool_spec(tool) for tool in tools]
        else:
            # TODO: whether to support empty tool list?
            tool_spec_list = []

        return {
            "initial_messages": chat_messages,
            "candidate_tools": tool_spec_list,
        }

    @worker(dependencies=["validate_and_transform"], args_mapping_rule=ArgsMappingRule.UNPACK)
    async def assemble_context(
        self,
        *,
        initial_messages: Optional[List[ChatMessage]] = None,
        candidate_tools: Optional[List[ToolSpec]] = None,
        tool_selection_outputs: Optional[Tuple[List[ToolCall], Optional[str]]] = From("tool_selector", default=None),
        tool_result_messages: Optional[List[ToolMessage]] = None,
        rtx = System("runtime_context"),
    ) -> Dict[str, Any]:
        # print(f"\n******* ReActAutoma.assemble_context *******\n")
        # print(f"initial_messages: {initial_messages}")
        # print(f"candidate_tools: {candidate_tools}")
        # print(f"tool_selection_outputs: {tool_selection_outputs}")
        # print(f"tool_result_messages: {tool_result_messages}")

        local_space = self.get_local_space(rtx)
        # Build messages memory with help of local space.
        messages_memory: List[ChatMessage] = []
        if initial_messages:
            # If `messages` is provided, use it to re-initialize the messages memory.
            messages_memory = initial_messages.copy()
        else:
            messages_memory = local_space.get("messages_memory", [])
        if tool_selection_outputs:
            # Transform tools_calls format:
            tool_calls = tool_selection_outputs[0]
            tool_calls_list = [
                FunctionToolCall(
                    id=tool_call.id,
                    type="function",
                    function=Function(
                        name=tool_call.name,
                        arguments=tool_call.arguments,
                    ),
                ) for tool_call in tool_calls
            ]
            llm_response = tool_selection_outputs[1]
            assistant_message = AssistantTextMessage(
                role="assistant",
                # TODO: name?
                content=llm_response,
                tool_calls=tool_calls_list,
            )
            messages_memory.append(assistant_message)
        if tool_result_messages:
            messages_memory.extend(tool_result_messages)
        local_space["messages_memory"] = messages_memory
        # print("--------------------------------")
        # print(f"messages_memory: {messages_memory}")

        # Save & retrieve tools with help of local space.
        if candidate_tools:
            local_space["tools"] = candidate_tools
        else:
            candidate_tools = local_space.get("tools", [])

        # Note: here 'messages' and `tools` are injected into the template as variables.
        raw_prompt = self._jinja_template.render(messages=messages_memory, tools=candidate_tools)
        # print(f"\n ##### raw_prompt ##### \n{raw_prompt}")

        # Note: the Jinja template must conform to the TypedDict `ChatMessage` format (in JSON).
        llm_messages = cast(List[ChatMessage], json.loads(raw_prompt))
        llm_tools: List[Tool] = [tool.to_tool() for tool in candidate_tools]

        return {
            "messages": llm_messages,
            "tools": llm_tools,
        }

    @worker(dependencies=["tool_selector"], args_mapping_rule=ArgsMappingRule.UNPACK)
    async def plan_next_step(
        self,
        tool_calls: List[ToolCall],
        llm_response: Optional[str] = None,
        messages_and_tools: dict = From("validate_and_transform"),
        rtx = System("runtime_context"),
    ) -> Optional[List[ToolCall]]:
        local_space = self.get_local_space(rtx)
        iterations_count = local_space.get("iterations_count", 0)
        iterations_count += 1
        local_space["iterations_count"] = iterations_count
        if iterations_count > self._max_iterations:
            # TODO: how to report this to users?
            self.ferry_to(
                "finally_summarize", 
                final_answer=f"Sorry, I am unable to answer your question after {self._max_iterations} iterations. Please try again later."
            )
            return

        # TODO: maybe hand over the control flow to users?
        # print(f"\n******* ReActAutoma.plan_next_step *******\n")
        # print(f"tool_calls: {tool_calls}")
        # print(f"llm_response: {llm_response}")
        if tool_calls:
            tool_spec_list = messages_and_tools["candidate_tools"]
            matched_list = self._match_tool_calls_and_tool_specs(tool_calls, tool_spec_list)
            if matched_list:
                matched_tool_calls = []
                tool_worker_keys = []
                for tool_call, tool_spec in matched_list:
                    matched_tool_calls.append(tool_call)
                    tool_worker = tool_spec.create_worker()
                    worker_key = f"tool_{tool_call.name}_{tool_call.id}"
                    self.add_worker(
                        key=worker_key,
                        worker=tool_worker,
                    )
                    # TODO: convert tool_call.arguments to the tool parameters types
                    # TODO: validate the arguments against the tool parameters / json schema
                    self.ferry_to(worker_key, **tool_call.arguments)
                    tool_worker_keys.append(worker_key)
                self.add_func_as_worker(
                    key="merge_tools_results",
                    func=self.merge_tools_results,
                    dependencies=tool_worker_keys,
                    args_mapping_rule=ArgsMappingRule.MERGE,
                )
                return matched_tool_calls
            else:
                # TODO
                ...
        else:
            # Got final answer from the LLM.
            self.ferry_to("finally_summarize", final_answer=llm_response)

    async def merge_tools_results(
        self, 
        tool_results: List[Any],
        tool_calls: List[ToolCall] = From("plan_next_step"),
    ) -> List[ToolMessage]:
        # print(f"\n******* ReActAutoma.merge_tools_results *******\n")
        # print(f"tool_results: {tool_results}")
        # print(f"tool_calls: {tool_calls}")
        assert len(tool_results) == len(tool_calls)
        tool_messages = []
        for tool_result, tool_call in zip(tool_results, tool_calls):
            tool_messages.append(ToolMessage(
                role="tool", 
                # Note: Convert the tool result to string, since a tool can return any type of data.
                # TODO: maybe we can use a better way to serialize the tool result?
                content=str(tool_result), 
                tool_call_id=tool_call.id
            ))
            # Remove the tool workers
            self.remove_worker(f"tool_{tool_call.name}_{tool_call.id}")
        # Remove self...
        self.remove_worker("merge_tools_results")
        self.ferry_to("assemble_context", tool_result_messages=tool_messages)
        return tool_messages

    @worker(is_output=True)
    async def finally_summarize(self, final_answer: str) -> str:
        return final_answer

    def _ensure_tool_spec(self, tool: Union[Callable, Automa, ToolSpec]) -> ToolSpec:
        if isinstance(tool, ToolSpec):
            return tool
        elif isinstance(tool, type) and issubclass(tool, Automa):
            return AutomaToolSpec.from_raw(tool)
        elif isinstance(tool, Callable):
            # Note: this test against `Callable` should be placed at last.
            return FunctionToolSpec.from_raw(tool)
        else:
            raise TypeError(f"Invalid tool type: {type(tool)} detected, expected `Callable`, `Automa`, or `ToolSpec`.")

    def _match_tool_calls_and_tool_specs(
        self,
        tool_calls: List[ToolCall],
        tool_spec_list: List[ToolSpec],
    ) -> List[Tuple[ToolCall, ToolSpec]]:
        """
        This function is used to match the tool calls and the tool specs based on the tool name.

        Parameters
        ----------
        tool_calls : List[ToolCall]
            The tool calls to match.
        tool_spec_list : List[ToolSpec]
            The tool specs to match.

        Returns
        -------
        List[Tuple[ToolCall, ToolSpec]]
            The matched tool calls and tool specs.
        """
        matched_list: List[Tuple[ToolCall, ToolSpec]] = []
        for tool_call in tool_calls:
            for tool_spec in tool_spec_list:
                if tool_call.name == tool_spec.tool_name:
                    matched_list.append((tool_call, tool_spec))
        return matched_list
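
For orientation, the control flow implemented by the workers above is, in outline:

    validate_and_transform             (start worker: normalize input messages and tools)
        -> assemble_context            (render the Jinja prompt from the messages memory)
        -> tool_selector               (the LLM selects tool calls or answers directly)
        -> plan_next_step              (dispatch)
             -> dynamically added tool workers -> merge_tools_results -> assemble_context  (loop)
             -> finally_summarize      (final answer, or max_iterations exceeded)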

arun

async arun(
    user_msg: Optional[Union[str, UserTextMessage]] = None,
    *,
    chat_history: Optional[
        List[
            Union[
                UserTextMessage,
                AssistantTextMessage,
                ToolMessage,
            ]
        ]
    ] = None,
    messages: Optional[List[ChatMessage]] = None,
    tools: Optional[
        List[Union[Callable, Automa, ToolSpec]]
    ] = None,
    feedback_data: Optional[
        Union[
            InteractionFeedback, List[InteractionFeedback]
        ]
    ] = None
) -> Any

The entry point for a ReActAutoma instance.

Parameters:

user_msg : Optional[Union[str, UserTextMessage]] (default: None)
    The input message from the user. If user_msg is provided and messages is NOT provided, the final prompt given to the LLM is composed of three parts: system_prompt + chat_history + user_msg.
chat_history : Optional[List[Union[UserTextMessage, AssistantTextMessage, ToolMessage]]] (default: None)
    The chat history.
messages : Optional[List[ChatMessage]] (default: None)
    The whole message list sent to the LLM. If this argument is provided, the final prompt given to the LLM uses it instead of the user_msg argument.
tools : Optional[List[Union[Callable, Automa, ToolSpec]]] (default: None)
    The tools used by ReAct. A tool can be a function, an automa instance, or a ToolSpec instance. This argument can also be specified during the initialization of a ReActAutoma instance.
feedback_data : Optional[Union[InteractionFeedback, List[InteractionFeedback]]] (default: None)
    Feedback received from one or more human interactions that occurred before the Automa was paused. Pass a single InteractionFeedback if one interaction occurred, or a List[InteractionFeedback] if multiple interactions occurred simultaneously.

Returns:

Any
    The execution result of the output-worker that has the setting is_output=True, otherwise None.

Source code in bridgic/core/agentic/react/_react_automa.py
@override
async def arun(
    self,
    user_msg: Optional[Union[str, UserTextMessage]] = None,
    *,
    chat_history: Optional[List[Union[UserTextMessage, AssistantTextMessage, ToolMessage]]] = None,
    messages: Optional[List[ChatMessage]] = None,
    tools: Optional[List[Union[Callable, Automa, ToolSpec]]] = None,
    feedback_data: Optional[Union[InteractionFeedback, List[InteractionFeedback]]] = None,
) -> Any:
    """
    The entry point for a `ReActAutoma` instance.

    Parameters
    ----------
    user_msg : Optional[Union[str, UserTextMessage]]
        The input message from the user. If `user_msg` is provided and `messages` is NOT provided, the final prompt given to the LLM will be composed of three parts: `system_prompt` + `chat_history` + `user_msg`.
    chat_history : Optional[List[Union[UserTextMessage, AssistantTextMessage, ToolMessage]]]
        The chat history.
    messages : Optional[List[ChatMessage]]
        The whole message list sent to the LLM. If this `messages` argument is provided, the final prompt given to the LLM will use this argument instead of the `user_msg` argument.
    tools : Optional[List[Union[Callable, Automa, ToolSpec]]]
        The tools used by ReAct. A tool can be a function, an automa instance, or a `ToolSpec` instance. This argument can also be specified during the initialization of a `ReActAutoma` instance.
    feedback_data : Optional[Union[InteractionFeedback, List[InteractionFeedback]]]
        Feedbacks that are received from one or multiple human interactions occurred before the
        Automa was paused. This argument may be of type `InteractionFeedback` or 
        `List[InteractionFeedback]`. If only one interaction occurred, `feedback_data` should be
        of type `InteractionFeedback`. If multiple interactions occurred simultaneously, 
        `feedback_data` should be of type `List[InteractionFeedback]`.

    Returns
    -------
    Any
        The execution result of the output-worker that has the setting `is_output=True`,
        otherwise None.
    """
    return await super().arun(
        user_msg=user_msg,
        chat_history=chat_history,
        messages=messages,
        tools=tools,
        feedback_data=feedback_data,
    )
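
A call sketch, reusing the `react` instance from the construction sketch earlier. It assumes the message TypedDicts accept plain dicts with `role`/`content` keys, as the validation code above suggests:

history = [
    {"role": "user", "content": "Hi, I'm planning a trip."},
    {"role": "assistant", "content": "Happy to help. Where to?"},
]
# The final prompt is assembled as system_prompt + chat_history + user_msg.
answer = await react.arun("Find three sights to see in Tokyo.", chat_history=history)

As with the other automas, `arun()` is a coroutine and must be awaited inside an async context.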