
recent

This module provides the core components of the ReCENT memory management algorithm.

ReCENT (Recursive Compressed Episodic Node Tree) is an algorithm designed to address issues such as context explosion and goal drift by recursively compressing episodic memory whenever necessary.

This module provides an agentic automa and its corresponding memory and task configurations:

  • ReCentAutoma: The main automaton that implements the ReCENT algorithm.
  • ReCentMemoryConfig: Configuration for ReCENT memory management.
  • ObservationTaskConfig: Configuration for the observation task.
  • ToolTaskConfig: Configuration for the tool selection task.
  • AnswerTaskConfig: Configuration for the answer generation task.
  • StopCondition: Stop condition configuration for ReCentAutoma.

The core data structures are:

  • EpisodicNodeTree: Tree of episodic nodes which is the core data structure of ReCENT.
  • BaseEpisodicNode: Base class for all episodic nodes. It is inherited by:
    • GoalEpisodicNode: A goal node that represents the goal of the agent.
    • LeafEpisodicNode: A leaf node that represents a sequence of messages.
    • CompressionEpisodicNode: A compression node that summarizes a sequence of episodic nodes.
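
The sketch below shows how these pieces typically fit together. It is an illustrative example only: the import path, the placeholder LLM object, and the arun(...) entry point (assumed here to forward its keyword arguments to the initialize_task_goal start worker) are assumptions rather than confirmed API.

import asyncio

from bridgic.core.agentic.recent import ReCentAutoma, ReCentMemoryConfig, StopCondition


def search_notes(query: str) -> str:
    """A plain function offered as a tool; ReCentAutoma wraps it into a ToolSpec."""
    return f"(stub) no notes found for {query!r}"


async def main():
    llm = ...  # any BaseLlm implementation from your LLM integration of choice
    automa = ReCentAutoma(
        llm=llm,
        tools=[search_notes],
        stop_condition=StopCondition(max_iteration=10),
        memory_config=ReCentMemoryConfig(llm=llm),
    )
    # Assumed entry point: keyword arguments reach the initialize_task_goal start worker.
    answer = await automa.arun(goal="Collect my notes about ReCENT", guidance="Prefer recent notes.")
    print(answer)


asyncio.run(main())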

ReCentAutoma

Bases: GraphAutoma

ReCentAutoma is an automa that implements a ReAct-like process, leveraging the ReCENT memory algorithm to support stronger autonomous next-step planning and thereby achieve the pre-set goal more reliably.

This automa extends GraphAutoma to provide a memory-aware agentic automa that:

  • Maintains episodic memory with compression capabilities
  • Supports goal-oriented task execution
  • Dynamically creates tool workers based on LLM decisions
  • Manages memory compression to prevent context explosion

Parameters:

  • llm (BaseLlm, required)
    The LLM that serves as the default LLM for all tasks (if a dedicated LLM is not configured for a specific task).
  • tools (Optional[List[Union[Callable, Automa, ToolSpec]]], default: None)
    List of tools available to the automa. Can be functions, Automa instances, or ToolSpec instances.
  • tools_builders (Optional[List[ToolSetBuilder]], default: None)
    List of ToolSetBuilder instances used to dynamically create ToolSpec instances at initialization. This is useful when the automa needs exclusive access to a resource-like tool set, such as a browser or terminal.
  • stop_condition (Optional[StopCondition], default: None)
    Stop condition configuration. If None, uses the default configuration: max_iteration = -1, max_consecutive_no_tool_selected = 3.
  • memory_config (Optional[ReCentMemoryConfig], default: None)
    Memory configuration for ReCENT memory management. If None, a default config will be created using the provided llm.
  • observation_task_config (Optional[ObservationTaskConfig], default: None)
    Configuration for the observation task. If None, uses the default config with the provided llm. If provided but system_template or instruction_template is None, the default templates will be used.
  • tool_task_config (Optional[ToolTaskConfig], default: None)
    Configuration for the tool selection task. If None, uses the default config with the provided llm. If provided but system_template or instruction_template is None, the default templates will be used.
  • answer_task_config (Optional[AnswerTaskConfig], default: None)
    Configuration for the answer generation task. If None, uses the default config with the provided llm. If provided but system_template or instruction_template is None, the default templates will be used.
  • name (Optional[str], default: None)
    The name of the automa instance.
  • thread_pool (Optional[ThreadPoolExecutor], default: None)
    The thread pool for parallel execution of I/O-bound or CPU-bound tasks.
  • running_options (Optional[RunningOptions], default: None)
    The running options for the automa instance.
Source code in bridgic/core/agentic/recent/_recent_automa.py
class ReCentAutoma(GraphAutoma):
    """
    ReCentAutoma is an automa that implements a ReAct-like process, leveraging the ReCENT memory 
    algorithm to support stronger autonomous next-step planning, thus better achieving the pre-set goal.

    This automa extends GraphAutoma to provide a memory-aware agentic automa that:
    - Maintains episodic memory with compression capabilities
    - Supports goal-oriented task execution
    - Dynamically creates tool workers based on LLM decisions
    - Manages memory compression to prevent context explosion

    Parameters
    ----------
    llm : BaseLlm
        The LLM that serves as the default LLM for all tasks (if a dedicated LLM is not configured for a specific task).
    tools : Optional[List[Union[Callable, Automa, ToolSpec]]]
        List of tools available to the automa. Can be functions, Automa instances, or ToolSpec instances.
    tools_builders : Optional[List[ToolSetBuilder]]
        List of `ToolSetBuilder` instances used to dynamically create `ToolSpec` instances at initialization.
        This is useful when the automa needs exclusive access to a resource-like tool set, such as a browser or terminal.
    stop_condition : Optional[StopCondition]
        Stop condition configuration. If None, uses default configuration:
        - max_iteration: -1
        - max_consecutive_no_tool_selected: 3
    memory_config : Optional[ReCentMemoryConfig]
        Memory configuration for ReCent memory management. If None, a default config will be created using the provided llm.
    observation_task_config : Optional[ObservationTaskConfig]
        Configuration for the observation task. If None, uses default config with the provided `llm`.
        If provided but system_template or instruction_template is None, will use default templates.
    tool_task_config : Optional[ToolTaskConfig]
        Configuration for the tool selection task. If None, uses default config with the provided `llm`.
        If provided but system_template or instruction_template is None, will use default templates.
    answer_task_config : Optional[AnswerTaskConfig]
        Configuration for the answer generation task. If None, uses default config with the provided `llm`.
        If provided but system_template or instruction_template is None, will use default templates.
    name : Optional[str]
        The name of the automa instance.
    thread_pool : Optional[ThreadPoolExecutor]
        The thread pool for parallel execution of I/O-bound or CPU-bound tasks.
    running_options : Optional[RunningOptions]
        The running options for the automa instance.
    """

    _llm: BaseLlm
    """The main LLM which is used as fallback for the essential tasks."""

    _tool_specs: Optional[List[ToolSpec]]
    """List of tool specifications available to the automa."""

    _tools_builders: Optional[List[ToolSetBuilder]]
    """List of builders used to dynamically create ToolSpec instances."""

    _memory_manager: ReCentMemoryManager
    """Memory manager instance for managing episodic memory."""

    _observation_task_config: LlmTaskConfig
    """Configuration for the observation task."""

    _tool_task_config: LlmTaskConfig
    """Configuration for the tool selection task."""

    _answer_task_config: LlmTaskConfig
    """Configuration for the answer generation task."""

    _stop_condition: StopCondition
    """Stop condition configuration."""

    def __init__(
        self,
        llm: BaseLlm,
        tools: Optional[List[Union[Callable, Automa, ToolSpec]]] = None,
        tools_builders: Optional[List[ToolSetBuilder]] = None,
        stop_condition: Optional[StopCondition] = None,
        memory_config: Optional[ReCentMemoryConfig] = None,
        observation_task_config: Optional[ObservationTaskConfig] = None,
        tool_task_config: Optional[ToolTaskConfig] = None,
        answer_task_config: Optional[AnswerTaskConfig] = None,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
    ):
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)
        self._llm = llm

        # Initialize tool specs from direct tools.
        self._tool_specs = [self._ensure_tool_spec(tool) for tool in tools or []]

        # Initialize tools_builders and create ToolSpec instances from them.
        self._tools_builders = tools_builders or []
        if self._tools_builders:
            for builder in self._tools_builders:
                response = builder.build()
                self._tool_specs.extend(response.get("tool_specs", []))

        # Initialize memory manager with memory config.
        memory_config = memory_config or ReCentMemoryConfig(llm=llm)
        self._memory_manager = ReCentMemoryManager(compression_config=memory_config)

        # Initialize task-specific configs with defaults if not provided.
        if observation_task_config is None:
            observation_task_config = ObservationTaskConfig(llm=self._llm)
        self._observation_task_config = observation_task_config.to_llm_task_config()

        if tool_task_config is None:
            tool_task_config = ToolTaskConfig(llm=self._llm)
        self._tool_task_config = tool_task_config.to_llm_task_config()

        if answer_task_config is None:
            answer_task_config = AnswerTaskConfig(llm=self._llm)
        self._answer_task_config = answer_task_config.to_llm_task_config()

        # Initialize stop condition with defaults if not provided.
        self._stop_condition = stop_condition or StopCondition()

    def _ensure_tool_spec(self, tool: Union[Callable, Automa, ToolSpec]) -> ToolSpec:
        if isinstance(tool, ToolSpec):
            return tool
        elif isinstance(tool, type) and issubclass(tool, Automa):
            return AutomaToolSpec.from_raw(tool)
        elif isinstance(tool, Callable):
            # Note: this test against `Callable` should be placed at last.
            return FunctionToolSpec.from_raw(tool)
        else:
            raise TypeError(f"Invalid tool type: {type(tool)} detected, expected `Callable`, `Automa`, or `ToolSpec`.")

    @worker(is_start=True)
    async def initialize_task_goal(
        self,
        goal: str,
        guidance: Optional[str] = None,
    ):
        """
        Initialize the goal of the task and start the automa.

        This worker is the entry point of the automa. It creates a goal node as the first episodic 
        node in the memory sequence and optionally pushes initial user messages.

        Parameters
        ----------
        goal : str
            The task goal.
        guidance : Optional[str]
            The guidance for achieving the task goal.
        """
        if not goal:
            raise ValueError("Goal cannot be empty.")
        self._memory_manager.create_goal(goal, guidance)

        # Log task goal initialization in debug mode.
        top_options = self._get_top_running_options()
        if top_options.debug:
            msg = (
                f"[{type(self).__name__}]-[{self.name}] 🎯 Task Goal\n"
                f"{goal}" + (f"\n\n{guidance}" if guidance else '')
            )
            printer.print(msg)

        self.ferry_to("observe")

    @worker()
    async def observe(self, rtx = System("runtime_context")):
        """
        Observe the current state and determine if the goal has been achieved.

        This worker builds context from memory, uses LLM (with `StructuredOutput` protocol) to 
        determine if the goal has been achieved, and routes accordingly.
        """
        local_space = self.get_local_space(rtx)

        # 0. If iteration count is bigger than max_iteration, route to finalize_answer and stop.
        iteration_cnt = local_space.get("iteration_cnt", 0) + 1
        local_space["iteration_cnt"] = iteration_cnt

        if self._stop_condition.max_iteration >= 0:
            if iteration_cnt > self._stop_condition.max_iteration:
                self.ferry_to("finalize_answer")
                return

        # 1. Build context from memory.
        context: ReCentContext = await self._memory_manager.abuild_context()

        # 2. Build LLM message list for goal status evaluation.
        messages: List[Message] = []

        # Add system message for goal evaluation (if configured).
        if self._observation_task_config.system_template is not None:
            system_message = self._observation_task_config.system_template.format_message(
                role=Role.SYSTEM,
                goal_content=context["goal_content"],
                goal_guidance=context["goal_guidance"],
            )
            messages.append(system_message)

        # Add memory messages (observation history).
        if context["memory_messages"]:
            messages.extend(context["memory_messages"])

        # Add instruction message (if configured).
        if self._observation_task_config.instruction_template is not None:
            instruction_message = self._observation_task_config.instruction_template.format_message(role=Role.USER)
            messages.append(instruction_message)

        # 3. Call LLM with StructuredOutput to get goal status.
        observe_llm = self._observation_task_config.llm
        if not isinstance(observe_llm, StructuredOutput):
            raise TypeError(f"LLM must support StructuredOutput protocol, but {type(observe_llm)} does not.")

        goal_status: GoalStatus = await observe_llm.astructured_output(
            messages=messages,
            constraint=PydanticModel(model=GoalStatus),
        )
        observation_template = EjinjaPromptTemplate(
            "Goal Status:\n"
            "- Achieved: {%- if goal_status.goal_achieved %}goal_status.goal_achieved{% else %}No{% endif %}\n"
            "- Thinking: {%- if goal_status.brief_thinking %}{{ goal_status.brief_thinking }}{% else %}(No thinking){% endif %}"
        )
        observation_message = observation_template.format_message(
            role=Role.AI,
            goal_status=goal_status,
        )
        self._memory_manager.push_messages([observation_message])

        # Log observation result in debug mode.
        top_options = self._get_top_running_options()
        if top_options.debug:
            msg = (
                f"[{type(self).__name__}]-[{self.name}] 👀 Observation\n"
                f"    Iteration: {iteration_cnt}\n"
                f"    Achieved: {goal_status.goal_achieved}\n"
                f"    Thinking: {goal_status.brief_thinking}"
            )
            printer.print(msg, color="gray")

        # 4. Dynamic routing based on goal status.
        if goal_status.goal_achieved or not self._tool_specs:
            # Goal achieved, route to finalize_answer.
            self.ferry_to("finalize_answer")
        else:
            # Goal not achieved, prepare messages and tools, then route to select_tools.
            tool_select_messages = []

            # Add system message (if configured)
            if self._tool_task_config.system_template is not None:
                tool_system_message = self._tool_task_config.system_template.format_message(
                    role=Role.SYSTEM,
                    goal_content=context["goal_content"],
                    goal_guidance=context["goal_guidance"],
                )
                tool_select_messages.append(tool_system_message)

            # Add memory messages
            tool_select_messages.extend(context["memory_messages"])

            # Add instruction message (if configured).
            if self._tool_task_config.instruction_template is not None:
                tool_instruction_message = self._tool_task_config.instruction_template.format_message(role=Role.USER)
                tool_select_messages.append(tool_instruction_message)

            tools = [tool_spec.to_tool() for tool_spec in self._tool_specs]

            # Concurrently select tools and compress memory.
            self.ferry_to("select_tools", messages=tool_select_messages, tools=tools)
            self.ferry_to("compress_memory")

    @worker()
    async def select_tools(
        self,
        rtx = System("runtime_context"),
        *,
        messages: List[Message],
        tools: List[Tool],
    ) -> Tuple[List[ToolCall], Optional[str]]:
        """
        Select tools using LLM's tool selection capability.

        This method calls the LLM's aselect_tool method to select appropriate tools
        based on the conversation context.

        Parameters
        ----------
        messages : List[Message]
            The conversation history and current context.
        tools : List[Tool]
            Available tools that can be selected for use.

        Returns
        -------
        Tuple[List[ToolCall], Optional[str]]
            A tuple containing:
            - List of selected tool calls with determined parameters
            - Optional response text from the LLM
        """
        # Check if LLM supports ToolSelection protocol.
        tool_select_llm = self._tool_task_config.llm
        if not isinstance(tool_select_llm, ToolSelection):
            raise TypeError(f"LLM must support ToolSelection protocol, but {type(tool_select_llm)} does not.")

        # Call tool selection method.
        tool_calls, tool_response = await tool_select_llm.aselect_tool(
            messages=messages,
            tools=tools,
        )

        # Log selected tools in debug mode.
        top_options = self._get_top_running_options()
        if top_options.debug:
            tool_info_lines = []

            if tool_calls:
                for i, tool_call in enumerate(tool_calls, 1):
                    tool_info_lines.append(f"    Tool {i}: {tool_call.name}")
                    tool_info_lines.append(f"      id: {tool_call.id}")
                    tool_info_lines.append(f"      arguments: {tool_call.arguments}")
            else:
                tool_info_lines.append("    (No tools selected)")

            if tool_response:
                tool_info_lines.append(f"\n    LLM Response: {tool_response}")

            tool_info_lines_str = "\n".join(tool_info_lines)

            msg = (
                f"[{type(self).__name__}]-[{self.name}] 🔧 Tool Selection\n"
                f"{tool_info_lines_str}"
            )
            printer.print(msg, color="orange")

        # Match tool calls with tool specs.
        if tool_calls and self._tool_specs:
            matched_list = self._match_tool_calls_and_tool_specs(tool_calls, self._tool_specs)

            if matched_list:
                # Create tool workers dynamically.
                tool_worker_keys = []
                matched_tool_calls = []

                for tool_call, tool_spec in matched_list:
                    # Create the tool worker.
                    tool_worker_key = f"tool-<{tool_call.name}>-<{tool_call.id}>"
                    tool_worker_obj = tool_spec.create_worker()

                    # Register the tool worker.
                    self.add_worker(key=tool_worker_key, worker=tool_worker_obj)
                    tool_worker_keys.append(tool_worker_key)
                    matched_tool_calls.append(tool_call)

                    # Execute the tool worker in the next dynamic step (via ferry_to).
                    self.ferry_to(tool_worker_key, **tool_call.arguments)

                # Create collect_results worker dynamically.
                # After collecting, the tool calls and their results will be pushed to memory.
                def collect_wrapper(compression_timestep_and_tool_results: List[Any]) -> None:
                    tool_results = compression_timestep_and_tool_results[1:]
                    return self._collect_tools_results(tool_response, matched_tool_calls, tool_results)

                # To ensure that compression is performed based on the memory prior to tool selection, 
                # the collect_results worker must be started after the compress_memory worker.
                self.add_func_as_worker(
                    key=f"collect_results-<{uuid.uuid4().hex[:8]}>",
                    func=collect_wrapper,
                    dependencies=["compress_memory"] + tool_worker_keys,
                    args_mapping_rule=ArgsMappingRule.MERGE,
                    callback_builders=[WorkerCallbackBuilder(CollectResultsCleanupCallback)],
                )

                tool_selected = True
            else:
                tool_selected = False
        else:
            tool_selected = False

        local_space = self.get_local_space(rtx)

        # Count the number of times no tool is selected.
        if not tool_selected:
            no_tool_selected_cnt = local_space.get("no_tool_selected_cnt", 0)
            no_tool_selected_cnt = no_tool_selected_cnt + 1 if not tool_selected else 0
            local_space["no_tool_selected_cnt"] = no_tool_selected_cnt

            if no_tool_selected_cnt < self._stop_condition.max_consecutive_no_tool_selected:
                # If the limit is not exceeded, try one more time.
                self.ferry_to("observe")
            else:
                # If the limit is exceeded, finalize the answer.
                self.ferry_to("finalize_answer")

    @worker()
    async def compress_memory(self) -> Optional[int]:
        """
        Compress memory if necessary.

        Returns
        -------
        Optional[int]
            The timestep of the compression node if compression is needed, otherwise None.
        """
        top_options = self._get_top_running_options()
        should_compress = self._memory_manager.should_trigger_compression()

        if top_options.debug:
            msg = (
                f"[{type(self).__name__}]-[{self.name}] 🧭 Memory Check\n"
                f"    Compression Needed: {should_compress}"
            )
            printer.print(msg, color="olive")

        if should_compress:
            # Create the compression node. The summary is now complete.
            timestep = await self._memory_manager.acreate_compression()

            if top_options.debug:
                node: CompressionEpisodicNode = self._memory_manager.get_specified_memory_node(timestep)
                summary = await asyncio.wrap_future(node.summary)
                msg = (
                    f"[{type(self).__name__}]-[{self.name}] 📂 Memory Compression\n"
                    f"{summary}"
                )
                printer.print(msg, color="blue")

            return timestep
        else:
            return None

    @worker(is_output=True)
    async def finalize_answer(self) -> str:
        """
        Generate the final answer based on memory and goal using LLM.

        This worker is the output node of the automa. It builds context from memory,
        calls LLM to generate a comprehensive final answer based on the goal and
        conversation history.

        Returns
        -------
        str
            The final answer string.
        """
        # 1. Build context from memory.
        context: ReCentContext = await self._memory_manager.abuild_context()

        # 2. Build LLM message list for final answer generation.
        messages: List[Message] = []

        # Add system prompt for final answer generation (if configured).
        if self._answer_task_config.system_template is not None:
            system_message = self._answer_task_config.system_template.format_message(
                role=Role.SYSTEM,
                goal_content=context["goal_content"],
                goal_guidance=context["goal_guidance"],
            )
            messages.append(system_message)

        # Add memory messages (observation history).
        if context["memory_messages"]:
            messages.extend(context["memory_messages"])

        # Add instruction to generate final answer (if configured).
        if self._answer_task_config.instruction_template is not None:
            instruction_message = self._answer_task_config.instruction_template.format_message(role=Role.USER)
            messages.append(instruction_message)

        # 3. Call LLM to generate final answer.
        answer_llm = self._answer_task_config.llm
        response = await answer_llm.achat(messages=messages)
        final_answer = response.message.content

        # 4. Push final answer as a Message into the memory
        final_answer_msg = Message.from_text(text=final_answer, role=Role.AI)
        self._memory_manager.push_messages([final_answer_msg])

        return final_answer

    def _collect_tools_results(
        self,
        tool_response: Optional[str],
        tool_calls: Optional[List[ToolCall]],
        tool_results: List[Any],
    ) -> None:
        """
        Collect results from tool executions and push to memory.

        This method is used as a worker function to collect results from multiple tool workers. 
        It converts tool results to ToolResult messages and pushes them to memory.

        Parameters
        ----------
        tool_response : Optional[str]
            The output response from the tool selection task, if any.
        tool_calls : Optional[List[ToolCall]]
            List of tool calls (optional, for building ToolResult messages).
        tool_results : List[Any]
            List of tool execution results (merged via ArgsMappingRule.MERGE).
        """
        # Push assistant message with tool calls and tool response to memory.
        if tool_calls or tool_response:
            assistant_message = Message.from_tool_call(tool_calls=tool_calls, text=tool_response)
            self._memory_manager.push_messages([assistant_message])

        # Convert tool results to ToolResult messages.
        tool_result_messages: List[Message] = []

        for tool_call, tool_result in zip(tool_calls, tool_results):
            # Convert tool result to string
            result_content = str(tool_result)
            # Create ToolResult message
            tool_result_message = Message.from_tool_result(
                tool_id=tool_call.id,
                content=result_content,
            )
            tool_result_messages.append(tool_result_message)

        # Push tool result messages to memory.
        if tool_result_messages:
            self._memory_manager.push_messages(tool_result_messages)

        # Log tool execution results in debug mode.
        top_options = self._get_top_running_options()
        if top_options.debug:
            result_lines = []
            for i, (tool_call, tool_result) in enumerate(zip(tool_calls, tool_results), 1):
                # Format tool result with MCP support
                result_content = stringify_tool_result(tool_result, verbose=top_options.verbose)
                result_content = result_content[:10000] + "..." if len(result_content) > 10000 else result_content
                result_lines.append(f"    Tool {i}: {tool_call.name}")
                result_lines.append(f"      id: {tool_call.id}")
                result_lines.append(f"      result: {result_content}")

            result_lines_str = "\n".join(result_lines)

            msg = (
                f"[{type(self).__name__}]-[{self.name}] 🚩 Tool Results\n"
                f"{result_lines_str}"
            )
            printer.print(msg, color="green")

        # Route to observe.
        self.ferry_to("observe")

    def _match_tool_calls_and_tool_specs(
        self,
        tool_calls: List[ToolCall],
        tool_specs: List[ToolSpec],
    ) -> List[Tuple[ToolCall, ToolSpec]]:
        matched = []
        for tool_call in tool_calls:
            for tool_spec in tool_specs:
                if tool_spec.tool_name == tool_call.name:
                    matched.append((tool_call, tool_spec))
                    break
        return matched

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = super().dump_to_dict()
        state_dict["llm"] = self._llm

        # Only serialize ToolSpec instances that are not from builders.
        tool_specs_to_serialize = [
            tool_spec for tool_spec in self._tool_specs if not tool_spec._from_builder
        ]
        state_dict["tool_specs"] = tool_specs_to_serialize

        # Serialize tools_builders so they can be used to recreate ToolSpec instances during deserialization.
        state_dict["tools_builders"] = self._tools_builders or []

        state_dict["memory_manager"] = self._memory_manager
        state_dict["observation_task_config"] = self._observation_task_config
        state_dict["tool_task_config"] = self._tool_task_config
        state_dict["answer_task_config"] = self._answer_task_config
        state_dict["stop_condition"] = self._stop_condition
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self._llm = state_dict["llm"]

        # Load ToolSpec instances that were directly serialized.
        self._tool_specs = state_dict.get("tool_specs") or []

        # Load tools_builders.
        self._tools_builders = state_dict.get("tools_builders") or []

        # Recreate ToolSpec instances from the deserialized builders.
        if self._tools_builders:
            for builder in self._tools_builders:
                response = builder.build()
                self._tool_specs.extend(response.get("tool_specs", []))

        self._stop_condition = state_dict["stop_condition"]
        self._memory_manager = state_dict["memory_manager"]

        self._observation_task_config = (
            state_dict.get("observation_task_config")
            or ObservationTaskConfig(llm=self._llm).to_llm_task_config()
        )
        self._tool_task_config = (
            state_dict.get("tool_task_config")
            or ToolTaskConfig(llm=self._llm).to_llm_task_config()
        )
        self._answer_task_config = (
            state_dict.get("answer_task_config")
            or AnswerTaskConfig(llm=self._llm).to_llm_task_config()
        )

initialize_task_goal

async
initialize_task_goal(
    goal: str, guidance: Optional[str] = None
)

Initialize the goal of the task and start the automa.

This worker is the entry point of the automa. It creates a goal node as the first episodic node in the memory sequence and optionally pushes initial user messages.

Parameters:

  • goal (str, required)
    The task goal.
  • guidance (Optional[str], default: None)
    The guidance for achieving the task goal.
Source code in bridgic/core/agentic/recent/_recent_automa.py
@worker(is_start=True)
async def initialize_task_goal(
    self,
    goal: str,
    guidance: Optional[str] = None,
):
    """
    Initialize the goal of the task and start the automa.

    This worker is the entry point of the automa. It creates a goal node as the first episodic 
    node in the memory sequence and optionally pushes initial user messages.

    Parameters
    ----------
    goal : str
        The task goal.
    guidance : Optional[str]
        The guidance for achieving the task goal.
    """
    if not goal:
        raise ValueError("Goal cannot be empty.")
    self._memory_manager.create_goal(goal, guidance)

    # Log task goal initialization in debug mode.
    top_options = self._get_top_running_options()
    if top_options.debug:
        msg = (
            f"[{type(self).__name__}]-[{self.name}] 🎯 Task Goal\n"
            f"{goal}" + (f"\n\n{guidance}" if guidance else '')
        )
        printer.print(msg)

    self.ferry_to("observe")

observe

async
observe(rtx=System('runtime_context'))

Observe the current state and determine if the goal has been achieved.

This worker builds context from memory, uses LLM (with StructuredOutput protocol) to determine if the goal has been achieved, and routes accordingly.

Source code in bridgic/core/agentic/recent/_recent_automa.py
@worker()
async def observe(self, rtx = System("runtime_context")):
    """
    Observe the current state and determine if the goal has been achieved.

    This worker builds context from memory, uses LLM (with `StructuredOutput` protocol) to 
    determine if the goal has been achieved, and routes accordingly.
    """
    local_space = self.get_local_space(rtx)

    # 0. If iteration count is bigger than max_iteration, route to finalize_answer and stop.
    iteration_cnt = local_space.get("iteration_cnt", 0) + 1
    local_space["iteration_cnt"] = iteration_cnt

    if self._stop_condition.max_iteration >= 0:
        if iteration_cnt > self._stop_condition.max_iteration:
            self.ferry_to("finalize_answer")
            return

    # 1. Build context from memory.
    context: ReCentContext = await self._memory_manager.abuild_context()

    # 2. Build LLM message list for goal status evaluation.
    messages: List[Message] = []

    # Add system message for goal evaluation (if configured).
    if self._observation_task_config.system_template is not None:
        system_message = self._observation_task_config.system_template.format_message(
            role=Role.SYSTEM,
            goal_content=context["goal_content"],
            goal_guidance=context["goal_guidance"],
        )
        messages.append(system_message)

    # Add memory messages (observation history).
    if context["memory_messages"]:
        messages.extend(context["memory_messages"])

    # Add instruction message (if configured).
    if self._observation_task_config.instruction_template is not None:
        instruction_message = self._observation_task_config.instruction_template.format_message(role=Role.USER)
        messages.append(instruction_message)

    # 3. Call LLM with StructuredOutput to get goal status.
    observe_llm = self._observation_task_config.llm
    if not isinstance(observe_llm, StructuredOutput):
        raise TypeError(f"LLM must support StructuredOutput protocol, but {type(observe_llm)} does not.")

    goal_status: GoalStatus = await observe_llm.astructured_output(
        messages=messages,
        constraint=PydanticModel(model=GoalStatus),
    )
    observation_template = EjinjaPromptTemplate(
        "Goal Status:\n"
        "- Achieved: {%- if goal_status.goal_achieved %}goal_status.goal_achieved{% else %}No{% endif %}\n"
        "- Thinking: {%- if goal_status.brief_thinking %}{{ goal_status.brief_thinking }}{% else %}(No thinking){% endif %}"
    )
    observation_message = observation_template.format_message(
        role=Role.AI,
        goal_status=goal_status,
    )
    self._memory_manager.push_messages([observation_message])

    # Log observation result in debug mode.
    top_options = self._get_top_running_options()
    if top_options.debug:
        msg = (
            f"[{type(self).__name__}]-[{self.name}] 👀 Observation\n"
            f"    Iteration: {iteration_cnt}\n"
            f"    Achieved: {goal_status.goal_achieved}\n"
            f"    Thinking: {goal_status.brief_thinking}"
        )
        printer.print(msg, color="gray")

    # 4. Dynamic routing based on goal status.
    if goal_status.goal_achieved or not self._tool_specs:
        # Goal achieved, route to finalize_answer.
        self.ferry_to("finalize_answer")
    else:
        # Goal not achieved, prepare messages and tools, then route to select_tools.
        tool_select_messages = []

        # Add system message (if configured)
        if self._tool_task_config.system_template is not None:
            tool_system_message = self._tool_task_config.system_template.format_message(
                role=Role.SYSTEM,
                goal_content=context["goal_content"],
                goal_guidance=context["goal_guidance"],
            )
            tool_select_messages.append(tool_system_message)

        # Add memory messages
        tool_select_messages.extend(context["memory_messages"])

        # Add instruction message (if configured).
        if self._tool_task_config.instruction_template is not None:
            tool_instruction_message = self._tool_task_config.instruction_template.format_message(role=Role.USER)
            tool_select_messages.append(tool_instruction_message)

        tools = [tool_spec.to_tool() for tool_spec in self._tool_specs]

        # Concurrently select tools and compress memory.
        self.ferry_to("select_tools", messages=tool_select_messages, tools=tools)
        self.ferry_to("compress_memory")

select_tools

async
select_tools(
    rtx=System("runtime_context"),
    *,
    messages: List[Message],
    tools: List[Tool]
) -> Tuple[List[ToolCall], Optional[str]]

Select tools using LLM's tool selection capability.

This method calls the LLM's aselect_tool method to select appropriate tools based on the conversation context.

Parameters:

  • messages (List[Message], required)
    The conversation history and current context.
  • tools (List[Tool], required)
    Available tools that can be selected for use.

Returns:

  • Tuple[List[ToolCall], Optional[str]]
    A tuple containing the selected tool calls with their determined parameters, and an optional response text from the LLM.

Source code in bridgic/core/agentic/recent/_recent_automa.py
@worker()
async def select_tools(
    self,
    rtx = System("runtime_context"),
    *,
    messages: List[Message],
    tools: List[Tool],
) -> Tuple[List[ToolCall], Optional[str]]:
    """
    Select tools using LLM's tool selection capability.

    This method calls the LLM's aselect_tool method to select appropriate tools
    based on the conversation context.

    Parameters
    ----------
    messages : List[Message]
        The conversation history and current context.
    tools : List[Tool]
        Available tools that can be selected for use.

    Returns
    -------
    Tuple[List[ToolCall], Optional[str]]
        A tuple containing:
        - List of selected tool calls with determined parameters
        - Optional response text from the LLM
    """
    # Check if LLM supports ToolSelection protocol.
    tool_select_llm = self._tool_task_config.llm
    if not isinstance(tool_select_llm, ToolSelection):
        raise TypeError(f"LLM must support ToolSelection protocol, but {type(tool_select_llm)} does not.")

    # Call tool selection method.
    tool_calls, tool_response = await tool_select_llm.aselect_tool(
        messages=messages,
        tools=tools,
    )

    # Log selected tools in debug mode.
    top_options = self._get_top_running_options()
    if top_options.debug:
        tool_info_lines = []

        if tool_calls:
            for i, tool_call in enumerate(tool_calls, 1):
                tool_info_lines.append(f"    Tool {i}: {tool_call.name}")
                tool_info_lines.append(f"      id: {tool_call.id}")
                tool_info_lines.append(f"      arguments: {tool_call.arguments}")
        else:
            tool_info_lines.append("    (No tools selected)")

        if tool_response:
            tool_info_lines.append(f"\n    LLM Response: {tool_response}")

        tool_info_lines_str = "\n".join(tool_info_lines)

        msg = (
            f"[{type(self).__name__}]-[{self.name}] 🔧 Tool Selection\n"
            f"{tool_info_lines_str}"
        )
        printer.print(msg, color="orange")

    # Match tool calls with tool specs.
    if tool_calls and self._tool_specs:
        matched_list = self._match_tool_calls_and_tool_specs(tool_calls, self._tool_specs)

        if matched_list:
            # Create tool workers dynamically.
            tool_worker_keys = []
            matched_tool_calls = []

            for tool_call, tool_spec in matched_list:
                # Create the tool worker.
                tool_worker_key = f"tool-<{tool_call.name}>-<{tool_call.id}>"
                tool_worker_obj = tool_spec.create_worker()

                # Register the tool worker.
                self.add_worker(key=tool_worker_key, worker=tool_worker_obj)
                tool_worker_keys.append(tool_worker_key)
                matched_tool_calls.append(tool_call)

                # Execute the tool worker in the next dynamic step (via ferry_to).
                self.ferry_to(tool_worker_key, **tool_call.arguments)

            # Create collect_results worker dynamically.
            # After collecting, the tool calls and their results will be pushed to memory.
            def collect_wrapper(compression_timestep_and_tool_results: List[Any]) -> None:
                tool_results = compression_timestep_and_tool_results[1:]
                return self._collect_tools_results(tool_response, matched_tool_calls, tool_results)

            # To ensure that compression is performed based on the memory prior to tool selection, 
            # the collect_results worker must be started after the compress_memory worker.
            self.add_func_as_worker(
                key=f"collect_results-<{uuid.uuid4().hex[:8]}>",
                func=collect_wrapper,
                dependencies=["compress_memory"] + tool_worker_keys,
                args_mapping_rule=ArgsMappingRule.MERGE,
                callback_builders=[WorkerCallbackBuilder(CollectResultsCleanupCallback)],
            )

            tool_selected = True
        else:
            tool_selected = False
    else:
        tool_selected = False

    local_space = self.get_local_space(rtx)

    # Count the number of times no tool is selected.
    if not tool_selected:
        no_tool_selected_cnt = local_space.get("no_tool_selected_cnt", 0)
        no_tool_selected_cnt = no_tool_selected_cnt + 1 if not tool_selected else 0
        local_space["no_tool_selected_cnt"] = no_tool_selected_cnt

        if no_tool_selected_cnt < self._stop_condition.max_consecutive_no_tool_selected:
            # If the limit is not exceeded, try one more time.
            self.ferry_to("observe")
        else:
            # If the limit is exceeded, finalize the answer.
            self.ferry_to("finalize_answer")

compress_memory

async
compress_memory() -> Optional[int]

Compress memory if necessary.

Returns:

  • Optional[int]
    The timestep of the compression node if compression is needed, otherwise None.

Source code in bridgic/core/agentic/recent/_recent_automa.py
@worker()
async def compress_memory(self) -> Optional[int]:
    """
    Compress memory if necessary.

    Returns
    -------
    Optional[int]
        The timestep of the compression node if compression is needed, otherwise None.
    """
    top_options = self._get_top_running_options()
    should_compress = self._memory_manager.should_trigger_compression()

    if top_options.debug:
        msg = (
            f"[{type(self).__name__}]-[{self.name}] 🧭 Memory Check\n"
            f"    Compression Needed: {should_compress}"
        )
        printer.print(msg, color="olive")

    if should_compress:
        # Create the compression node. The summary is now complete.
        timestep = await self._memory_manager.acreate_compression()

        if top_options.debug:
            node: CompressionEpisodicNode = self._memory_manager.get_specified_memory_node(timestep)
            summary = await asyncio.wrap_future(node.summary)
            msg = (
                f"[{type(self).__name__}]-[{self.name}] 📂 Memory Compression\n"
                f"{summary}"
            )
            printer.print(msg, color="blue")

        return timestep
    else:
        return None

finalize_answer

async
finalize_answer() -> str

Generate the final answer based on memory and goal using LLM.

This worker is the output node of the automa. It builds context from memory, calls LLM to generate a comprehensive final answer based on the goal and conversation history.

Returns:

  • str
    The final answer string.

Source code in bridgic/core/agentic/recent/_recent_automa.py
@worker(is_output=True)
async def finalize_answer(self) -> str:
    """
    Generate the final answer based on memory and goal using LLM.

    This worker is the output node of the automa. It builds context from memory,
    calls LLM to generate a comprehensive final answer based on the goal and
    conversation history.

    Returns
    -------
    str
        The final answer string.
    """
    # 1. Build context from memory.
    context: ReCentContext = await self._memory_manager.abuild_context()

    # 2. Build LLM message list for final answer generation.
    messages: List[Message] = []

    # Add system prompt for final answer generation (if configured).
    if self._answer_task_config.system_template is not None:
        system_message = self._answer_task_config.system_template.format_message(
            role=Role.SYSTEM,
            goal_content=context["goal_content"],
            goal_guidance=context["goal_guidance"],
        )
        messages.append(system_message)

    # Add memory messages (observation history).
    if context["memory_messages"]:
        messages.extend(context["memory_messages"])

    # Add instruction to generate final answer (if configured).
    if self._answer_task_config.instruction_template is not None:
        instruction_message = self._answer_task_config.instruction_template.format_message(role=Role.USER)
        messages.append(instruction_message)

    # 3. Call LLM to generate final answer.
    answer_llm = self._answer_task_config.llm
    response = await answer_llm.achat(messages=messages)
    final_answer = response.message.content

    # 4. Push final answer as a Message into the memory
    final_answer_msg = Message.from_text(text=final_answer, role=Role.AI)
    self._memory_manager.push_messages([final_answer_msg])

    return final_answer

StopCondition

Bases: BaseModel

Stop condition configuration for ReCentAutoma.

The stop conditions below are combined with logical OR; in other words, the process stops as soon as any one of them is met.

Attributes:

  • max_iteration (int)
    Maximum number of times to enter the observe node before finalizing the answer. Defaults to -1, which means there is no limit on the number of iterations.
  • max_consecutive_no_tool_selected (int)
    Maximum number of consecutive times no tool is selected before finalizing the answer. Defaults to 3.

Source code in bridgic/core/agentic/recent/_recent_automa.py
class StopCondition(BaseModel):
    """
    Stop condition configuration for ReCentAutoma.

    The different stop conditions below are combined with logic "or". In other words, the process 
    will stop if any condition is met.

    Attributes
    ----------
    max_iteration : int
        Maximum number of times to enter the observe node before finalizing the answer.
        Defaults to -1 which means there is no limit to the number of iterations.
    max_consecutive_no_tool_selected : int
        Maximum number of consecutive times no tool is selected before finalizing the answer.
        Defaults to 3.
    """

    max_iteration: int = Field(default=-1)
    """Maximum number of times to enter the observe node before finalizing the answer."""

    max_consecutive_no_tool_selected: int = Field(default=3)
    """Maximum number of consecutive times no tool is selected before finalizing the answer."""

max_iteration class-attribute instance-attribute

max_iteration: int = Field(default=-1)

Maximum number of times to enter the observe node before finalizing the answer.

max_consecutive_no_tool_selected class-attribute instance-attribute

max_consecutive_no_tool_selected: int = Field(default=3)

Maximum number of consecutive times no tool is selected before finalizing the answer.
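
As a quick illustration, the snippet below tightens both limits; because the conditions are OR-combined, the automa finalizes its answer as soon as either one is hit. The import path and the surrounding llm and tools objects are assumed for the sake of the example.

from bridgic.core.agentic.recent import ReCentAutoma, StopCondition

stop_condition = StopCondition(
    max_iteration=15,                    # at most 15 passes through the observe worker
    max_consecutive_no_tool_selected=2,  # or 2 tool-less iterations in a row
)
automa = ReCentAutoma(llm=llm, tools=tools, stop_condition=stop_condition)  # llm and tools defined elsewhere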

ReCentMemoryConfig

Bases: Serializable

This configuration class defines the memory management strategy that will compress the conversation history when certain conditions are met.

Attributes:

  • llm (BaseLlm)
    The LLM instance used for memory compression operations.
  • max_node_size (int)
    Maximum number of memory nodes before triggering compression. Defaults to 20.
  • max_token_size (int)
    Maximum number of tokens before triggering compression. Defaults to 16384 (1024 * 16).
  • system_template (str)
    Jinja2 prompt template for the system prompt used in memory compression, which accepts the parameters goal and guidance.
  • instruction_template (str)
    Jinja2 prompt template for the instruction prompt used in memory compression.
  • token_count_callback (Optional[Callable[[str], int]])
    Optional callback function to calculate the token count of a text string. If None, defaults to estimate_token_count, which uses a simple approximation (character_count / 4). The callback should accept a text string and return the token count.

Source code in bridgic/core/agentic/recent/_recent_memory_config.py
class ReCentMemoryConfig(Serializable):
    """
    This configuration class defines the memory management strategy that will compress
    the conversation history when certain conditions are met.

    Attributes
    ----------
    llm : BaseLlm
        The LLM instance used for memory compression operations.
    max_node_size : int
        Maximum number of memory nodes before triggering compression.
        Defaults to 20.
    max_token_size : int
        Maximum number of tokens before triggering compression.
        Defaults to 16384 (1024 * 16).
    system_template : str
        Jinja2 prompt template for the system prompt used in memory compression, which accepts 
        parameters: `goal` and `guidance`.
    instruction_template : str
        Jinja2 prompt template for the instruction prompt used in memory compression.
    token_count_callback : Optional[Callable[[str], int]]
        Optional callback function to calculate token count from text.
        If None, defaults to `estimate_token_count` which uses a simple approximation
        (character_count / 4). The callback should accept a text string and return the token count.
    """

    llm: BaseLlm
    """The LLM used for memory compression."""

    max_node_size: int
    """Threshold for the number of memory nodes to trigger memory compression."""

    max_token_size: int
    """Threshold for the number of tokens to trigger memory compression."""

    system_template: EjinjaPromptTemplate
    """Template for system prompt used in memory compression."""

    instruction_template: EjinjaPromptTemplate
    """Instruction prompt template used in memory compression."""

    token_count_callback: Callable[[str], int]
    """Callback function to calculate token count from text. Defaults to estimate_token_count."""

    def __init__(
        self,
        llm: BaseLlm,
        max_node_size: int = 20,
        max_token_size: int = 1024 * 16,
        system_template: Optional[str] = DEFAULT_SYSTEM_PROMPT_TEMPLATE,
        instruction_template: Optional[str] = DEFAULT_INSTRUCTION_PROMPT_TEMPLATE,
        token_count_callback: Optional[Callable[[str], int]] = estimate_token_count,
    ):
        self.llm = llm
        self.max_node_size = max_node_size
        self.max_token_size = max_token_size
        self.token_count_callback = token_count_callback if token_count_callback is not None else estimate_token_count

        # Convert string templates to EjinjaPromptTemplate instances
        # If None is explicitly passed, use default templates
        if system_template is None:
            system_template = DEFAULT_SYSTEM_PROMPT_TEMPLATE
        self.system_template = EjinjaPromptTemplate(system_template)

        if instruction_template is None:
            instruction_template = DEFAULT_INSTRUCTION_PROMPT_TEMPLATE
        self.instruction_template = EjinjaPromptTemplate(instruction_template)

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = {}
        state_dict["llm"] = self.llm
        state_dict["max_node_size"] = self.max_node_size
        state_dict["max_token_size"] = self.max_token_size
        state_dict["token_count_callback"] = self.token_count_callback.__module__ + "." + self.token_count_callback.__qualname__
        state_dict["system_template"] = self.system_template
        state_dict["instruction_template"] = self.instruction_template
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        self.llm = state_dict["llm"]
        self.max_node_size = state_dict["max_node_size"]
        self.max_token_size = state_dict["max_token_size"]
        self.token_count_callback = load_qualified_class_or_func(state_dict["token_count_callback"])
        self.system_template = state_dict["system_template"]
        self.instruction_template = state_dict["instruction_template"]

llm instance-attribute

llm: BaseLlm = llm

The LLM used for memory compression.

max_node_size instance-attribute

max_node_size: int = max_node_size

Threshold for the number of memory nodes to trigger memory compression.

max_token_size instance-attribute

max_token_size: int = max_token_size

Threshold for the number of tokens to trigger memory compression.

system_template instance-attribute

system_template: EjinjaPromptTemplate = (
    EjinjaPromptTemplate(system_template)
)

Template for system prompt used in memory compression.

instruction_template instance-attribute

instruction_template: EjinjaPromptTemplate = (
    EjinjaPromptTemplate(instruction_template)
)

Instruction prompt template used in memory compression.

token_count_callback instance-attribute

token_count_callback: Callable[[str], int] = (
    token_count_callback
    if token_count_callback is not None
    else estimate_token_count
)

Callback function to calculate token count from text. Defaults to estimate_token_count.
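
The sketch below configures the memory with a model-accurate token counter. Using tiktoken here is an assumption for illustration; any Callable[[str], int] works, and omitting token_count_callback falls back to the built-in character_count / 4 estimate.

import tiktoken

from bridgic.core.agentic.recent import ReCentMemoryConfig

_encoding = tiktoken.get_encoding("cl100k_base")


def count_tokens(text: str) -> int:
    """Count tokens with a real tokenizer instead of the default estimate."""
    return len(_encoding.encode(text))


memory_config = ReCentMemoryConfig(
    llm=llm,                   # the LLM that writes compression summaries (defined elsewhere)
    max_node_size=20,          # compress once 20 uncompressed nodes have accumulated
    max_token_size=1024 * 16,  # or once the memory exceeds ~16k tokens
    token_count_callback=count_tokens,
)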

ObservationTaskConfig

Configuration for the observation task in ReCentAutoma.

This class allows configuring the LLM and prompt templates for the observation task. When system_template or instruction_template is None, the default template will be used.

Attributes:

  • llm (BaseLlm)
    The LLM instance to use for this task.
  • system_template (Optional[Union[str, EjinjaPromptTemplate]])
    System prompt template. If None, uses DEFAULT_OBSERVATION_SYSTEM_TEMPLATE.
  • instruction_template (Optional[Union[str, EjinjaPromptTemplate]])
    Instruction prompt template. If None, uses DEFAULT_OBSERVATION_INSTRUCTION_TEMPLATE.

Source code in bridgic/core/agentic/recent/_recent_task_configs.py
class ObservationTaskConfig:
    """
    Configuration for the observation task in ReCentAutoma.

    This class allows configuring the LLM and prompt templates for the observation task.
    When system_template or instruction_template is None, the default template will be used.

    Attributes
    ----------
    llm : BaseLlm
        The LLM instance to use for this task.
    system_template : Optional[Union[str, EjinjaPromptTemplate]]
        System prompt template. If None, uses DEFAULT_OBSERVATION_SYSTEM_TEMPLATE.
    instruction_template : Optional[Union[str, EjinjaPromptTemplate]]
        Instruction prompt template. If None, uses DEFAULT_OBSERVATION_INSTRUCTION_TEMPLATE.
    """

    def __init__(
        self,
        llm: BaseLlm,
        system_template: Optional[Union[str, EjinjaPromptTemplate]] = DEFAULT_OBSERVATION_SYSTEM_TEMPLATE,
        instruction_template: Optional[Union[str, EjinjaPromptTemplate]] = DEFAULT_OBSERVATION_INSTRUCTION_TEMPLATE,
    ):
        self.llm = llm
        self.system_template = system_template or DEFAULT_OBSERVATION_SYSTEM_TEMPLATE
        self.instruction_template = instruction_template or DEFAULT_OBSERVATION_INSTRUCTION_TEMPLATE

    def to_llm_task_config(self) -> LlmTaskConfig:
        return LlmTaskConfig(
            llm=self.llm,
            system_template=self.system_template,
            instruction_template=self.instruction_template,
        )

ToolTaskConfig

Configuration for the tool selection task in ReCentAutoma.

This class allows configuring the LLM and prompt templates for the tool selection task. When system_template or instruction_template is None, the default template will be used.

Attributes:

Name Type Description
llm BaseLlm

The LLM instance to use for this task.

system_template Optional[Union[str, EjinjaPromptTemplate]]

System prompt template. If None, uses DEFAULT_TOOL_SELECTION_SYSTEM_TEMPLATE.

instruction_template Optional[Union[str, EjinjaPromptTemplate]]

Instruction prompt template. If None, uses DEFAULT_TOOL_SELECTION_INSTRUCTION_TEMPLATE.

Source code in bridgic/core/agentic/recent/_recent_task_configs.py
class ToolTaskConfig:
    """
    Configuration for the tool selection task in ReCentAutoma.

    This class allows configuring the LLM and prompt templates for the tool selection task.
    When system_template or instruction_template is None, the default template will be used.

    Attributes
    ----------
    llm : BaseLlm
        The LLM instance to use for this task.
    system_template : Optional[Union[str, EjinjaPromptTemplate]]
        System prompt template. If None, uses DEFAULT_TOOL_SELECTION_SYSTEM_TEMPLATE.
    instruction_template : Optional[Union[str, EjinjaPromptTemplate]]
        Instruction prompt template. If None, uses DEFAULT_TOOL_SELECTION_INSTRUCTION_TEMPLATE.
    """

    def __init__(
        self,
        llm: BaseLlm,
        system_template: Optional[Union[str, EjinjaPromptTemplate]] = DEFAULT_TOOL_SELECTION_SYSTEM_TEMPLATE,
        instruction_template: Optional[Union[str, EjinjaPromptTemplate]] = DEFAULT_TOOL_SELECTION_INSTRUCTION_TEMPLATE,
    ):
        self.llm = llm
        self.system_template = system_template or DEFAULT_TOOL_SELECTION_SYSTEM_TEMPLATE
        self.instruction_template = instruction_template or DEFAULT_TOOL_SELECTION_INSTRUCTION_TEMPLATE

    def to_llm_task_config(self) -> LlmTaskConfig:
        return LlmTaskConfig(
            llm=self.llm,
            system_template=self.system_template,
            instruction_template=self.instruction_template,
        )

AnswerTaskConfig

Configuration for the answer generation task in ReCentAutoma.

This class allows configuring the LLM and prompt templates for the answer generation task. When system_template or instruction_template is None, the default template will be used.

Attributes:

Name Type Description
llm BaseLlm

The LLM instance to use for this task.

system_template Optional[Union[str, EjinjaPromptTemplate]]

System prompt template. If None, uses DEFAULT_ANSWER_SYSTEM_TEMPLATE.

instruction_template Optional[Union[str, EjinjaPromptTemplate]]

Instruction prompt template. If None, uses DEFAULT_ANSWER_INSTRUCTION_TEMPLATE.

Source code in bridgic/core/agentic/recent/_recent_task_configs.py
class AnswerTaskConfig:
    """
    Configuration for the answer generation task in ReCentAutoma.

    This class allows configuring the LLM and prompt templates for the answer generation task.
    When system_template or instruction_template is None, the default template will be used.

    Attributes
    ----------
    llm : BaseLlm
        The LLM instance to use for this task.
    system_template : Optional[Union[str, EjinjaPromptTemplate]]
        System prompt template. If None, uses DEFAULT_ANSWER_SYSTEM_TEMPLATE.
    instruction_template : Optional[Union[str, EjinjaPromptTemplate]]
        Instruction prompt template. If None, uses DEFAULT_ANSWER_INSTRUCTION_TEMPLATE.
    """

    def __init__(
        self,
        llm: BaseLlm,
        system_template: Optional[Union[str, EjinjaPromptTemplate]] = DEFAULT_ANSWER_SYSTEM_TEMPLATE,
        instruction_template: Optional[Union[str, EjinjaPromptTemplate]] = DEFAULT_ANSWER_INSTRUCTION_TEMPLATE,
    ):
        self.llm = llm
        self.system_template = system_template or DEFAULT_ANSWER_SYSTEM_TEMPLATE
        self.instruction_template = instruction_template or DEFAULT_ANSWER_INSTRUCTION_TEMPLATE

    def to_llm_task_config(self) -> LlmTaskConfig:
        return LlmTaskConfig(
            llm=self.llm,
            system_template=self.system_template,
            instruction_template=self.instruction_template,
        )
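
A brief construction sketch for the three task configs documented above. The import path is assumed from the source locations shown on this page, my_llm stands in for any BaseLlm instance, and the custom system template string is purely illustrative; omitted templates fall back to the defaults.

from bridgic.core.agentic.recent import (          # import path assumed
    ObservationTaskConfig, ToolTaskConfig, AnswerTaskConfig,
)

observation_cfg = ObservationTaskConfig(
    llm=my_llm,                                     # placeholder BaseLlm instance
    system_template="Observe the latest tool result and relate it to the goal.",  # illustrative override
)
tool_cfg = ToolTaskConfig(llm=my_llm)               # omitted templates use the defaults
answer_cfg = AnswerTaskConfig(llm=my_llm)

# Each config can be normalized into the generic LlmTaskConfig used internally.
obs_task_cfg = observation_cfg.to_llm_task_config()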

EpisodicNodeTree

Bases: Serializable

EpisodicNodeTree is a data structure responsible for managing the sequence of episodic memory nodes, which is the core data structure of the ReCENT Algorithm.

ReCENT Algorithm (Recursive Compressed Episodic Node Tree Algorithm) is an algorithm designed to address issues such as context explosion and goal drift, by employing a recursive memory compression mechanism. In this algorithm, each episodic node serves as a container of memory, and nodes can be tightly organized together to form a more efficient and reliable memory for the higher-level agentic system.

Notes:
  • This data structure only supports appending new nodes; deletion or insertion is not allowed.
  • All write operations are protected by a lock to ensure atomicity and preserve the ordered nature of the structure.
  • The data structure does not and should not perform any computationally expensive operations such as summarization.
Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
class EpisodicNodeTree(Serializable):
    """
    EpisodicNodeTree is a data structure responsible for managing the sequence of episodic memory nodes, 
    which is the core data structure of the ReCENT Algorithm.

    **ReCENT Algorithm** (Recursive Compressed Episodic Node Tree Algorithm) is an algorithm designed to 
    address issues such as context explosion and goal drift, by employing a recursive memory compression 
    mechanism. In this algorithm, each episodic node serves as a container of memory, and nodes can be
    tightly organized together to form a more efficient and reliable memory for the higher-level agentic system.

    Notes:
    ------
    - This data structure only supports appending new nodes; deletion or insertion is not allowed.
    - All write operations are protected by a lock to ensure atomicity and preserve the ordered nature of the structure.
    - The data structure does not and should not perform any computationally expensive operations such as summarization.
    """

    _lock: RLock
    """Reentrant lock for thread-safe node write operations."""

    _node_sequence: List[BaseEpisodicNode]
    """The sequence of nodes in the episodic node tree."""

    _goal_node_timestep: int
    """The timestep of the current goal node. If no goal node exists, it will be -1."""
    _non_goal_node_timesteps: List[int]
    """The timesteps of the non-goal nodes."""

    def __init__(self):
        self._node_sequence = []
        self._goal_node_timestep = -1
        self._non_goal_node_timesteps = []
        self._lock = RLock()

    def get_node(self, timestep: int) -> Optional[BaseEpisodicNode]:
        """
        Get a node by its timestep.

        Parameters
        ----------
        timestep : int
            The timestep of the node.

        Returns
        -------
        Optional[BaseEpisodicNode]
            The node with the given timestep, or None if not found.
        """
        if 0 <= timestep < len(self._node_sequence):
            return self._node_sequence[timestep]
        return None

    def get_goal_node(self) -> Optional[GoalEpisodicNode]:
        """
        Get the current goal node.

        Returns
        -------
        Optional[GoalEpisodicNode]
            The current goal node, or None if no goal node exists.
        """
        if self._goal_node_timestep != -1:
            return cast(GoalEpisodicNode, self.get_node(self._goal_node_timestep))
        return None

    def get_non_goal_nodes(self) -> List[BaseEpisodicNode]:
        """
        Get all directly accessible non-goal nodes (sorted by timestep).

        Returns
        -------
        List[BaseEpisodicNode]
            List of non-goal nodes sorted by timestep.
        """
        nodes = []
        for timestep in self._non_goal_node_timesteps:
            nodes.append(self.get_node(timestep))
        return nodes

    def _get_next_timestep(self) -> int:
        """
        Get the next available timestep.

        Returns
        -------
        int
            The next available timestep.
        """
        return len(self._node_sequence)

    def _mark_tail_leaf_node_not_appendable(self) -> None:
        """
        Mark the tail appendable leaf node as not appendable.
        """
        if self._node_sequence:
            last_node = self._node_sequence[-1]
            if isinstance(last_node, LeafEpisodicNode) and last_node.message_appendable:
                last_node.message_appendable = False

    def add_goal_node(self, goal: str, guidance: Optional[str] = None) -> int:
        """
        Add a new goal node.

        If a previous goal node exists, its timestep will be linked in the new goal node.
        The tail appendable leaf node (if it exists) will be closed before adding the new goal node.

        Parameters
        ----------
        goal : str
            The goal content.
        guidance : Optional[str]
            Optional execution guidance.

        Returns
        -------
        int
            The timestep of the new goal node.
        """
        with self._lock:
            # Close the tail appendable leaf node.
            self._mark_tail_leaf_node_not_appendable()

            # Get the next timestep.
            new_timestep = self._get_next_timestep()

            # Create the new goal node and record the timestep of the previous goal node.
            goal_node = GoalEpisodicNode(
                timestep=new_timestep,
                goal=goal,
                guidance=guidance,
                previous_goal_node_timestep=self._goal_node_timestep
            )

            # Add the new goal node to the sequence and update the goal timestep.
            self._node_sequence.append(goal_node)
            self._goal_node_timestep = new_timestep

        return new_timestep

    def add_leaf_node(self, messages: List[Message]) -> int:
        """
        Add a new leaf node to which new messages can be appended.

        The tail appendable leaf node will be closed before adding the new node.

        Parameters
        ----------
        messages : List[Message]
            The original message sequence.

        Returns
        -------
        int
            The timestep of the new leaf node.
        """
        with self._lock:
            # Close the tail appendable leaf node.
            self._mark_tail_leaf_node_not_appendable()

            # Get the next timestep.
            new_timestep = self._get_next_timestep()

            # Create a new appendable leaf node.
            leaf_node = LeafEpisodicNode(
                timestep=new_timestep,
                messages=messages
            )

            # Add the new leaf node to the sequence and update the non-goal node timesteps.
            self._node_sequence.append(leaf_node)
            self._non_goal_node_timesteps.append(new_timestep)

        return new_timestep

    def add_compression_node(self, compressed_timesteps: List[int], summary: Optional[str] = None) -> int:
        """
        Add a new compression node that summarizes the given non-goal nodes.

        Before creating the compression node, close the last leaf node if it is still appendable.
        The `compressed_timesteps` list tells which nodes to summarize. Those nodes are then removed 
        from the active list, and the new compression node replaces them in the active non-goal node list.

        Parameters
        ----------
        compressed_timesteps : List[int]
            List of timesteps of the compressed nodes.
        summary : Optional[str]
            The compression summary content. If not provided, the summary is left as an unresolved concurrent.futures.Future to be set later.

        Returns
        -------
        int
            The timestep of the new compression node.
        """
        with self._lock:
            # Close the tail appendable leaf node.
            self._mark_tail_leaf_node_not_appendable()

            # Get the next timestep.
            new_timestep = self._get_next_timestep()

            # Create the compression node.
            compression_node = CompressionEpisodicNode(
                timestep=new_timestep,
                compressed_timesteps=compressed_timesteps,
                summary=summary,
            )

            # Add the new compression node to the sequence and update the non-goal node timesteps.
            self._node_sequence.append(compression_node)
            self._non_goal_node_timesteps.append(new_timestep)

            # Remove the timesteps of the compressed nodes from _non_goal_node_timesteps.
            self._non_goal_node_timesteps = [
                t for t in self._non_goal_node_timesteps 
                if t not in compressed_timesteps
            ]

        return new_timestep

    def get_tail_appendable_leaf_node(self) -> Optional[LeafEpisodicNode]:
        """
        Get the tail appendable leaf node if it exists.

        Returns
        -------
        Optional[LeafEpisodicNode]
            The tail appendable leaf node, or None if not found.
        """
        if self._node_sequence:
            last_node = self._node_sequence[-1]
            if isinstance(last_node, LeafEpisodicNode) and last_node.message_appendable:
                return last_node
        return None

    def dump_to_dict(self) -> Dict[str, Any]:
        return {
            "node_sequence": [node.dump_to_dict() for node in self._node_sequence],
            "goal_node_timestep": self._goal_node_timestep,
            "non_goal_node_timesteps": self._non_goal_node_timesteps,
        }

    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        self._node_sequence = []
        node_dicts = state_dict.get("node_sequence", [])

        for node_dict in node_dicts:
            node_type = NodeType(node_dict["node_type"])
            node = None

            if node_type == NodeType.GOAL:
                node = GoalEpisodicNode(timestep=0, goal="")
            elif node_type == NodeType.LEAF:
                node = LeafEpisodicNode(timestep=0)
            elif node_type == NodeType.COMPRESSION:
                node = CompressionEpisodicNode(timestep=0, compressed_timesteps=[])
            else:
                raise ValueError(f"Invalid node type: {node_type}")

            node.load_from_dict(node_dict)
            self._node_sequence.append(node)

        self._goal_node_timestep = state_dict.get("goal_node_timestep", -1)
        self._non_goal_node_timesteps = state_dict.get("non_goal_node_timesteps", [])
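
A minimal usage sketch of the bookkeeping shown above. The import path is assumed from the source location; the leaf node is created with an empty message list because Message construction is outside the scope of this section (in real use the list would carry Message objects).

from bridgic.core.agentic.recent import EpisodicNodeTree   # import path assumed

tree = EpisodicNodeTree()
goal_ts = tree.add_goal_node("Summarize the quarterly report", guidance="Prefer primary sources")
leaf_ts = tree.add_leaf_node([])          # in real use this would carry Message objects

assert tree.get_goal_node().goal == "Summarize the quarterly report"
assert tree.get_node(goal_ts) is tree.get_goal_node()
assert [n.timestep for n in tree.get_non_goal_nodes()] == [leaf_ts]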

get_node

get_node(timestep: int) -> Optional[BaseEpisodicNode]

Get a node by its timestep.

Parameters:

Name Type Description Default
timestep int

The timestep of the node.

required

Returns:

Type Description
Optional[BaseEpisodicNode]

The node with the given timestep, or None if not found.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def get_node(self, timestep: int) -> Optional[BaseEpisodicNode]:
    """
    Get a node by its timestep.

    Parameters
    ----------
    timestep : int
        The timestep of the node.

    Returns
    -------
    Optional[BaseEpisodicNode]
        The node with the given timestep, or None if not found.
    """
    if 0 <= timestep < len(self._node_sequence):
        return self._node_sequence[timestep]
    return None

get_goal_node

get_goal_node() -> Optional[GoalEpisodicNode]

Get the current goal node.

Returns:

Type Description
Optional[GoalEpisodicNode]

The current goal node, or None if no goal node exists.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def get_goal_node(self) -> Optional[GoalEpisodicNode]:
    """
    Get the current goal node.

    Returns
    -------
    Optional[GoalEpisodicNode]
        The current goal node, or None if no goal node exists.
    """
    if self._goal_node_timestep != -1:
        return cast(GoalEpisodicNode, self.get_node(self._goal_node_timestep))
    return None

get_non_goal_nodes

get_non_goal_nodes() -> List[BaseEpisodicNode]

Get all directly accessible non-goal nodes (sorted by timestep).

Returns:

Type Description
List[BaseEpisodicNode]

List of non-goal nodes sorted by timestep.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def get_non_goal_nodes(self) -> List[BaseEpisodicNode]:
    """
    Get all directly accessible non-goal nodes (sorted by timestep).

    Returns
    -------
    List[BaseEpisodicNode]
        List of non-goal nodes sorted by timestep.
    """
    nodes = []
    for timestep in self._non_goal_node_timesteps:
        nodes.append(self.get_node(timestep))
    return nodes

add_goal_node

add_goal_node(
    goal: str, guidance: Optional[str] = None
) -> int

Add a new goal node.

If a previous goal node exists, its timestep will be linked in the new goal node. The tail appendable leaf node (if it exists) will be closed before adding the new goal node.

Parameters:

Name Type Description Default
goal str

The goal content.

required
guidance Optional[str]

Optional execution guidance.

None

Returns:

Type Description
int

The timestep of the new goal node.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def add_goal_node(self, goal: str, guidance: Optional[str] = None) -> int:
    """
    Add a new goal node.

    If a previous goal node exists, its timestep will be linked in the new goal node.
    The tail appendable leaf node (if it exists) will be closed before adding the new goal node.

    Parameters
    ----------
    goal : str
        The goal content.
    guidance : Optional[str]
        Optional execution guidance.

    Returns
    -------
    int
        The timestep of the new goal node.
    """
    with self._lock:
        # Close the tail appendable leaf node.
        self._mark_tail_leaf_node_not_appendable()

        # Get the next timestep.
        new_timestep = self._get_next_timestep()

        # Create the new goal node and record the timestep of the previous goal node.
        goal_node = GoalEpisodicNode(
            timestep=new_timestep,
            goal=goal,
            guidance=guidance,
            previous_goal_node_timestep=self._goal_node_timestep
        )

        # Add the new goal node to the sequence and update the goal timestep.
        self._node_sequence.append(goal_node)
        self._goal_node_timestep = new_timestep

    return new_timestep
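
Goal replacement in a quick sketch (assumes EpisodicNodeTree is imported as shown earlier): each new goal node records the timestep of the goal it replaces, so the chain of goals can be walked backwards.

tree = EpisodicNodeTree()
first_ts = tree.add_goal_node("Draft the outline")
second_ts = tree.add_goal_node("Write the full article")

current = tree.get_goal_node()
assert current.timestep == second_ts
assert current.previous_goal_node_timestep == first_ts             # links back to the replaced goal
assert tree.get_node(first_ts).previous_goal_node_timestep == -1   # the first goal had no predecessor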

add_leaf_node

add_leaf_node(messages: List[Message]) -> int

Add a new leaf node to which new messages can be appended.

The tail appendable leaf node will be closed before adding the new node.

Parameters:

Name Type Description Default
messages List[Message]

The original message sequence.

required

Returns:

Type Description
int

The timestep of the new leaf node.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def add_leaf_node(self, messages: List[Message]) -> int:
    """
    Add a new leaf node to which new messages can be appended.

    The tail appendable leaf node will be closed before adding the new node.

    Parameters
    ----------
    messages : List[Message]
        The original message sequence.

    Returns
    -------
    int
        The timestep of the new leaf node.
    """
    with self._lock:
        # Close the tail appendable leaf node.
        self._mark_tail_leaf_node_not_appendable()

        # Get the next timestep.
        new_timestep = self._get_next_timestep()

        # Create a new appendable leaf node.
        leaf_node = LeafEpisodicNode(
            timestep=new_timestep,
            messages=messages
        )

        # Add the new leaf node to the sequence and update the non-goal node timesteps.
        self._node_sequence.append(leaf_node)
        self._non_goal_node_timesteps.append(new_timestep)

    return new_timestep

add_compression_node

add_compression_node(
    compressed_timesteps: List[int],
    summary: Optional[str] = None,
) -> int

Add a new compression node that summarizes the given non-goal nodes.

Before creating the compression node, close the last leaf node if it is still appendable. The compressed_timesteps list tells which nodes to summarize. Those nodes are then removed from the active list, and the new compression node replaces them in the active non-goal node list.

Parameters:

Name Type Description Default
compressed_timesteps List[int]

List of timesteps of the compressed nodes.

required
summary Optional[str]

The compression summary content. If not provided, the summary is left as an unresolved concurrent.futures.Future to be set later.

None

Returns:

Type Description
int

The timestep of the new compression node.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def add_compression_node(self, compressed_timesteps: List[int], summary: Optional[str] = None) -> int:
    """
    Add a new compression node that summarizes the given non-goal nodes.

    Before creating the compression node, close the last leaf node if it is still appendable.
    The `compressed_timesteps` list tells which nodes to summarize. Those nodes are then removed 
    from the active list, and the new compression node replaces them in the active non-goal node list.

    Parameters
    ----------
    compressed_timesteps : List[int]
        List of timesteps of the compressed nodes.
    summary : Optional[str]
        The compression summary content. If not provided, the summary is left as an unresolved concurrent.futures.Future to be set later.

    Returns
    -------
    int
        The timestep of the new compression node.
    """
    with self._lock:
        # Close the tail appendable leaf node.
        self._mark_tail_leaf_node_not_appendable()

        # Get the next timestep.
        new_timestep = self._get_next_timestep()

        # Create the compression node.
        compression_node = CompressionEpisodicNode(
            timestep=new_timestep,
            compressed_timesteps=compressed_timesteps,
            summary=summary,
        )

        # Add the new compression node to the sequence and update the non-goal node timesteps.
        self._node_sequence.append(compression_node)
        self._non_goal_node_timesteps.append(new_timestep)

        # Remove the timesteps of the compressed nodes from _non_goal_node_timesteps.
        self._non_goal_node_timesteps = [
            t for t in self._non_goal_node_timesteps 
            if t not in compressed_timesteps
        ]

    return new_timestep
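
Compression in a quick sketch (EpisodicNodeTree imported as before; leaf nodes are created with empty message lists for brevity): the compressed leaf nodes drop out of the active non-goal view, while remaining addressable by timestep.

tree = EpisodicNodeTree()
t0 = tree.add_leaf_node([])
t1 = tree.add_leaf_node([])
t2 = tree.add_compression_node(compressed_timesteps=[t0, t1], summary="Both steps fetched pricing data.")

assert [n.timestep for n in tree.get_non_goal_nodes()] == [t2]   # only the compression node stays active
assert tree.get_node(t0) is not None                             # compressed nodes are still reachable by timestep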

get_tail_appendable_leaf_node

get_tail_appendable_leaf_node() -> (
    Optional[LeafEpisodicNode]
)

Get the tail appendable leaf node if it exists.

Returns:

Type Description
Optional[LeafEpisodicNode]

The tail appendable leaf node, or None if not found.

Source code in bridgic/core/agentic/recent/_episodic_node_tree.py
def get_tail_appendable_leaf_node(self) -> Optional[LeafEpisodicNode]:
    """
    Get the tail appendable leaf node if it exists.

    Returns
    -------
    Optional[LeafEpisodicNode]
        The tail appendable leaf node, or None if not found.
    """
    if self._node_sequence:
        last_node = self._node_sequence[-1]
        if isinstance(last_node, LeafEpisodicNode) and last_node.message_appendable:
            return last_node
    return None
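
The open/closed lifecycle of the tail leaf node, sketched with the methods above (EpisodicNodeTree imported as before): the most recent leaf node stays open, presumably so the automa can keep appending Message objects to its messages list, and any subsequent add_* call closes it.

tree = EpisodicNodeTree()
t0 = tree.add_leaf_node([])                         # opens a leaf node that can still receive messages
open_leaf = tree.get_tail_appendable_leaf_node()
assert open_leaf is tree.get_node(t0) and open_leaf.message_appendable

tree.add_goal_node("New goal")                      # any add_* call first closes the open tail leaf
assert tree.get_tail_appendable_leaf_node() is None
assert tree.get_node(t0).message_appendable is False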

BaseEpisodicNode

Bases: Serializable, ABC

BaseEpisodicNode represents a single memory unit in the memory sequence in the ReCENT Algorithm.

Source code in bridgic/core/agentic/recent/_episodic_node.py
class BaseEpisodicNode(Serializable, ABC):
    """
    BaseEpisodicNode represents a single memory unit in the memory sequence in the ReCENT Algorithm.
    """

    node_type: NodeType
    """The type of the node."""

    timestep: int
    """The timestep of the node."""
    timestamp: str
    """The timestamp of the node."""

    def __init__(self, timestep: int):
        self.timestep = timestep
        self.timestamp = datetime.now().isoformat()

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        return {
            "node_type": self.node_type.value,
            "timestep": self.timestep,
            "timestamp": self.timestamp,
        }

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        self.node_type = NodeType(state_dict["node_type"])
        self.timestep = state_dict["timestep"]
        self.timestamp = state_dict["timestamp"]

node_type instance-attribute

node_type: NodeType

The type of the node.

timestep instance-attribute

timestep: int = timestep

The timestep of the node.

timestamp instance-attribute

timestamp: str = datetime.now().isoformat()

The timestamp of the node.

GoalEpisodicNode

Bases: BaseEpisodicNode

Source code in bridgic/core/agentic/recent/_episodic_node.py
class GoalEpisodicNode(BaseEpisodicNode):

    goal: str
    """The content of the goal."""
    guidance: str
    """The guidance to achieve the goal."""

    previous_goal_node_timestep: int
    """The timestep of the previous goal node (the goal node that was replaced by this one)."""

    def __init__(
        self,
        timestep: int,
        goal: str,
        guidance: Optional[str] = None,
        previous_goal_node_timestep: Optional[int] = None,
    ):
        super().__init__(timestep)
        self.node_type = NodeType.GOAL
        self.goal = goal
        self.guidance = guidance if guidance is not None else ""
        self.previous_goal_node_timestep = previous_goal_node_timestep if previous_goal_node_timestep is not None else -1

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        result = super().dump_to_dict()
        result["content"] = self.goal
        result["previous_goal_node_timestep"] = self.previous_goal_node_timestep
        return result

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self.goal = state_dict["content"]
        self.previous_goal_node_timestep = state_dict.get("previous_goal_node_timestep", -1)

goal instance-attribute

goal: str = goal

The content of the goal.

guidance instance-attribute

guidance: str = guidance if guidance is not None else ''

The guidance to achieve the goal.

previous_goal_node_timestep instance-attribute

previous_goal_node_timestep: int = (
    previous_goal_node_timestep
    if previous_goal_node_timestep is not None
    else -1
)

The timestep of the previous goal node (the goal node that was replaced by this one).

LeafEpisodicNode

Bases: BaseEpisodicNode

Source code in bridgic/core/agentic/recent/_episodic_node.py
class LeafEpisodicNode(BaseEpisodicNode):

    messages: List[Message]
    """The original messages of the leaf episodic node."""

    message_appendable: bool
    """Whether new message could be appended to the leaf episodic node."""

    def __init__(self, timestep: int, messages: Optional[List[Message]] = None):
        super().__init__(timestep)
        self.node_type = NodeType.LEAF
        self.messages = messages if messages is not None else []
        self.message_appendable = True

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        result = super().dump_to_dict()
        result["messages"] = self.messages
        result["message_appendable"] = self.message_appendable
        return result

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self.messages = state_dict.get("messages")
        self.message_appendable = state_dict.get("message_appendable")

messages instance-attribute

messages: List[Message] = (
    messages if messages is not None else []
)

The original messages of the leaf episodic node.

message_appendable instance-attribute

message_appendable: bool = True

Whether new messages can be appended to the leaf episodic node.

CompressionEpisodicNode

Bases: BaseEpisodicNode

Compression node that compresses a sequence of episodic nodes.

A compression node is created to summarize and compress multiple episodic nodes. The compression node contains a summary of the compressed nodes and records their timesteps.

Source code in bridgic/core/agentic/recent/_episodic_node.py
class CompressionEpisodicNode(BaseEpisodicNode):
    """
    Compression node that compresses a sequence of episodic nodes.

    A compression node is created to summarize and compress multiple episodic nodes. The compression 
    node contains a summary of the compressed nodes and records their timesteps.
    """

    summary: Future[str]
    """The summary of the compression node, which is a concurrent.futures.Future for cross-thread/event-loop dependency handling."""

    compressed_node_timesteps: List[int]
    """The timesteps of the compressed nodes (the nodes that were compressed by this compression node)."""

    def __init__(self, timestep: int, compressed_timesteps: List[int], summary: Optional[str] = None):
        super().__init__(timestep)
        self.node_type = NodeType.COMPRESSION
        self.summary = Future()
        if summary:
            self.summary.set_result(summary)
        self.compressed_node_timesteps = compressed_timesteps if compressed_timesteps is not None else []

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        result = super().dump_to_dict()

        # Future cannot be serialized, so we try to get the result if done, otherwise use empty string
        if self.summary.done():
            try:
                result["summary"] = self.summary.result()
            except Exception:
                result["summary"] = ""
                warnings.warn("Failed to get the result of the summary of the compression episodic node, thus empty summary will be stored.")
        else:
            result["summary"] = ""
            warnings.warn("The summary of the compression episodic node is not ready yet, thus empty summary will be stored.", FutureWarning)

        # Record the timesteps of the nodes that were compressed.
        result["compressed_node_timesteps"] = self.compressed_node_timesteps
        return result

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self.summary = Future()
        self.summary.set_result(state_dict.get("summary", ""))
        self.compressed_node_timesteps = state_dict.get("compressed_node_timesteps", [])

summary instance-attribute

summary: Future[str] = Future()

The summary of the compression node, which is a concurrent.futures.Future for cross-thread/event-loop dependency handling.

compressed_node_timesteps instance-attribute

compressed_node_timesteps: List[int] = (
    compressed_timesteps
    if compressed_timesteps is not None
    else []
)

The timesteps of the compressed nodes (the nodes that were compressed by this compression node).
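
A short sketch of the deferred-summary path (import path assumed): when no summary is passed, the node holds an unresolved Future that a background summarizer can fulfil later; dump_to_dict then serializes the resolved text.

from concurrent.futures import ThreadPoolExecutor
from bridgic.core.agentic.recent import CompressionEpisodicNode   # import path assumed

node = CompressionEpisodicNode(timestep=5, compressed_timesteps=[2, 3, 4])

def summarize_later() -> None:
    # A real implementation would call the compression LLM here.
    node.summary.set_result("Steps 2-4 collected and de-duplicated the search results.")

with ThreadPoolExecutor(max_workers=1) as pool:
    pool.submit(summarize_later)

print(node.summary.result())    # blocks until the summary has been set
state = node.dump_to_dict()     # state["summary"] now contains the resolved text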