amphibious

Amphibious Agent Framework — Dual-Mode Agent Orchestration.

A framework for building agents that can operate in both LLM-driven (agent) and deterministic (workflow) modes, with automatic fallback between them.

Architecture Layers

Abstraction Layer (Data Exposure):
- Exposure: Base abstraction for field-level data management
- LayeredExposure: Supports progressive disclosure (summary + per-item details)
- EntireExposure: Summary only, no per-item detail queries
- Context: Base class for agent context with automatic Exposure field detection

Implementation Layer - Context:
- Step: A single execution step with content, result, and metadata
- Skill: A skill definition following SKILL.md format
- CognitiveTools: Tool management (EntireExposure)
- CognitiveSkills: Skill management with progressive disclosure (LayeredExposure)
- CognitiveHistory: Execution history with layered memory (LayeredExposure)
- CognitiveContext: The default cognitive context combining all above

Implementation Layer - Worker (Think Unit):
- CognitiveWorker: Pure thinking unit of one observe-think-act cycle. Cognitive policies (acquiring, rehearsal, reflection) enable multi-round thinking within a single call.

Orchestration Layer:
- AmphibiousAutoma: Dual-mode agent engine (agent mode + workflow mode)
- think_unit: Descriptor for declaring think units (used in on_agent)
- ActionCall, HumanCall, AgentCall: Workflow yield types (used in on_workflow)
- ErrorStrategy: Error handling strategies (RAISE, IGNORE, RETRY)

Example

>>> class MyAgent(AmphibiousAutoma[CognitiveContext]):
...     main_think = think_unit(CognitiveWorker.inline("Execute step"), max_attempts=20)
...     async def on_agent(self, ctx):
...         await self.main_think
...
>>> ctx = await MyAgent(llm=llm).arun(goal="Complete the task")

Exposure

Bases: ABC, Generic[T]

Abstract base class for field-level data exposure.

Manages list-like data (e.g., history records, tool lists) with a unified interface. Subclasses determine the exposure strategy:
- LayeredExposure: supports progressive disclosure (summary + per-item details)
- EntireExposure: only provides summary (no per-item details)

Methods:

Name      Description
add       Add an element and return its index.
summary   Return a list of summary strings for all elements.
get_all   Return a copy of all elements.

Source code in bridgic/amphibious/_context.py
class Exposure(ABC, Generic[T]):
    """
    Abstract base class for field-level data exposure.

    Manages list-like data (e.g., history records, tool lists) with a unified interface.
    Subclasses determine the exposure strategy:
    - LayeredExposure: supports progressive disclosure (summary + per-item details)
    - EntireExposure: only provides summary (no per-item details)

    Methods
    -------
    add(item)
        Add an element and return its index.
    summary()
        Return a list of summary strings for all elements.
    get_all()
        Return a copy of all elements.
    """

    def __init__(self):
        self._items: List[T] = []
        self._llm: Optional[Any] = None

    def __len__(self) -> int:
        return len(self._items)

    def __getitem__(self, index: int) -> T:
        return self._items[index]

    def __iter__(self) -> Iterator[T]:
        return iter(self._items)

    def add(self, item: T) -> int:
        """
        Add an element to the collection.

        Parameters
        ----------
        item : T
            The element to add.

        Returns
        -------
        int
            Index of the newly added element (0-based).
        """
        self._items.append(item)
        return len(self._items) - 1

    def get_all(self) -> List[T]:
        """
        Return a copy of all elements.

        Returns
        -------
        List[T]
            A copy of all elements.
        """
        return self._items.copy()

    def set_llm(self, llm: Any) -> None:
        """
        Set the LLM for this exposure.

        Stored as ``self._llm`` and available to subclasses that need it
        (e.g. ``CognitiveHistory`` uses it for compression).
        Called automatically by ``Context.set_llm()`` for fields marked with
        ``json_schema_extra={"use_llm": True}``.

        Parameters
        ----------
        llm : Any
            LLM instance to store.
        """
        self._llm = llm

add

add(item: T) -> int

Add an element to the collection.

Parameters:

Name   Type   Description           Default
item   T      The element to add.   required

Returns:

Type   Description
int    Index of the newly added element (0-based).

Source code in bridgic/amphibious/_context.py
def add(self, item: T) -> int:
    """
    Add an element to the collection.

    Parameters
    ----------
    item : T
        The element to add.

    Returns
    -------
    int
        Index of the newly added element (0-based).
    """
    self._items.append(item)
    return len(self._items) - 1

get_all

get_all() -> List[T]

Return a copy of all elements.

Returns:

Type      Description
List[T]   A copy of all elements.

Source code in bridgic/amphibious/_context.py
def get_all(self) -> List[T]:
    """
    Return a copy of all elements.

    Returns
    -------
    List[T]
        A copy of all elements.
    """
    return self._items.copy()
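
As an illustration of what `add()` and `get_all()` return, here is a minimal sketch. It uses a stand-in class that copies the container methods from the source shown above, so it runs without the package installed. The dictionary items are made up for the example. Because `list.copy()` is a shallow copy, the returned list is independent of the exposure, but the items inside it are the same objects.

```python
from typing import Generic, Iterator, List, TypeVar

T = TypeVar("T")


class Exposure(Generic[T]):
    """Stand-in that copies the container behaviour shown in the source above."""

    def __init__(self) -> None:
        self._items: List[T] = []

    def __len__(self) -> int:
        return len(self._items)

    def __getitem__(self, index: int) -> T:
        return self._items[index]

    def __iter__(self) -> Iterator[T]:
        return iter(self._items)

    def add(self, item: T) -> int:
        self._items.append(item)
        return len(self._items) - 1

    def get_all(self) -> List[T]:
        return self._items.copy()


records: Exposure[dict] = Exposure()
first = records.add({"name": "search"})   # hypothetical items
second = records.add({"name": "fetch"})

snapshot = records.get_all()
snapshot.append({"name": "local only"})   # grows the copy, not the exposure
snapshot[0]["name"] = "renamed"           # same dict object, so the exposure sees it

print(first, second)        # 0 1
print(len(records))         # 2
print(records[0]["name"])   # renamed
```

If callers need full isolation from later mutations, they would have to deep-copy the items themselves; nothing in the source above does that for them.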

set_llm

set_llm(llm: Any) -> None

Set the LLM for this exposure.

Stored as self._llm and available to subclasses that need it (e.g. CognitiveHistory uses it for compression). Called automatically by Context.set_llm() for fields marked with json_schema_extra={"use_llm": True}.

Parameters:

Name   Type   Description              Default
llm    Any    LLM instance to store.   required

Source code in bridgic/amphibious/_context.py
def set_llm(self, llm: Any) -> None:
    """
    Set the LLM for this exposure.

    Stored as ``self._llm`` and available to subclasses that need it
    (e.g. ``CognitiveHistory`` uses it for compression).
    Called automatically by ``Context.set_llm()`` for fields marked with
    ``json_schema_extra={"use_llm": True}``.

    Parameters
    ----------
    llm : Any
        LLM instance to store.
    """
    self._llm = llm
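
The following sketch shows one way a subclass might use the stored `_llm`. Everything beyond `set_llm()` and `_llm` is an assumption: `NoteExposure` and `fake_llm` are hypothetical, and treating the LLM as a plain callable is chosen only to keep the example self-contained (the source types it as `Any` and does not specify a calling convention).

```python
from typing import Any, Generic, List, Optional, TypeVar

T = TypeVar("T")


class Exposure(Generic[T]):
    """Stand-in for the base class, reduced to what this example needs."""

    def __init__(self) -> None:
        self._items: List[T] = []
        self._llm: Optional[Any] = None

    def add(self, item: T) -> int:
        self._items.append(item)
        return len(self._items) - 1

    def set_llm(self, llm: Any) -> None:
        self._llm = llm


class NoteExposure(Exposure[str]):
    """Hypothetical subclass: shortens notes with the LLM when one is set."""

    def summary(self) -> List[str]:
        if self._llm is None:
            # No LLM attached: fall back to a plain truncation.
            return [note[:20] for note in self._items]
        # Assumption: the LLM is a plain callable taking and returning a string.
        return [self._llm(note) for note in self._items]


def fake_llm(text: str) -> str:
    """Hypothetical stand-in for a model call."""
    return f"LLM summary of {len(text)} chars"


notes = NoteExposure()
notes.add("Opened the settings page and toggled dark mode")

before = notes.summary()   # computed without an LLM
notes.set_llm(fake_llm)
after = notes.summary()    # computed through the stored LLM

print(before)
print(after)
```

Guarding on `self._llm is None` keeps the exposure usable for fields that never opt in to LLM propagation.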

LayeredExposure

Bases: Exposure[T]

Exposure with progressive disclosure support.

Provides a two-level information architecture:
- summary(): overview of all items
- get_details(index): detailed information for a specific item

Use this for data where the LLM may need to request details about specific items (e.g., execution history, skills).

Disclosure state is owned internally: once revealed, an item's detail is cached in _revealed. Call reset_revealed() to clear all cached reveals (e.g., at phase boundaries in a multi-phase agent).

Source code in bridgic/amphibious/_context.py
class LayeredExposure(Exposure[T]):
    """
    Exposure with progressive disclosure support.

    Provides two-level information architecture:
    - summary(): overview of all items
    - get_details(index): detailed information for a specific item

    Use this for data where the LLM may need to request details
    about specific items (e.g., execution history, skills).

    Disclosure state is owned internally: once revealed, an item's detail
    is cached in _revealed. Call reset_revealed() to clear all cached reveals
    (e.g., at phase boundaries in a multi-phase agent).
    """

    def __init__(self):
        super().__init__()
        self._revealed: Dict[int, str] = {}  # index → cached detail string

    def reveal(self, index: int) -> Optional[str]:
        """
        Get and cache detailed information for a specific element.

        Returns the cached value if already revealed; otherwise calls
        get_details(index), stores the result, and returns it.

        Parameters
        ----------
        index : int
            Element index (0-based).

        Returns
        -------
        Optional[str]
            Detailed information string, or None if index is invalid.
        """
        if index in self._revealed:
            return self._revealed[index]
        detail = self.get_details(index)
        if detail is not None:
            self._revealed[index] = detail
        return detail

    def reset_revealed(self) -> None:
        """
        Clear all cached reveals.

        Use at phase boundaries to allow the LLM to re-request details
        that were disclosed in a previous phase.
        """
        self._revealed.clear()

    @abstractmethod
    def summary(self) -> List[str]:
        """
        Generate summary strings for all elements.

        Returns
        -------
        List[str]
            One summary string per element.
        """
        ...

    @abstractmethod
    def get_details(self, index: int) -> Optional[str]:
        """
        Get detailed information for a specific element.

        Parameters
        ----------
        index : int
            Element index (0-based).

        Returns
        -------
        Optional[str]
            Detailed information string, or None if index is invalid.
        """
        ...

reveal

reveal(index: int) -> Optional[str]

Get and cache detailed information for a specific element.

Returns the cached value if already revealed; otherwise calls get_details(index), stores the result, and returns it.

Parameters:

Name    Type   Description                Default
index   int    Element index (0-based).   required

Returns:

Type            Description
Optional[str]   Detailed information string, or None if index is invalid.

Source code in bridgic/amphibious/_context.py
def reveal(self, index: int) -> Optional[str]:
    """
    Get and cache detailed information for a specific element.

    Returns the cached value if already revealed; otherwise calls
    get_details(index), stores the result, and returns it.

    Parameters
    ----------
    index : int
        Element index (0-based).

    Returns
    -------
    Optional[str]
        Detailed information string, or None if index is invalid.
    """
    if index in self._revealed:
        return self._revealed[index]
    detail = self.get_details(index)
    if detail is not None:
        self._revealed[index] = detail
    return detail
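
To make the caching behaviour of `reveal()` concrete, here is a minimal sketch. The stand-in base class merges the list storage of `Exposure` with the `reveal()` logic copied from the source above, and omits the abstract `summary()` for brevity. `CountingHistory` and its call counter are hypothetical and exist only to show when `get_details()` actually runs.

```python
from abc import ABC, abstractmethod
from typing import Dict, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LayeredExposure(ABC, Generic[T]):
    """Stand-in combining the Exposure storage and the reveal cache shown above."""

    def __init__(self) -> None:
        self._items: List[T] = []
        self._revealed: Dict[int, str] = {}

    def add(self, item: T) -> int:
        self._items.append(item)
        return len(self._items) - 1

    def reveal(self, index: int) -> Optional[str]:
        if index in self._revealed:
            return self._revealed[index]
        detail = self.get_details(index)
        if detail is not None:
            self._revealed[index] = detail
        return detail

    @abstractmethod
    def get_details(self, index: int) -> Optional[str]:
        ...


class CountingHistory(LayeredExposure[str]):
    """Hypothetical subclass that counts how often details are computed."""

    def __init__(self) -> None:
        super().__init__()
        self.detail_calls = 0

    def get_details(self, index: int) -> Optional[str]:
        self.detail_calls += 1
        if 0 <= index < len(self._items):
            return f"full record: {self._items[index]}"
        return None


history = CountingHistory()
history.add("clicked login")

first = history.reveal(0)    # computed through get_details()
second = history.reveal(0)   # served from the cache
missing = history.reveal(5)  # invalid index: None, and nothing is cached

print(first == second, history.detail_calls)  # True 2
print(missing, sorted(history._revealed))     # None [0]
```

Only non-None details are cached, so an index that is invalid now is looked up again on the next call.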

reset_revealed

reset_revealed() -> None

Clear all cached reveals.

Use at phase boundaries to allow the LLM to re-request details that were disclosed in a previous phase.

Source code in bridgic/amphibious/_context.py
def reset_revealed(self) -> None:
    """
    Clear all cached reveals.

    Use at phase boundaries to allow the LLM to re-request details
    that were disclosed in a previous phase.
    """
    self._revealed.clear()
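
A short sketch of why a reset at a phase boundary can matter: with the caching logic shown for `reveal()`, a cached detail string is returned as-is even if the underlying item has changed since. `PhaseNotes` is a hypothetical stand-in that reproduces that logic without the package, and its detail format is invented for the example.

```python
from typing import Dict, List, Optional


class PhaseNotes:
    """Stand-in reproducing the reveal()/reset_revealed() logic shown above."""

    def __init__(self, items: List[str]) -> None:
        self._items = list(items)
        self._revealed: Dict[int, str] = {}

    def get_details(self, index: int) -> Optional[str]:
        # Hypothetical detail format, chosen only for this example.
        if 0 <= index < len(self._items):
            return f"detail: {self._items[index]}"
        return None

    def reveal(self, index: int) -> Optional[str]:
        if index in self._revealed:
            return self._revealed[index]
        detail = self.get_details(index)
        if detail is not None:
            self._revealed[index] = detail
        return detail

    def reset_revealed(self) -> None:
        self._revealed.clear()


notes = PhaseNotes(["draft plan"])
phase_one = notes.reveal(0)       # computed and cached
notes._items[0] = "final plan"    # the underlying item changes
stale = notes.reveal(0)           # cache hit: still the phase-one string
notes.reset_revealed()            # phase boundary
phase_two = notes.reveal(0)       # recomputed from the current item

print(phase_one)  # detail: draft plan
print(stale)      # detail: draft plan
print(phase_two)  # detail: final plan
```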

summary

abstractmethod
summary() -> List[str]

Generate summary strings for all elements.

Returns:

Type        Description
List[str]   One summary string per element.

Source code in bridgic/amphibious/_context.py
@abstractmethod
def summary(self) -> List[str]:
    """
    Generate summary strings for all elements.

    Returns
    -------
    List[str]
        One summary string per element.
    """
    ...

get_details

abstractmethod
get_details(index: int) -> Optional[str]

Get detailed information for a specific element.

Parameters:

Name    Type   Description                Default
index   int    Element index (0-based).   required

Returns:

Type            Description
Optional[str]   Detailed information string, or None if index is invalid.

Source code in bridgic/amphibious/_context.py
@abstractmethod
def get_details(self, index: int) -> Optional[str]:
    """
    Get detailed information for a specific element.

    Parameters
    ----------
    index : int
        Element index (0-based).

    Returns
    -------
    Optional[str]
        Detailed information string, or None if index is invalid.
    """
    ...

EntireExposure

Bases: Exposure[T]

Exposure without progressive disclosure.

Only provides summary() - all information is exposed at once. Use this for data where per-item details are not needed or the full information should always be available (e.g., tools).

Source code in bridgic/amphibious/_context.py
class EntireExposure(Exposure[T]):
    """
    Exposure without progressive disclosure.

    Only provides summary() - all information is exposed at once.
    Use this for data where per-item details are not needed
    or the full information should always be available (e.g., tools).
    """
    @abstractmethod
    def summary(self) -> List[str]:
        """
        Generate summary strings for all elements.

        Returns
        -------
        List[str]
            One summary string per element.
        """
        ...

summary

abstractmethod
summary() -> List[str]

Generate summary strings for all elements.

Returns:

Type        Description
List[str]   One summary string per element.

Source code in bridgic/amphibious/_context.py
@abstractmethod
def summary(self) -> List[str]:
    """
    Generate summary strings for all elements.

    Returns
    -------
    List[str]
        One summary string per element.
    """
    ...
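
As a sketch of what a concrete EntireExposure subclass could look like, the example below implements `summary()` for a small tool list. The stand-in base class mirrors the storage and the abstract method from the source above. `ToolSpec` and `ToolList` are hypothetical names, since the real tool type is not shown in this section.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, List, TypeVar

T = TypeVar("T")


class EntireExposure(ABC, Generic[T]):
    """Stand-in: list storage plus an abstract summary(), as in the source above."""

    def __init__(self) -> None:
        self._items: List[T] = []

    def add(self, item: T) -> int:
        self._items.append(item)
        return len(self._items) - 1

    @abstractmethod
    def summary(self) -> List[str]:
        ...


@dataclass
class ToolSpec:
    """Hypothetical tool record used only for this example."""
    name: str
    description: str


class ToolList(EntireExposure[ToolSpec]):
    """Hypothetical subclass: each summary line already carries the full record."""

    def summary(self) -> List[str]:
        return [f"{tool.name}: {tool.description}" for tool in self._items]


tools = ToolList()
tools.add(ToolSpec("search", "Query the web"))
tools.add(ToolSpec("fetch", "Download a URL"))

print(tools.summary())  # ['search: Query the web', 'fetch: Download a URL']
```

Since `summary()` is abstract, the base class cannot be instantiated directly; a subclass has to decide how each item is rendered.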

Context

Bases: BaseModel

Base class for agent context with automatic Exposure field detection.

Provides unified access to context data through summary and detail retrieval. Automatically discovers Exposure-typed fields and distinguishes between LayeredExposure (supports details) and EntireExposure (summary only).
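
The field classification described above can be sketched with the standard library alone. The function below is a condensed restatement of the detection logic in the class source, not the library's own helper: `exposure_kind` is a hypothetical name, and the three classes are empty stand-ins. It shows why `typing.get_origin` is needed: a parameterized annotation such as `LayeredExposure[str]` is not a class, so it has to be mapped back to its origin first.

```python
import inspect
from typing import Any, Generic, List, Optional, TypeVar, get_origin

T = TypeVar("T")


class Exposure(Generic[T]):
    """Stand-in base class."""


class LayeredExposure(Exposure[T]):
    """Stand-in for the layered variant."""


class EntireExposure(Exposure[T]):
    """Stand-in for the summary-only variant."""


def exposure_kind(annotation: Any) -> Optional[str]:
    """Classify an annotation as 'layered', 'entire', or None."""
    # Parameterized generics such as LayeredExposure[str] are not classes,
    # so fall back to their origin class.
    candidate = annotation if inspect.isclass(annotation) else get_origin(annotation)
    if not inspect.isclass(candidate):
        return None
    if issubclass(candidate, LayeredExposure):
        return "layered"
    if issubclass(candidate, Exposure):
        # EntireExposure and the bare base class both mean "no per-item details".
        return "entire"
    return None


print(exposure_kind(LayeredExposure))        # layered
print(exposure_kind(LayeredExposure[str]))   # layered
print(exposure_kind(EntireExposure[int]))    # entire
print(exposure_kind(Exposure))               # entire
print(exposure_kind(str))                    # None
print(exposure_kind(List[int]))              # None
```

Under this logic, an annotation wrapped in `Optional[...]` is also classified as None, because its origin is a union rather than an Exposure class.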

Methods:

Name                 Description
summary              Get a dictionary of all field values or Exposure summaries.
get_details          Get detailed information for a specific LayeredExposure item.
get_revealed_items   Get all (field, index) pairs that have been revealed so far.
reset_revealed       Clear reveal caches on all LayeredExposure fields.

Examples:

>>> class MyContext(Context):
...     model_config = ConfigDict(arbitrary_types_allowed=True)
...     goal: str
...     history: CognitiveHistory = Field(default_factory=CognitiveHistory)  # LayeredExposure
...     tools: CognitiveTools = Field(default_factory=CognitiveTools)  # EntireExposure
...
>>> ctx = MyContext(goal="Complete task")
>>> ctx.history.add(Step(content="Step 1", status=True))
>>> ctx.summary()  # Returns dict with goal and summaries of history/tools
>>> ctx.get_details("history", 0)  # Works - history is LayeredExposure
>>> ctx.get_details("tools", 0)  # Returns None - tools is EntireExposure

Source code in bridgic/amphibious/_context.py
class Context(BaseModel):
    """
    Base class for agent context with automatic Exposure field detection.

    Provides unified access to context data through summary and detail retrieval.
    Automatically discovers Exposure-typed fields and distinguishes between
    LayeredExposure (supports details) and EntireExposure (summary only).

    Methods
    -------
    summary()
        Get a dictionary of all field values or Exposure summaries.
    get_details(field, idx)
        Get detailed information for a specific LayeredExposure item.
    get_revealed_items()
        Get all (field, index) pairs that have been revealed so far.
    reset_revealed()
        Clear reveal caches on all LayeredExposure fields.

    Examples
    --------
    >>> class MyContext(Context):
    ...     model_config = ConfigDict(arbitrary_types_allowed=True)
    ...     goal: str
    ...     history: CognitiveHistory = Field(default_factory=CognitiveHistory)  # LayeredExposure
    ...     tools: CognitiveTools = Field(default_factory=CognitiveTools)  # EntireExposure
    ...
    >>> ctx = MyContext(goal="Complete task")
    >>> ctx.history.add(Step(content="Step 1", status=True))
    >>> ctx.summary()  # Returns dict with goal and summaries of history/tools
    >>> ctx.get_details("history", 0)  # Works - history is LayeredExposure
    >>> ctx.get_details("tools", 0)  # Returns None - tools is EntireExposure
    """

    # Class-level cache for detected fields
    _exposure_fields: Optional[Dict[str, Dict[str, Any]]] = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls._exposure_fields = None

    def model_post_init(self, __context: Any) -> None:
        super().model_post_init(__context)
        if self.__class__._exposure_fields is None:
            self.__class__._exposure_fields = self._detect_exposure_fields()

        # Call __post_init__ if defined in subclass (provides dataclass-like API)
        if hasattr(self, '__post_init__') and callable(getattr(self, '__post_init__')):
            self.__post_init__()

    @classmethod
    def _detect_exposure_fields(cls) -> Dict[str, Dict[str, Any]]:
        """Detect all Exposure fields and classify them."""
        exposure_fields = {}

        def _get_exposure_type(field_type: Any) -> Optional[str]:
            """Return 'layered', 'entire', or None."""
            if inspect.isclass(field_type):
                if issubclass(field_type, LayeredExposure):
                    return 'layered'
                elif issubclass(field_type, EntireExposure):
                    return 'entire'
                elif issubclass(field_type, Exposure):
                    # Base Exposure - treat as entire (no details)
                    return 'entire'
                else:
                    return None

            origin = get_origin(field_type)
            if origin and inspect.isclass(origin):
                if issubclass(origin, LayeredExposure):
                    return 'layered'
                elif issubclass(origin, EntireExposure):
                    return 'entire'
                elif issubclass(origin, Exposure):
                    return 'entire'
                else:
                    return None

            return None

        for field_name, field_info in cls.model_fields.items():
            exposure_type = _get_exposure_type(field_info.annotation)
            if exposure_type:
                exposure_fields[field_name] = {
                    'field_name': field_name,
                    'exposure_type': exposure_type,  # 'layered' or 'entire'
                }

        return exposure_fields

    @classmethod
    def get_hidden_fields(cls) -> List[str]:
        """
        Get list of field names that should be hidden from summary.

        Returns
        -------
        List[str]
            Field names where display is explicitly set to False.

        Examples
        --------
        >>> class MyContext(Context):
        ...     visible: str = Field(default="")
        ...     hidden: str = Field(default="", json_schema_extra={"display": False})
        >>> MyContext.get_hidden_fields()
        ['hidden']
        """
        hidden = []
        for field_name, field_info in cls.model_fields.items():
            extra = field_info.json_schema_extra or {}
            display = extra.get("display", True)
            if not display:
                hidden.append(field_name)
        return hidden

    def set_llm(self, llm: Any) -> None:
        """
        Propagate an LLM to all Exposure fields that opt in via ``use_llm=True``.

        Called by ``AmphibiousAutoma`` at run start. Fields opt in by declaring:

            my_field: MyExposure = Field(
                ..., json_schema_extra={"use_llm": True}
            )

        Only Exposure fields are considered; non-Exposure fields are ignored.
        Exposure fields without ``use_llm=True`` are also ignored.

        Parameters
        ----------
        llm : Any
            The LLM instance to propagate.
        """
        exposure_fields = self.__class__._exposure_fields or {}
        for field_name in exposure_fields:
            field_info = self.__class__.model_fields.get(field_name)
            if field_info is None:
                continue
            extra = field_info.json_schema_extra or {}
            if extra.get("use_llm", False):
                field_value = getattr(self, field_name, None)
                if field_value is not None:
                    field_value.set_llm(llm)

    def summary(self) -> Dict[str, str]:
        """
        Generate a summary dictionary with formatted strings for each field.

        Automatically filters out fields marked with json_schema_extra={"display": False}.
        Subclasses should override this to provide custom formatting for each field.
        The returned dictionary maps field names to their formatted string representations.

        Returns
        -------
        Dict[str, str]
            Field name to formatted summary string mapping.
            Each value should be a complete, formatted string ready for prompt inclusion.
            Hidden fields (display=False) are excluded.
        """
        result = {}
        exposure_fields = self.__class__._exposure_fields or {}
        hidden_fields = set(self.get_hidden_fields())

        # Add non-Exposure fields as simple string (skip hidden fields)
        for field_name in self.__class__.model_fields:
            if field_name not in exposure_fields and field_name not in hidden_fields:
                value = getattr(self, field_name)
                if value is not None:
                    # Get field description if available
                    field_info = self.__class__.model_fields.get(field_name)
                    description = field_info.description if field_info and field_info.description else None

                    if description:
                        result[field_name] = f"{field_name} ({description}):\n {value}"
                    else:
                        result[field_name] = f"{field_name}:\n {value}"

        # Add Exposure field summaries (skip hidden fields)
        for field_name in exposure_fields:
            if field_name not in hidden_fields:
                field_value = getattr(self, field_name)
                if field_value and len(field_value) > 0:
                    # Get field description if available
                    field_info = self.__class__.model_fields.get(field_name)
                    description = field_info.description if field_info and field_info.description else None
                    summaries = field_value.summary()
                    if description:
                        result[field_name] = f"{field_name} ({description}):\n" + "\n".join(f"  {s}" for s in summaries)
                    else:
                        result[field_name] = f"{field_name}:\n" + "\n".join(f"  {s}" for s in summaries)

        return result

    def format_summary(
        self,
        include: Optional[List[str]] = None,
        exclude: Optional[List[str]] = None,
        separator: str = "\n"
    ) -> str:
        """
        Format the summary dictionary into a string with field selection.

        Parameters
        ----------
        include : Optional[List[str]]
            If provided, only include these fields (takes priority over exclude).
        exclude : Optional[List[str]]
            If provided, exclude these fields from the output.
        separator : str
            Separator between field summaries. Default is newline.

        Returns
        -------
        str
            Formatted summary string with selected fields.

        Examples
        --------
        >>> ctx.format_summary()  # All fields
        >>> ctx.format_summary(include=['tools', 'skills'])  # Only capabilities
        >>> ctx.format_summary(exclude=['tools', 'skills'])  # Only task context
        """
        summary_dict = self.summary()

        if include is not None:
            fields = [f for f in include if f in summary_dict]
        elif exclude is not None:
            fields = [f for f in summary_dict if f not in exclude]
        else:
            fields = list(summary_dict.keys())

        return separator.join(summary_dict[f] for f in fields if summary_dict.get(f))

    def get_field(self, field: str) -> Tuple[Optional[List[str]], Any]:
        """
        Get field information with type-aware return.

        Parameters
        ----------
        field : str
            Name of the field.

        Returns
        -------
        Tuple[Optional[List[str]], Any]
            - If field is an Exposure: (summary list, all items via get_all())
            - Otherwise: (None, raw field value)
        """
        exposure_fields = self.__class__._exposure_fields or {}
        field_value = getattr(self, field, None)

        if field in exposure_fields:
            # Exposure field: return (summary, get_all())
            if field_value and hasattr(field_value, 'summary') and hasattr(field_value, 'get_all'):
                return (field_value.summary(), field_value.get_all())
            return ([], [])
        else:
            # Non-Exposure field: return (None, value)
            return (None, field_value)

    def get_details(self, field: str, idx: int) -> Optional[str]:
        """
        Get detailed information for a LayeredExposure field item.

        Only works for LayeredExposure fields. Returns None for EntireExposure.

        Parameters
        ----------
        field : str
            Name of the Exposure field.
        idx : int
            Item index within the field (0-based).

        Returns
        -------
        Optional[str]
            Detailed information string, or None if:
            - Field doesn't exist
            - Field is not a LayeredExposure
            - Index is invalid
        """
        exposure_fields = self.__class__._exposure_fields or {}

        if field not in exposure_fields:
            return None

        # Only LayeredExposure supports get_details
        if exposure_fields[field].get('exposure_type') != 'layered':
            return None

        field_value = getattr(self, field, None)
        if field_value and hasattr(field_value, 'reveal'):
            return field_value.reveal(idx)

        return None

    def get_revealed_items(self) -> List[Tuple[str, int]]:
        """
        Return all (field_name, index) pairs that have been revealed across
        all LayeredExposure fields on this context.

        Returns
        -------
        List[Tuple[str, int]]
            Ordered list of (field_name, item_index) that have cached reveals.
        """
        result: List[Tuple[str, int]] = []
        for fname, fval in self:
            if isinstance(fval, LayeredExposure):
                for idx in fval._revealed:
                    result.append((fname, idx))
        return result

    def reset_revealed(self) -> None:
        """
        Reset reveal state on all LayeredExposure fields.

        Equivalent to calling field.reset_revealed() on every LayeredExposure
        field. Use at phase boundaries so the next phase can request fresh details.
        """
        for _, fval in self:
            if isinstance(fval, LayeredExposure):
                fval.reset_revealed()

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        """Yield ``(field_name, field_value)`` for every model field on this context."""
        for field_name in type(self).model_fields:
            yield field_name, getattr(self, field_name)

    def __str__(self) -> str:
        """
        Return a formatted string representation of the context.

        Automatically formats all defined fields:
        - Exposure fields: displays their summary
        - Other fields: displays field name and value

        Returns
        -------
        str
            Formatted string representation.
        """
        exposure_fields = self.__class__._exposure_fields or {}
        lines = []
        separator = "-" * 50

        lines.append(f"{'=' * 50}")
        lines.append(f"  {self.__class__.__name__}")
        lines.append(f"{'=' * 50}")

        # Format non-Exposure fields first
        for field_name in self.__class__.model_fields:
            if field_name not in exposure_fields:
                value = getattr(self, field_name)
                if value is not None:
                    lines.append(f"\n[{field_name}]")
                    lines.append(f"  {value}")

        # Format Exposure fields
        for field_name, field_info in exposure_fields.items():
            field_value = getattr(self, field_name, None)
            lines.append(f"\n[{field_name}] ({field_info.get('exposure_type', 'unknown')})")
            if field_value and len(field_value) > 0:
                for i, summary in enumerate(field_value.summary()):
                    lines.append(f"  [{i}] {summary}")
            else:
                lines.append("  (empty)")

        lines.append(f"\n{'=' * 50}")
        return "\n".join(lines)

    def __repr__(self) -> str:
        """Return a concise representation of the context."""
        exposure_fields = self.__class__._exposure_fields or {}
        parts = []

        for field_name in self.__class__.model_fields:
            if field_name not in exposure_fields:
                value = getattr(self, field_name)
                if value is not None:
                    parts.append(f"{field_name}={value!r}")

        for field_name in exposure_fields:
            field_value = getattr(self, field_name, None)
            count = len(field_value) if field_value else 0
            parts.append(f"{field_name}=[{count} items]")

        return f"{self.__class__.__name__}({', '.join(parts)})"
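
The field-selection rule in `format_summary()` can be tried in isolation. The function below is a standalone copy of that rule operating on a plain dictionary, so it runs without pydantic or the package. `select_and_join` is a hypothetical name, and the summary strings are invented sample data in the shape that `summary()` produces.

```python
from typing import Dict, List, Optional


def select_and_join(
    summary: Dict[str, str],
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    separator: str = "\n",
) -> str:
    """Standalone copy of the field-selection rule used by format_summary()."""
    if include is not None:
        # include wins: exclude is ignored, and output follows include's order.
        fields = [name for name in include if name in summary]
    elif exclude is not None:
        fields = [name for name in summary if name not in exclude]
    else:
        fields = list(summary)
    return separator.join(summary[name] for name in fields if summary.get(name))


summary = {
    "goal": "goal:\n Book a flight",
    "tools": "tools:\n  search: Query the web",
    "skills": "skills:\n  booking: Reserve tickets",
}

# Unknown names in include are skipped silently.
only_capabilities = select_and_join(summary, include=["skills", "tools", "missing"])
task_context = select_and_join(summary, exclude=["tools", "skills"])
both = select_and_join(summary, include=["goal"], exclude=["goal"])

print(only_capabilities)
print(task_context)
print(both == summary["goal"])  # True
```

Two consequences follow from the rule: passing `include` reorders the output to match the given list, and a field named in both `include` and `exclude` is still emitted.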

get_hidden_fields

classmethod
get_hidden_fields() -> List[str]

Get list of field names that should be hidden from summary.

Returns:

Type        Description
List[str]   Field names where display is explicitly set to False.

Examples:

>>> class MyContext(Context):
...     visible: str = Field(default="")
...     hidden: str = Field(default="", json_schema_extra={"display": False})
>>> MyContext.get_hidden_fields()
['hidden']
Source code in bridgic/amphibious/_context.py
@classmethod
def get_hidden_fields(cls) -> List[str]:
    """
    Get list of field names that should be hidden from summary.

    Returns
    -------
    List[str]
        Field names where display is explicitly set to False.

    Examples
    --------
    >>> class MyContext(Context):
    ...     visible: str = Field(default="")
    ...     hidden: str = Field(default="", json_schema_extra={"display": False})
    >>> MyContext.get_hidden_fields()
    ['hidden']
    """
    hidden = []
    for field_name, field_info in cls.model_fields.items():
        extra = field_info.json_schema_extra or {}
        display = extra.get("display", True)
        if not display:
            hidden.append(field_name)
    return hidden
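
The visibility test in `get_hidden_fields()` can be illustrated without pydantic by applying the same rule to plain dictionaries. In this sketch, `hidden_fields` is a hypothetical helper and `field_extras` stands in for each field's `json_schema_extra` value; the field names are invented. It also shows that the check is `if not display`, so any falsy value hides a field, not only the literal `False`.

```python
from typing import Any, Dict, List, Optional


def hidden_fields(extras: Dict[str, Optional[Dict[str, Any]]]) -> List[str]:
    """Apply the display rule from get_hidden_fields() to plain metadata dicts."""
    hidden = []
    for name, extra in extras.items():
        # A missing json_schema_extra is treated as an empty dict.
        display = (extra or {}).get("display", True)
        if not display:
            hidden.append(name)
    return hidden


field_extras = {
    "goal": None,                        # no json_schema_extra at all
    "scratchpad": {"display": False},    # explicitly hidden
    "history": {"use_llm": True},        # other keys do not affect visibility
    "debug": {"display": 0},             # falsy but not False: also hidden
}

print(hidden_fields(field_extras))  # ['scratchpad', 'debug']
```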

set_llm

set_llm(llm: Any) -> None

Propagate an LLM to all Exposure fields that opt in via use_llm=True.

Called by AmphibiousAutoma at run start. Fields opt in by declaring:

my_field: MyExposure = Field(
    ..., json_schema_extra={"use_llm": True}
)

Only Exposure fields are considered; non-Exposure fields are ignored. Exposure fields without use_llm=True are also ignored.

Parameters:

Name Type Description Default
llm Any

The LLM instance to propagate.

required
Source code in bridgic/amphibious/_context.py
def set_llm(self, llm: Any) -> None:
    """
    Propagate an LLM to all Exposure fields that opt in via ``use_llm=True``.

    Called by ``AmphibiousAutoma`` at run start. Fields opt in by declaring:

        my_field: MyExposure = Field(
            ..., json_schema_extra={"use_llm": True}
        )

    Only Exposure fields are considered; non-Exposure fields are ignored.
    Exposure fields without ``use_llm=True`` are also ignored.

    Parameters
    ----------
    llm : Any
        The LLM instance to propagate.
    """
    exposure_fields = self.__class__._exposure_fields or {}
    for field_name in exposure_fields:
        field_info = self.__class__.model_fields.get(field_name)
        if field_info is None:
            continue
        extra = field_info.json_schema_extra or {}
        if extra.get("use_llm", False):
            field_value = getattr(self, field_name, None)
            if field_value is not None:
                field_value.set_llm(llm)
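
The opt-in rule can be illustrated without the framework. The sketch below is a stand-in, not the library's code: it uses standard-library dataclasses, with `field(metadata=...)` playing the role of `json_schema_extra`. `FakeExposure` and `FakeContext` are hypothetical names introduced only for this illustration.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Optional


class FakeExposure:
    """Hypothetical stand-in for an Exposure that can hold an LLM."""

    def __init__(self) -> None:
        self.llm: Optional[Any] = None

    def set_llm(self, llm: Any) -> None:
        self.llm = llm


@dataclass
class FakeContext:
    # Opts in: metadata stands in for json_schema_extra={"use_llm": True}.
    history: FakeExposure = field(
        default_factory=FakeExposure, metadata={"use_llm": True}
    )
    # Exposure-like field without the flag: never receives the LLM.
    tools: FakeExposure = field(default_factory=FakeExposure)
    # Plain field: ignored entirely.
    goal: str = ""

    def set_llm(self, llm: Any) -> None:
        for f in fields(self):
            value = getattr(self, f.name)
            # Propagate only to exposure-like fields that opted in.
            if isinstance(value, FakeExposure) and f.metadata.get("use_llm", False):
                value.set_llm(llm)


ctx = FakeContext(goal="demo")
ctx.set_llm("fake-llm")
```

After the call, only `history` holds the LLM; `tools` and `goal` are untouched.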

summary

summary() -> Dict[str, str]

Generate a summary dictionary with formatted strings for each field.

Automatically filters out fields marked with json_schema_extra={"display": False}. Subclasses should override this to provide custom formatting for each field. The returned dictionary maps field names to their formatted string representations.

Returns:

Type Description
Dict[str, str]

Field name to formatted summary string mapping. Each value should be a complete, formatted string ready for prompt inclusion. Hidden fields (display=False) are excluded.

Source code in bridgic/amphibious/_context.py
def summary(self) -> Dict[str, str]:
    """
    Generate a summary dictionary with formatted strings for each field.

    Automatically filters out fields marked with json_schema_extra={"display": False}.
    Subclasses should override this to provide custom formatting for each field.
    The returned dictionary maps field names to their formatted string representations.

    Returns
    -------
    Dict[str, str]
        Field name to formatted summary string mapping.
        Each value should be a complete, formatted string ready for prompt inclusion.
        Hidden fields (display=False) are excluded.
    """
    result = {}
    exposure_fields = self.__class__._exposure_fields or {}
    hidden_fields = set(self.get_hidden_fields())

    # Add non-Exposure fields as simple string (skip hidden fields)
    for field_name in self.__class__.model_fields:
        if field_name not in exposure_fields and field_name not in hidden_fields:
            value = getattr(self, field_name)
            if value is not None:
                # Get field description if available
                field_info = self.__class__.model_fields.get(field_name)
                description = field_info.description if field_info and field_info.description else None

                if description:
                    result[field_name] = f"{field_name} ({description}):\n {value}"
                else:
                    result[field_name] = f"{field_name}:\n {value}"

    # Add Exposure field summaries (skip hidden fields)
    for field_name in exposure_fields:
        if field_name not in hidden_fields:
            field_value = getattr(self, field_name)
            if field_value and len(field_value) > 0:
                # Get field description if available
                field_info = self.__class__.model_fields.get(field_name)
                description = field_info.description if field_info and field_info.description else None
                summaries = field_value.summary()
                if description:
                    result[field_name] = f"{field_name} ({description}):\n" + "\n".join(f"  {s}" for s in summaries)
                else:
                    result[field_name] = f"{field_name}:\n" + "\n".join(f"  {s}" for s in summaries)

    return result
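
To make the resulting strings concrete, the sketch below reproduces the two formatting branches as free functions. The function names are hypothetical; only the format strings are taken from the source above.

```python
from typing import Any, List, Optional


def format_plain_field(name: str, value: Any, description: Optional[str] = None) -> str:
    """Format a non-Exposure field: a label line, then the value indented by one space."""
    label = f"{name} ({description})" if description else name
    return f"{label}:\n {value}"


def format_exposure_field(
    name: str, summaries: List[str], description: Optional[str] = None
) -> str:
    """Format an Exposure field: a label line, then one two-space-indented line per item."""
    label = f"{name} ({description})" if description else name
    return f"{label}:\n" + "\n".join(f"  {s}" for s in summaries)


goal_text = format_plain_field("goal", "Book a flight", "What to achieve")
tools_text = format_exposure_field("tools", ["• search: Web search", "• book: Book a seat"])
```

Note the asymmetry: plain values are indented by one space, Exposure items by two.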

format_summary

format_summary(
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    separator: str = "\n",
) -> str

Format the summary dictionary into a string with field selection.

Parameters:

Name Type Description Default
include Optional[List[str]]

If provided, only include these fields (takes priority over exclude).

None
exclude Optional[List[str]]

If provided, exclude these fields from the output.

None
separator str

Separator between field summaries. Default is newline.

'\n'

Returns:

Type Description
str

Formatted summary string with selected fields.

Examples:

>>> ctx.format_summary()  # All fields
>>> ctx.format_summary(include=['tools', 'skills'])  # Only capabilities
>>> ctx.format_summary(exclude=['tools', 'skills'])  # Only task context
Source code in bridgic/amphibious/_context.py
def format_summary(
    self,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    separator: str = "\n"
) -> str:
    """
    Format the summary dictionary into a string with field selection.

    Parameters
    ----------
    include : Optional[List[str]]
        If provided, only include these fields (takes priority over exclude).
    exclude : Optional[List[str]]
        If provided, exclude these fields from the output.
    separator : str
        Separator between field summaries. Default is newline.

    Returns
    -------
    str
        Formatted summary string with selected fields.

    Examples
    --------
    >>> ctx.format_summary()  # All fields
    >>> ctx.format_summary(include=['tools', 'skills'])  # Only capabilities
    >>> ctx.format_summary(exclude=['tools', 'skills'])  # Only task context
    """
    summary_dict = self.summary()

    if include is not None:
        fields = [f for f in include if f in summary_dict]
    elif exclude is not None:
        fields = [f for f in summary_dict if f not in exclude]
    else:
        fields = list(summary_dict.keys())

    return separator.join(summary_dict[f] for f in fields if summary_dict.get(f))
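
The selection rules are easy to check in isolation. The sketch below applies the same include/exclude logic to a plain dictionary; `select_summary` is a hypothetical helper, not part of the library.

```python
from typing import Dict, List, Optional


def select_summary(
    summary_dict: Dict[str, str],
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    separator: str = "\n",
) -> str:
    """Join selected summaries; include wins over exclude when both are given."""
    if include is not None:
        # Output follows the order of `include`; unknown names are skipped.
        names = [n for n in include if n in summary_dict]
    elif exclude is not None:
        names = [n for n in summary_dict if n not in exclude]
    else:
        names = list(summary_dict)
    # Empty summaries are dropped so they do not produce blank entries.
    return separator.join(summary_dict[n] for n in names if summary_dict.get(n))


demo = {"goal": "goal:\n ship it", "tools": "tools:\n  • search: web search", "skills": ""}

only_tools = select_summary(demo, include=["tools", "missing"], exclude=["tools"])
without_tools = select_summary(demo, exclude=["tools"])
reordered = select_summary(demo, include=["tools", "goal"], separator=" | ")
```

Two consequences follow from the logic: passing both arguments silently ignores `exclude`, and the output order follows `include` rather than field declaration order.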

get_field

get_field(field: str) -> Tuple[Optional[List[str]], Any]

Get field information with type-aware return.

Parameters:

Name Type Description Default
field str

Name of the field.

required

Returns:

Type Description
Tuple[Optional[List[str]], Any]
  • If field is an Exposure: (summary list, all items via get_all())
  • Otherwise: (None, raw field value)
Source code in bridgic/amphibious/_context.py
def get_field(self, field: str) -> Tuple[Optional[List[str]], Any]:
    """
    Get field information with type-aware return.

    Parameters
    ----------
    field : str
        Name of the field.

    Returns
    -------
    Tuple[Optional[List[str]], Any]
        - If field is an Exposure: (summary list, all items via get_all())
        - Otherwise: (None, raw field value)
    """
    exposure_fields = self.__class__._exposure_fields or {}
    field_value = getattr(self, field, None)

    if field in exposure_fields:
        # Exposure field: return (summary, get_all())
        if field_value and hasattr(field_value, 'summary') and hasattr(field_value, 'get_all'):
            return (field_value.summary(), field_value.get_all())
        return ([], [])
    else:
        # Non-Exposure field: return (None, value)
        return (None, field_value)

get_details

get_details(field: str, idx: int) -> Optional[str]

Get detailed information for a LayeredExposure field item.

Only works for LayeredExposure fields. Returns None for EntireExposure.

Parameters:

Name Type Description Default
field str

Name of the Exposure field.

required
idx int

Item index within the field (0-based).

required

Returns:

Type Description
Optional[str]

Detailed information string, or None if:
  • Field doesn't exist
  • Field is not a LayeredExposure
  • Index is invalid

Source code in bridgic/amphibious/_context.py
def get_details(self, field: str, idx: int) -> Optional[str]:
    """
    Get detailed information for a LayeredExposure field item.

    Only works for LayeredExposure fields. Returns None for EntireExposure.

    Parameters
    ----------
    field : str
        Name of the Exposure field.
    idx : int
        Item index within the field (0-based).

    Returns
    -------
    Optional[str]
        Detailed information string, or None if:
        - Field doesn't exist
        - Field is not a LayeredExposure
        - Index is invalid
    """
    exposure_fields = self.__class__._exposure_fields or {}

    if field not in exposure_fields:
        return None

    # Only LayeredExposure supports get_details
    if exposure_fields[field].get('exposure_type') != 'layered':
        return None

    field_value = getattr(self, field, None)
    if field_value and hasattr(field_value, 'reveal'):
        return field_value.reveal(idx)

    return None

get_revealed_items

get_revealed_items() -> List[Tuple[str, int]]

Return all (field_name, index) pairs that have been revealed across all LayeredExposure fields on this context.

Returns:

Type Description
List[Tuple[str, int]]

Ordered list of (field_name, item_index) that have cached reveals.

Source code in bridgic/amphibious/_context.py
def get_revealed_items(self) -> List[Tuple[str, int]]:
    """
    Return all (field_name, index) pairs that have been revealed across
    all LayeredExposure fields on this context.

    Returns
    -------
    List[Tuple[str, int]]
        Ordered list of (field_name, item_index) that have cached reveals.
    """
    result: List[Tuple[str, int]] = []
    for fname, fval in self:
        if isinstance(fval, LayeredExposure):
            for idx in fval._revealed:
                result.append((fname, idx))
    return result

reset_revealed

reset_revealed() -> None

Reset reveal state on all LayeredExposure fields.

Equivalent to calling field.reset_revealed() on every LayeredExposure field. Use at phase boundaries so the next phase can request fresh details.

Source code in bridgic/amphibious/_context.py
def reset_revealed(self) -> None:
    """
    Reset reveal state on all LayeredExposure fields.

    Equivalent to calling field.reset_revealed() on every LayeredExposure
    field. Use at phase boundaries so the next phase can request fresh details.
    """
    for _, fval in self:
        if isinstance(fval, LayeredExposure):
            fval.reset_revealed()

Step

Bases: BaseModel

A single execution step with content, result, and metadata.

Used by: _context.py (CognitiveHistory, CognitiveContext.add_info), _amphibious_automa.py (_action, _record_trace_step)

Source code in bridgic/amphibious/_type.py
class Step(BaseModel):
    """A single execution step with content, result, and metadata.

    Used by: _context.py (CognitiveHistory, CognitiveContext.add_info),
             _amphibious_automa.py (_action, _record_trace_step)
    """
    model_config = ConfigDict(extra="forbid")

    content: str = ""
    result: Optional[Any] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    status: Optional[bool] = None  # Optional status flag for backward compatibility

Skill

Bases: BaseModel

A skill definition following SKILL.md format.

Used by: _context.py (CognitiveSkills)

Source code in bridgic/amphibious/_type.py
class Skill(BaseModel):
    """A skill definition following SKILL.md format.

    Used by: _context.py (CognitiveSkills)
    """
    model_config = ConfigDict(extra="forbid")

    name: str
    description: str = ""
    content: str = ""
    metadata: Dict[str, Any] = Field(default_factory=dict)

CognitiveTools

Bases: EntireExposure[ToolSpec]

Manages available tools (EntireExposure - no progressive disclosure).

All tool information is exposed in summary. Use get_all() to access the full ToolSpec list when detailed information is needed.

Source code in bridgic/amphibious/_context.py
class CognitiveTools(EntireExposure[ToolSpec]):
    """
    Manages available tools (EntireExposure - no progressive disclosure).

    All tool information is exposed in summary. Use get_all() to access
    the full ToolSpec list when detailed information is needed.
    """

    def summary(self) -> List[str]:
        """
        Generate summary strings for all tools.

        Returns
        -------
        List[str]
            Summary for each tool in format: "• {name}: {description}".
        """
        result = []
        for tool in self._items:
            desc = tool.tool_description
            result.append(f"• {tool.tool_name}: {desc}")
        return result

summary

summary() -> List[str]

Generate summary strings for all tools.

Returns:

Type Description
List[str]

Summary for each tool in format: "• {name}: {description}".

Source code in bridgic/amphibious/_context.py
def summary(self) -> List[str]:
    """
    Generate summary strings for all tools.

    Returns
    -------
    List[str]
        Summary for each tool in format: "• {name}: {description}".
    """
    result = []
    for tool in self._items:
        desc = tool.tool_description
        result.append(f"• {tool.tool_name}: {desc}")
    return result

CognitiveSkills

Bases: LayeredExposure[Skill]

Manages available skills with progressive disclosure (LayeredExposure).

Provides skill storage, summary generation (name + description), and detailed content retrieval (full SKILL.md content).

Methods:

Name Description
add

Add a skill from Skill object, file path, or SKILL.md markdown text.

add_from_markdown

Parse and add a skill from SKILL.md format.

get_by_name

Get a skill by its name.

Source code in bridgic/amphibious/_context.py
class CognitiveSkills(LayeredExposure[Skill]):
    """
    Manages available skills with progressive disclosure (LayeredExposure).

    Provides skill storage, summary generation (name + description),
    and detailed content retrieval (full SKILL.md content).

    Methods
    -------
    add(item)
        Add a skill from Skill object, file path, or SKILL.md markdown text.
    add_from_markdown(markdown_text)
        Parse and add a skill from SKILL.md format.
    get_by_name(name)
        Get a skill by its name.
    """

    def add(self, item) -> int:
        """
        Add a skill from various input types.

        Parameters
        ----------
        item : Union[Skill, str]
            - Skill object: added directly.
            - str: if the string is an existing file path, loaded via add_from_file();
              otherwise parsed as SKILL.md markdown text via add_from_markdown().

        Returns
        -------
        int
            Index of the newly added skill.

        Raises
        ------
        TypeError
            If item is not a Skill or str.
        """
        if isinstance(item, Skill):
            return super().add(item)
        elif isinstance(item, str):
            from pathlib import Path
            if Path(item).is_file():
                return self.add_from_file(item)
            return self.add_from_markdown(item)
        else:
            raise TypeError(
                f"CognitiveSkills.add() expected Skill or str, "
                f"got {type(item).__name__}"
            )

    def add_from_markdown(self, markdown_text: str) -> int:
        """
        Parse a SKILL.md file and add it as a skill.

        Parameters
        ----------
        markdown_text : str
            Full content of a SKILL.md file (YAML frontmatter + markdown).

        Returns
        -------
        int
            Index of the newly added skill.

        Raises
        ------
        ValueError
            If the markdown doesn't contain valid YAML frontmatter or required fields.
        """
        import yaml

        # Split frontmatter and content
        parts = markdown_text.split('---')
        if len(parts) < 3:
            raise ValueError("Invalid SKILL.md format: missing YAML frontmatter")

        frontmatter_text = parts[1].strip()
        content = '---'.join(parts[2:]).strip()

        # Parse YAML frontmatter
        frontmatter = yaml.safe_load(frontmatter_text)

        # Validate required fields
        if not isinstance(frontmatter, dict):
            raise ValueError("Frontmatter must be a YAML dictionary")
        if 'name' not in frontmatter:
            raise ValueError("Missing required field: name")
        if 'description' not in frontmatter:
            raise ValueError("Missing required field: description")

        # Extract fields
        name = frontmatter.pop('name')
        description = frontmatter.pop('description')
        metadata = frontmatter  # Remaining fields go to metadata

        # Create and add skill
        skill = Skill(
            name=name,
            description=description,
            content=content,
            metadata=metadata
        )
        return self.add(skill)

    def add_from_file(self, file_path: str) -> int:
        """
        Load and add a skill from a SKILL.md file.

        Parameters
        ----------
        file_path : str
            Path to the SKILL.md file.

        Returns
        -------
        int
            Index of the newly added skill.

        Raises
        ------
        FileNotFoundError
            If the file doesn't exist.
        ValueError
            If the file doesn't contain valid SKILL.md format.
        """
        from pathlib import Path

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"Skill file not found: {file_path}")

        markdown_text = path.read_text(encoding='utf-8')
        return self.add_from_markdown(markdown_text)

    def load_from_directory(self, directory: str, pattern: str = "**/SKILL.md") -> int:
        """
        Load all SKILL.md files from a directory recursively.

        Parameters
        ----------
        directory : str
            Path to the directory containing SKILL.md files.
        pattern : str, optional
            Glob pattern for finding skill files (default: "**/SKILL.md").

        Returns
        -------
        int
            Number of skills successfully loaded.
        """
        from pathlib import Path

        dir_path = Path(directory)
        if not dir_path.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")

        count = 0
        for skill_file in dir_path.glob(pattern):
            self.add_from_file(str(skill_file))
            count += 1

        return count

    def summary(self) -> List[str]:
        """
        Generate summary strings for all skills.

        Returns
        -------
        List[str]
            Summary for each skill in format: "{name} - {description}".
        """
        result = []
        for skill in self._items:
            result.append(f"{skill.name} - {skill.description}")
        return result

    def get_details(self, index: int) -> Optional[str]:
        """
        Get detailed information for a specific skill (full SKILL.md content).

        Parameters
        ----------
        index : int
            Skill index (0-based).

        Returns
        -------
        Optional[str]
            Full skill details including frontmatter and markdown content.
            Returns None if index is out of range.
        """
        if index < 0 or index >= len(self._items):
            return None

        skill = self._items[index]
        lines = [
            f"Skill: {skill.name}",
            f"Description: {skill.description}",
            ""
        ]

        # Add metadata if present
        if skill.metadata:
            lines.append("Metadata:")
            for key, value in skill.metadata.items():
                lines.append(f"  {key}: {value}")
            lines.append("")

        # Add full markdown content
        lines.append("Instructions:")
        lines.append("-" * 40)
        lines.append(skill.content)

        return "\n".join(lines)

add

add(item) -> int

Add a skill from various input types.

Parameters:

Name Type Description Default
item Union[Skill, str]
  • Skill object: added directly.
  • str: if the string is an existing file path, loaded via add_from_file(); otherwise parsed as SKILL.md markdown text via add_from_markdown().
required

Returns:

Type Description
int

Index of the newly added skill.

Raises:

Type Description
TypeError

If item is not a Skill or str.

Source code in bridgic/amphibious/_context.py
def add(self, item) -> int:
    """
    Add a skill from various input types.

    Parameters
    ----------
    item : Union[Skill, str]
        - Skill object: added directly.
        - str: if the string is an existing file path, loaded via add_from_file();
          otherwise parsed as SKILL.md markdown text via add_from_markdown().

    Returns
    -------
    int
        Index of the newly added skill.

    Raises
    ------
    TypeError
        If item is not a Skill or str.
    """
    if isinstance(item, Skill):
        return super().add(item)
    elif isinstance(item, str):
        from pathlib import Path
        if Path(item).is_file():
            return self.add_from_file(item)
        return self.add_from_markdown(item)
    else:
        raise TypeError(
            f"CognitiveSkills.add() expected Skill or str, "
            f"got {type(item).__name__}"
        )
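
The string branch decides between "file path" and "markdown text" purely by whether the string names an existing file. The sketch below isolates that decision; `classify_skill_input` is a hypothetical helper. As a defensive assumption that the source shown here does not make, it also treats an `OSError` or `ValueError` from the path check as "not a file", since `Path.is_file()` may raise on some platforms and Python versions for strings that are not plausible paths (for example, a very long line of text).

```python
import tempfile
from pathlib import Path


def classify_skill_input(item: object) -> str:
    """Return "file" if the string names an existing file, else "markdown"."""
    if not isinstance(item, str):
        raise TypeError(f"expected str, got {type(item).__name__}")
    try:
        is_file = Path(item).is_file()
    except (OSError, ValueError):
        # Defensive assumption: an unusable path is treated as markdown text.
        is_file = False
    return "file" if is_file else "markdown"


SKILL_TEXT = "---\nname: demo\ndescription: d\n---\nbody"

with tempfile.TemporaryDirectory() as tmp:
    skill_path = Path(tmp) / "SKILL.md"
    skill_path.write_text(SKILL_TEXT, encoding="utf-8")
    kind_for_path = classify_skill_input(str(skill_path))

kind_for_text = classify_skill_input(SKILL_TEXT)
kind_for_long = classify_skill_input("x" * 5000)
```

One practical consequence: a mistyped file path is not reported as missing. It falls through to the markdown parser and typically surfaces as a `ValueError` about missing frontmatter.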

add_from_markdown

add_from_markdown(markdown_text: str) -> int

Parse SKILL.md text and add it as a skill.

Parameters:

Name Type Description Default
markdown_text str

Full content of a SKILL.md file (YAML frontmatter + markdown).

required

Returns:

Type Description
int

Index of the newly added skill.

Raises:

Type Description
ValueError

If the markdown doesn't contain valid YAML frontmatter or required fields.

Source code in bridgic/amphibious/_context.py
def add_from_markdown(self, markdown_text: str) -> int:
    """
    Parse a SKILL.md file and add it as a skill.

    Parameters
    ----------
    markdown_text : str
        Full content of a SKILL.md file (YAML frontmatter + markdown).

    Returns
    -------
    int
        Index of the newly added skill.

    Raises
    ------
    ValueError
        If the markdown doesn't contain valid YAML frontmatter or required fields.
    """
    import yaml

    # Split frontmatter and content
    parts = markdown_text.split('---')
    if len(parts) < 3:
        raise ValueError("Invalid SKILL.md format: missing YAML frontmatter")

    frontmatter_text = parts[1].strip()
    content = '---'.join(parts[2:]).strip()

    # Parse YAML frontmatter
    frontmatter = yaml.safe_load(frontmatter_text)

    # Validate required fields
    if not isinstance(frontmatter, dict):
        raise ValueError("Frontmatter must be a YAML dictionary")
    if 'name' not in frontmatter:
        raise ValueError("Missing required field: name")
    if 'description' not in frontmatter:
        raise ValueError("Missing required field: description")

    # Extract fields
    name = frontmatter.pop('name')
    description = frontmatter.pop('description')
    metadata = frontmatter  # Remaining fields go to metadata

    # Create and add skill
    skill = Skill(
        name=name,
        description=description,
        content=content,
        metadata=metadata
    )
    return self.add(skill)
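
The frontmatter split can be exercised on its own. The sketch below repeats only the splitting step from the source above and leaves out YAML parsing (which the source does with `yaml.safe_load`) so that it stays within the standard library. `split_frontmatter` and `SAMPLE` are hypothetical names.

```python
from typing import Tuple


def split_frontmatter(markdown_text: str) -> Tuple[str, str]:
    """Split SKILL.md text into (frontmatter_text, body) on '---' delimiters."""
    parts = markdown_text.split("---")
    if len(parts) < 3:
        raise ValueError("Invalid SKILL.md format: missing YAML frontmatter")
    frontmatter_text = parts[1].strip()
    # Re-join the rest so a '---' horizontal rule in the body survives.
    content = "---".join(parts[2:]).strip()
    return frontmatter_text, content


SAMPLE = """---
name: deploy
description: Ship a build
---
# Steps

Run the pipeline.

---

Then verify.
"""

frontmatter_text, content = split_frontmatter(SAMPLE)
```

Because the split is on the bare substring `---`, a frontmatter value that itself contains `---` would end the frontmatter early; a horizontal rule in the body is preserved.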

add_from_file

add_from_file(file_path: str) -> int

Load and add a skill from a SKILL.md file.

Parameters:

Name Type Description Default
file_path str

Path to the SKILL.md file.

required

Returns:

Type Description
int

Index of the newly added skill.

Raises:

Type Description
FileNotFoundError

If the file doesn't exist.

ValueError

If the file doesn't contain valid SKILL.md format.

Source code in bridgic/amphibious/_context.py
def add_from_file(self, file_path: str) -> int:
    """
    Load and add a skill from a SKILL.md file.

    Parameters
    ----------
    file_path : str
        Path to the SKILL.md file.

    Returns
    -------
    int
        Index of the newly added skill.

    Raises
    ------
    FileNotFoundError
        If the file doesn't exist.
    ValueError
        If the file doesn't contain valid SKILL.md format.
    """
    from pathlib import Path

    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Skill file not found: {file_path}")

    markdown_text = path.read_text(encoding='utf-8')
    return self.add_from_markdown(markdown_text)

load_from_directory

load_from_directory(
    directory: str, pattern: str = "**/SKILL.md"
) -> int

Load all SKILL.md files from a directory recursively.

Parameters:

Name Type Description Default
directory str

Path to the directory containing SKILL.md files.

required
pattern str

Glob pattern for finding skill files (default: "**/SKILL.md").

'**/SKILL.md'

Returns:

Type Description
int

Number of skills successfully loaded.

Raises:

Type Description
FileNotFoundError

If the directory doesn't exist.

Source code in bridgic/amphibious/_context.py
def load_from_directory(self, directory: str, pattern: str = "**/SKILL.md") -> int:
    """
    Load all SKILL.md files from a directory recursively.

    Parameters
    ----------
    directory : str
        Path to the directory containing SKILL.md files.
    pattern : str, optional
        Glob pattern for finding skill files (default: "**/SKILL.md").

    Returns
    -------
    int
        Number of skills successfully loaded.
    """
    from pathlib import Path

    dir_path = Path(directory)
    if not dir_path.exists():
        raise FileNotFoundError(f"Directory not found: {directory}")

    count = 0
    for skill_file in dir_path.glob(pattern):
        self.add_from_file(str(skill_file))
        count += 1

    return count
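
The default pattern relies on `pathlib`'s recursive `**` glob. The sketch below builds a throwaway directory tree and shows which files `"**/SKILL.md"` picks up; the directory names are made up for the illustration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a" / "b").mkdir(parents=True)

    # Three skill files at different depths, plus one unrelated file.
    for rel in ("SKILL.md", "a/SKILL.md", "a/b/SKILL.md", "a/README.md"):
        (root / rel).write_text("placeholder", encoding="utf-8")

    # "**" matches the directory itself and every subdirectory beneath it.
    found = sorted(p.relative_to(root).as_posix() for p in root.glob("**/SKILL.md"))
```

The pattern matches a `SKILL.md` directly inside the directory as well as nested ones. In the source above, a file that fails to parse raises out of the loop rather than being skipped, so the returned count reflects only a fully successful pass.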

summary

summary() -> List[str]

Generate summary strings for all skills.

Returns:

Type Description
List[str]

Summary for each skill in format: "{name} - {description}".

Source code in bridgic/amphibious/_context.py
def summary(self) -> List[str]:
    """
    Generate summary strings for all skills.

    Returns
    -------
    List[str]
        Summary for each skill in format: "{name} - {description}".
    """
    result = []
    for skill in self._items:
        result.append(f"{skill.name} - {skill.description}")
    return result

get_details

get_details(index: int) -> Optional[str]

Get detailed information for a specific skill (full SKILL.md content).

Parameters:

Name Type Description Default
index int

Skill index (0-based).

required

Returns:

Type Description
Optional[str]

Full skill details including frontmatter and markdown content. Returns None if index is out of range.

Source code in bridgic/amphibious/_context.py
def get_details(self, index: int) -> Optional[str]:
    """
    Get detailed information for a specific skill (full SKILL.md content).

    Parameters
    ----------
    index : int
        Skill index (0-based).

    Returns
    -------
    Optional[str]
        Full skill details including frontmatter and markdown content.
        Returns None if index is out of range.
    """
    if index < 0 or index >= len(self._items):
        return None

    skill = self._items[index]
    lines = [
        f"Skill: {skill.name}",
        f"Description: {skill.description}",
        ""
    ]

    # Add metadata if present
    if skill.metadata:
        lines.append("Metadata:")
        for key, value in skill.metadata.items():
            lines.append(f"  {key}: {value}")
        lines.append("")

    # Add full markdown content
    lines.append("Instructions:")
    lines.append("-" * 40)
    lines.append(skill.content)

    return "\n".join(lines)
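
The layout of the returned text is easiest to see on a small input. The sketch below rebuilds the same line sequence as a free function over plain values; `render_skill_details` is a hypothetical name and the sample skill is invented.

```python
from typing import Any, Dict, Optional


def render_skill_details(
    name: str,
    description: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Render name, description, optional metadata, then the instructions."""
    lines = [f"Skill: {name}", f"Description: {description}", ""]
    if metadata:
        lines.append("Metadata:")
        for key, value in metadata.items():
            lines.append(f"  {key}: {value}")
        lines.append("")
    lines.append("Instructions:")
    lines.append("-" * 40)  # 40-character rule above the skill body
    lines.append(content)
    return "\n".join(lines)


details = render_skill_details(
    "deploy", "Ship a build", "Run the pipeline.", {"version": 1}
)
```

The Metadata section is omitted entirely when the metadata dictionary is empty.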

CognitiveHistory

Bases: LayeredExposure[Step]

Manages execution history with layered memory architecture.

Memory layers:
  • Working memory (the most recent working_memory_size steps): full details displayed directly
  • Short-term memory (the short_term_size steps before working memory): summary only, details available on request
  • Long-term memory (older steps):
      • Pending buffer: brief summaries, waiting for batch compression
      • Compressed summary: LLM-generated concise summary

Compression is batched: LLM is only called when the pending buffer reaches compress_threshold, reducing LLM calls by a factor of compress_threshold.

Parameters:

Name Type Description Default
working_memory_size int

Number of recent steps to show with full details (default: 5).

5
short_term_size int

Number of steps to show as summary before working memory (default: 20).

20
compress_threshold int

Number of pending long-term steps to accumulate before triggering one batch LLM compression (default: 10).

10
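
The layer boundaries are plain index arithmetic, so they can be worked through without the class. The sketch below mirrors the index computation used by the history class and returns half-open `(start, stop)` ranges for each layer; `layer_bounds` and `needs_compression` are hypothetical free functions written for this illustration.

```python
from typing import Dict, Tuple


def layer_bounds(
    total: int,
    working_memory_size: int = 5,
    short_term_size: int = 20,
    compressed_count: int = 0,
) -> Dict[str, Tuple[int, int]]:
    """Return half-open index ranges for each memory layer."""
    working_start = max(0, total - working_memory_size)
    short_term_start = max(0, working_start - short_term_size)
    # Steps already folded into the compressed summary are not pending.
    pending_start = min(compressed_count, short_term_start)
    return {
        "compressed": (0, pending_start),
        "pending": (pending_start, short_term_start),
        "short_term": (short_term_start, working_start),
        "working": (working_start, total),
    }


def needs_compression(
    total: int,
    compress_threshold: int = 10,
    working_memory_size: int = 5,
    short_term_size: int = 20,
    compressed_count: int = 0,
) -> bool:
    """True once the pending buffer holds at least compress_threshold steps."""
    start, stop = layer_bounds(
        total, working_memory_size, short_term_size, compressed_count
    )["pending"]
    return (stop - start) >= compress_threshold


bounds_40 = layer_bounds(40)
bounds_after = layer_bounds(40, compressed_count=15)
```

With the defaults, nothing becomes pending until the history exceeds 25 steps, and the first compression is due at 35 steps (10 pending).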
Source code in bridgic/amphibious/_context.py
class CognitiveHistory(LayeredExposure[Step]):
    """
    Manages execution history with layered memory architecture.

    Memory layers:
    - Working memory (most recent N steps): Full details displayed directly
    - Short-term memory (next N steps): Summary only, details available on request
    - Long-term memory (older steps):
        - Pending buffer: brief summaries, waiting for batch compression
        - Compressed summary: LLM-generated concise summary

    Compression is batched: LLM is only called when the pending buffer reaches
    compress_threshold, reducing LLM calls by a factor of compress_threshold.

    Parameters
    ----------
    working_memory_size : int
        Number of recent steps to show with full details (default: 5).
    short_term_size : int
        Number of steps to show as summary before working memory (default: 20).
    compress_threshold : int
        Number of pending long-term steps to accumulate before triggering
        one batch LLM compression (default: 10).
    """

    def __init__(
        self,
        working_memory_size: int = 5,
        short_term_size: int = 20,
        compress_threshold: int = 10,
    ):
        super().__init__()
        self.working_memory_size = working_memory_size
        self.short_term_size = short_term_size
        self.compress_threshold = compress_threshold

        # Compression state
        self.compressed_summary: str = ""
        self.compressed_count: int = 0

    def add(self, item: Step) -> int:
        """
        Add a step to history.

        Parameters
        ----------
        item : Step
            The step to add.

        Returns
        -------
        int
            Index of the newly added step.
        """
        index = super().add(item)
        return index

    async def compress_if_needed(self) -> bool:
        """
        Compress old history if needed.

        Call this after adding steps to check and perform compression.
        Requires LLM to be set via set_llm().

        Returns
        -------
        bool
            True if compression was performed, False otherwise.
        """
        if not self.needs_compression():
            return False

        if self._llm is None:
            return False

        await self._do_compress()
        return True

    def needs_compression(self) -> bool:
        """Check if pending long-term steps have reached the compression threshold."""
        return len(self._get_steps_to_compress()) >= self.compress_threshold

    def _get_steps_to_compress(self) -> List[Step]:
        """Get steps that should be compressed."""
        total = len(self._items)
        working_start = max(0, total - self.working_memory_size)
        short_term_start = max(0, working_start - self.short_term_size)

        # Steps to compress: from compressed_count to short_term_start
        if short_term_start > self.compressed_count:
            return self._items[self.compressed_count:short_term_start]
        return []

    async def _do_compress(self) -> None:
        """Perform compression using LLM."""
        steps_to_compress = self._get_steps_to_compress()
        if not steps_to_compress:
            return

        # Build compression prompt
        formatted_steps = self._format_steps_for_compression(steps_to_compress)

        system_prompt = (
            "You are a history compression assistant. Compress the execution history "
            "into a concise summary while preserving critical information.\n"
            "- Keep key data (IDs, numbers, names) that may be needed later\n"
            "- Note any failed attempts\n"
            "- Integrate with existing summary if present\n"
            "- Output a single concise paragraph"
        )

        user_parts = []
        if self.compressed_summary:
            user_parts.append(f"Existing Summary: {self.compressed_summary}")
        user_parts.append(f"New Steps:\n{formatted_steps}")
        user_parts.append("Compressed summary:")
        user_prompt = "\n\n".join(user_parts)

        # Call LLM
        new_summary = await self._llm.agenerate(
            messages=[
                Message.from_text(text=system_prompt, role="system"),
                Message.from_text(text=user_prompt, role="user")
            ]
        )

        # Update compression state
        self.compressed_summary = new_summary
        self.compressed_count += len(steps_to_compress)

    def _format_steps_for_compression(self, steps: List[Step]) -> str:
        """Format steps for compression prompt."""
        lines = []
        for i, step in enumerate(steps):
            lines.append(f"{i+1}. {step.content}")
            if step.result:
                result_str = str(step.result)[:200]
                lines.append(f"   Result: {result_str}")
        return "\n".join(lines)

    def _format_step_detail(self, step: Step, max_result_len: int = 500) -> str:
        """Format a step with full details for working memory display."""
        lines = [step.content]

        if step.result is not None:
            result_str = str(step.result)
            if len(result_str) > max_result_len:
                result_str = result_str[:max_result_len] + "..."
            lines.append(f"   Result: {result_str}")

        return "\n".join(lines)

    def _format_step_summary(self, step: Step) -> str:
        """Format a step as brief summary for short-term memory display."""
        return step.content

    def summary(self) -> List[str]:
        """
        Generate layered summary.

        Returns
        -------
        List[str]
            Formatted strings for each memory layer:
            - Long-term compressed: LLM-generated summary (if exists)
            - Long-term pending: brief summaries of steps awaiting compression
            - Short-term: step summaries with indices (queryable)
            - Working: full step details with indices
        """
        result = []
        total = len(self._items)

        # Calculate boundaries
        working_start = max(0, total - self.working_memory_size)
        short_term_start = max(0, working_start - self.short_term_size)

        # 1. Long-term memory: compressed summary
        if self.compressed_summary:
            result.append(f"[History Summary] {self.compressed_summary}")

        # 2. Long-term memory: pending (uncompressed, awaiting batch compression)
        if self.compressed_count < short_term_start:
            result.append(f"[Long-term Pending ({self.compressed_count}-{short_term_start - 1})]")
            for i in range(self.compressed_count, short_term_start):
                step = self._items[i]
                summary = self._format_step_summary(step)
                result.append(f"  [{i}] {summary}")

        # 3. Short-term memory: summary only, queryable for details
        if short_term_start < working_start:
            result.append(f"[Short-term Memory ({short_term_start}-{working_start - 1}), query details via 'details']")
            for i in range(short_term_start, working_start):
                step = self._items[i]
                summary = self._format_step_summary(step)
                result.append(f"  [{i}] {summary}")

        # 4. Working memory: full details
        if working_start < total:
            result.append(f"[Working Memory ({working_start}-{total - 1})]")
            for i in range(working_start, total):
                step = self._items[i]
                detail = self._format_step_detail(step)
                result.append(f"  [{i}] {detail}")

        return result

    def get_details(self, index: int) -> Optional[str]:
        """
        Get detailed information for a specific step.

        Parameters
        ----------
        index : int
            Step index (0-based).

        Returns
        -------
        Optional[str]
            Formatted step details, or None if index is out of range.
        """
        if index < 0 or index >= len(self._items):
            return None

        step = self._items[index]
        lines = [
            f"Content: {step.content}",
        ]

        if step.result is not None:
            result_str = str(step.result)
            lines.append(f"Result: {result_str}")

        if step.metadata:
            lines.append("Metadata:")
            for key, value in step.metadata.items():
                val_str = str(value)
                if len(val_str) > 150:
                    val_str = val_str[:150] + "..."
                lines.append(f"  {key}: {val_str}")

        return "\n".join(lines)

add

add(item: Step) -> int

Add a step to history.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item | Step | The step to add. | required |

Returns:

| Type | Description |
| --- | --- |
| int | Index of the newly added step. |

Source code in bridgic/amphibious/_context.py
def add(self, item: Step) -> int:
    """
    Add a step to history.

    Parameters
    ----------
    item : Step
        The step to add.

    Returns
    -------
    int
        Index of the newly added step.
    """
    index = super().add(item)
    return index

compress_if_needed

async
compress_if_needed() -> bool

Compress old history if needed.

Call this after adding steps to check and perform compression. If no LLM has been set via set_llm(), compression is skipped and False is returned.

Returns:

| Type | Description |
| --- | --- |
| bool | True if compression was performed, False otherwise. |

Source code in bridgic/amphibious/_context.py
async def compress_if_needed(self) -> bool:
    """
    Compress old history if needed.

    Call this after adding steps to check and perform compression.
    If no LLM has been set via set_llm(), compression is skipped and
    False is returned.

    Returns
    -------
    bool
        True if compression was performed, False otherwise.
    """
    if not self.needs_compression():
        return False

    if self._llm is None:
        return False

    await self._do_compress()
    return True
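
The check-then-compress flow can be sketched with a simplified stand-in class. This is not the library's CognitiveHistory: `TinyHistory`, `StubLlm`, `summarize`, `keep_recent`, and the sizes below are hypothetical names and values chosen for illustration. The sketch keeps one property of the source shown above: a False return covers both "threshold not reached" and "no LLM configured".

```python
import asyncio
from typing import List, Optional


class StubLlm:
    """Hypothetical stand-in for a real LLM client."""

    async def summarize(self, steps: List[str]) -> str:
        return f"{len(steps)} steps summarized"


class TinyHistory:
    """Simplified model of the compress-if-needed flow (not the library class)."""

    def __init__(self, llm: Optional[StubLlm], keep_recent: int = 2, compress_threshold: int = 3):
        self.llm = llm
        self.keep_recent = keep_recent
        self.compress_threshold = compress_threshold
        self.items: List[str] = []
        self.compressed_summary = ""
        self.compressed_count = 0

    def pending(self) -> List[str]:
        # Steps older than the most recent `keep_recent` and not yet compressed.
        cutoff = max(0, len(self.items) - self.keep_recent)
        return self.items[self.compressed_count:cutoff]

    async def compress_if_needed(self) -> bool:
        pending = self.pending()
        if len(pending) < self.compress_threshold:
            return False  # not enough pending steps yet
        if self.llm is None:
            return False  # skipped silently: no LLM configured
        self.compressed_summary = await self.llm.summarize(pending)
        self.compressed_count += len(pending)
        return True


async def demo() -> List[bool]:
    history = TinyHistory(llm=StubLlm())
    outcomes = []
    for i in range(6):
        history.items.append(f"step {i}")
        outcomes.append(await history.compress_if_needed())
    return outcomes


results = asyncio.run(demo())
```

With these hypothetical sizes, compression fires only once, on the fifth step, when three steps have aged out of the recent window. A caller that needs to tell the two False cases apart would have to call needs_compression() separately.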

needs_compression

needs_compression() -> bool

Check if pending long-term steps have reached the compression threshold.

Source code in bridgic/amphibious/_context.py
def needs_compression(self) -> bool:
    """Check if pending long-term steps have reached the compression threshold."""
    return len(self._get_steps_to_compress()) >= self.compress_threshold
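
The threshold check depends on how the history is partitioned into layers. The following sketch mirrors the boundary arithmetic of `_get_steps_to_compress` from the class source as a standalone function. The function name `pending_range` and the sizes used are illustrative assumptions, not library defaults.

```python
from typing import Tuple


def pending_range(
    total: int,
    working_memory_size: int,
    short_term_size: int,
    compressed_count: int,
) -> Tuple[int, int]:
    """Return the half-open index range [start, end) of steps awaiting compression."""
    # The newest steps form working memory.
    working_start = max(0, total - working_memory_size)
    # The steps just before working memory form short-term memory.
    short_term_start = max(0, working_start - short_term_size)
    # Anything older than short-term memory and not yet compressed is pending.
    if short_term_start > compressed_count:
        return compressed_count, short_term_start
    return compressed_count, compressed_count  # empty range


# Hypothetical sizes: 5 working steps, 10 short-term steps, threshold of 10.
start, end = pending_range(total=30, working_memory_size=5, short_term_size=10, compressed_count=0)
needs_compression = (end - start) >= 10
```

Under these assumed sizes, steps 0 through 14 are pending, so the threshold of 10 is met. With only 12 steps in total, nothing has aged past short-term memory and the pending range is empty.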

summary

summary() -> List[str]

Generate layered summary.

Returns:

| Type | Description |
| --- | --- |
| List[str] | Formatted strings for each memory layer, listed below. |

- Long-term compressed: LLM-generated summary (if exists)
- Long-term pending: brief summaries of steps awaiting compression
- Short-term: step summaries with indices (queryable)
- Working: full step details with indices

Source code in bridgic/amphibious/_context.py
def summary(self) -> List[str]:
    """
    Generate layered summary.

    Returns
    -------
    List[str]
        Formatted strings for each memory layer:
        - Long-term compressed: LLM-generated summary (if exists)
        - Long-term pending: brief summaries of steps awaiting compression
        - Short-term: step summaries with indices (queryable)
        - Working: full step details with indices
    """
    result = []
    total = len(self._items)

    # Calculate boundaries
    working_start = max(0, total - self.working_memory_size)
    short_term_start = max(0, working_start - self.short_term_size)

    # 1. Long-term memory: compressed summary
    if self.compressed_summary:
        result.append(f"[History Summary] {self.compressed_summary}")

    # 2. Long-term memory: pending (uncompressed, awaiting batch compression)
    if self.compressed_count < short_term_start:
        result.append(f"[Long-term Pending ({self.compressed_count}-{short_term_start - 1})]")
        for i in range(self.compressed_count, short_term_start):
            step = self._items[i]
            summary = self._format_step_summary(step)
            result.append(f"  [{i}] {summary}")

    # 3. Short-term memory: summary only, queryable for details
    if short_term_start < working_start:
        result.append(f"[Short-term Memory ({short_term_start}-{working_start - 1}), query details via 'details']")
        for i in range(short_term_start, working_start):
            step = self._items[i]
            summary = self._format_step_summary(step)
            result.append(f"  [{i}] {summary}")

    # 4. Working memory: full details
    if working_start < total:
        result.append(f"[Working Memory ({working_start}-{total - 1})]")
        for i in range(working_start, total):
            step = self._items[i]
            detail = self._format_step_detail(step)
            result.append(f"  [{i}] {detail}")

    return result
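
To make the layered output concrete, here is a simplified standalone rendering of the same four layers. It assumes steps are plain strings (the real method formats Step objects and adds results to working-memory entries), and the function name `layered_summary` and the sizes are hypothetical.

```python
from typing import List


def layered_summary(
    steps: List[str],
    working_memory_size: int,
    short_term_size: int,
    compressed_count: int = 0,
    compressed_summary: str = "",
) -> List[str]:
    """Simplified rendering of the four layers; steps are plain strings here."""
    total = len(steps)
    working_start = max(0, total - working_memory_size)
    short_term_start = max(0, working_start - short_term_size)

    lines: List[str] = []
    if compressed_summary:
        lines.append(f"[History Summary] {compressed_summary}")
    if compressed_count < short_term_start:
        lines.append(f"[Long-term Pending ({compressed_count}-{short_term_start - 1})]")
        lines += [f"  [{i}] {steps[i]}" for i in range(compressed_count, short_term_start)]
    if short_term_start < working_start:
        lines.append(f"[Short-term Memory ({short_term_start}-{working_start - 1})]")
        lines += [f"  [{i}] {steps[i]}" for i in range(short_term_start, working_start)]
    if working_start < total:
        lines.append(f"[Working Memory ({working_start}-{total - 1})]")
        lines += [f"  [{i}] {steps[i]}" for i in range(working_start, total)]
    return lines


# Hypothetical sizes chosen so that every layer is visible with six steps.
steps = [f"step {i}" for i in range(6)]
output = layered_summary(
    steps,
    working_memory_size=2,
    short_term_size=2,
    compressed_count=1,
    compressed_summary="step 0 done",
)
print("\n".join(output))
```

The bracketed indices are positions in the full history, not positions within a layer, which is why an index shown in the short-term layer can be passed directly to get_details.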

get_details

get_details(index: int) -> Optional[str]

Get detailed information for a specific step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| index | int | Step index (0-based). | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | Formatted step details, or None if index is out of range. |

Source code in bridgic/amphibious/_context.py
def get_details(self, index: int) -> Optional[str]:
    """
    Get detailed information for a specific step.

    Parameters
    ----------
    index : int
        Step index (0-based).

    Returns
    -------
    Optional[str]
        Formatted step details, or None if index is out of range.
    """
    if index < 0 or index >= len(self._items):
        return None

    step = self._items[index]
    lines = [
        f"Content: {step.content}",
    ]

    if step.result is not None:
        result_str = str(step.result)
        lines.append(f"Result: {result_str}")

    if step.metadata:
        lines.append("Metadata:")
        for key, value in step.metadata.items():
            val_str = str(value)
            if len(val_str) > 150:
                val_str = val_str[:150] + "..."
            lines.append(f"  {key}: {val_str}")

    return "\n".join(lines)
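
The formatting rules can be exercised in isolation with a stand-in step type. `FakeStep` below is a hypothetical dataclass carrying only the three fields the method reads. The function reproduces the logic shown above as a free function over a list, so it is a sketch rather than the library API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeStep:
    """Hypothetical stand-in for Step with only the fields used here."""
    content: str
    result: Any = None
    metadata: Dict[str, Any] = field(default_factory=dict)


def get_details(items: List[FakeStep], index: int) -> Optional[str]:
    if index < 0 or index >= len(items):
        return None  # negative indices are rejected, unlike list indexing
    step = items[index]
    lines = [f"Content: {step.content}"]
    if step.result is not None:
        lines.append(f"Result: {step.result}")  # the result is not truncated
    if step.metadata:
        lines.append("Metadata:")
        for key, value in step.metadata.items():
            text = str(value)
            if len(text) > 150:
                text = text[:150] + "..."  # only metadata values are truncated
            lines.append(f"  {key}: {text}")
    return "\n".join(lines)


items = [FakeStep("fetch page", result=0, metadata={"html": "x" * 200})]
detail = get_details(items, 0)
missing = get_details(items, -1)
```

Two behaviors are worth noting: a falsy result such as 0 is still shown because the check is `is not None`, and an index of -1 returns None instead of the last step.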

CognitiveContext

Bases: Context

Default cognitive context providing all fields needed by CognitiveWorker.

Combines goal, tools, skills, execution history, and observation into a unified context. Users can extend this class to add custom fields.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| goal | str | The goal to achieve. |
| tools | CognitiveTools | Available tools (EntireExposure: summary only, no per-item details). |
| skills | CognitiveSkills | Available skills (LayeredExposure: supports progressive disclosure). |
| cognitive_history | CognitiveHistory | History of cognitive steps (LayeredExposure: supports progressive disclosure). |
| observation | Optional[str] | Current observation from the last observation phase (hidden from summary). |

Examples:

>>> ctx = CognitiveContext(goal="Complete task")
>>> ctx.tools.add(tool_spec)  # Add tools
>>> ctx.skills.add_from_file("skills/travel-planning/SKILL.md")  # Add skills
>>> ctx.add_info(Step(content="Step 1", status=True))
Source code in bridgic/amphibious/_context.py
class CognitiveContext(Context):
    """
    Default cognitive context providing all fields needed by CognitiveWorker.

    Combines goal, tools, skills, execution history, and observation into
    a unified context. Users can extend this class to add custom fields.

    Attributes
    ----------
    goal : str
        The goal to achieve.
    tools : CognitiveTools
        Available tools (EntireExposure — summary only, no per-item details).
    skills : CognitiveSkills
        Available skills (LayeredExposure — supports progressive disclosure).
    cognitive_history : CognitiveHistory
        History of cognitive steps (LayeredExposure — supports progressive disclosure).
    observation : Optional[str]
        Current observation from the last observation phase (hidden from summary).

    Examples
    --------
    >>> ctx = CognitiveContext(goal="Complete task")
    >>> ctx.tools.add(tool_spec)  # Add tools
    >>> ctx.skills.add_from_file("skills/travel-planning/SKILL.md")  # Add skills
    >>> ctx.add_info(Step(content="Step 1", status=True))
    """
    model_config = ConfigDict(arbitrary_types_allowed=True)

    goal: str = Field(default="", description="The goal to achieve")
    tools: CognitiveTools = Field(default_factory=CognitiveTools, description="Available tools")
    skills: CognitiveSkills = Field(default_factory=CognitiveSkills, description="Available skills")
    cognitive_history: CognitiveHistory = Field(
        default_factory=CognitiveHistory,
        description="History of cognitive steps with layered memory",
        json_schema_extra={"use_llm": True},
    )

    # observation: saved from _observation worker method (not displayed in summary)
    observation: Optional[str] = Field(
        default=None,
        json_schema_extra={"display": False},
        description="Current observation from the last observation phase"
    )

    def summary(self) -> Dict[str, str]:
        """
        Generate a summary dictionary with formatted strings for each field.

        Returns a dictionary where each key is a field name and each value is
        a formatted string ready for prompt inclusion. Includes previously
        disclosed details from all LayeredExposure fields.

        Returns
        -------
        Dict[str, str]
            Field name to formatted summary string mapping:
            - goal: "Goal: {goal}"
            - tools: formatted tool list
            - skills: formatted skill list with indices
            - cognitive_history: formatted history with indices
            - disclosed_details: previously disclosed details (if any)
        """
        result = super().summary()

        # Format goal
        result['goal'] = f"Goal: {self.goal}"

        # Format tools (EntireExposure - no detail queries)
        if len(self.tools) > 0:
            lines = ["Available Tools:"]
            for tool_summary in self.tools.summary():
                lines.append(f"  {tool_summary}")
            result['tools'] = "\n".join(lines)

        # Format skills (LayeredExposure - with indices for detail queries)
        if len(self.skills) > 0:
            lines = ["Available Skills (request details via details: {field: 'skills', index: N}):"]
            for i, skill_summary in enumerate(self.skills.summary()):
                lines.append(f"  [{i}] {skill_summary}")
            result['skills'] = "\n".join(lines)

        # Format history (layered memory architecture)
        if len(self.cognitive_history) > 0:
            total = len(self.cognitive_history)
            working_start = max(0, total - self.cognitive_history.working_memory_size)
            short_term_start = max(0, working_start - self.cognitive_history.short_term_size)

            lines = ["Execution History:"]
            if self.cognitive_history.compressed_summary:
                lines.append("  (Older history compressed into summary)")
            if short_term_start < working_start:
                lines.append(f"  (Steps [{short_term_start}-{working_start-1}]: summary only, query details via details)")

            # history.summary() already returns formatted layered output
            for summary_line in self.cognitive_history.summary():
                lines.append(f"  {summary_line}")

            result['cognitive_history'] = "\n".join(lines)
        else:
            result['cognitive_history'] = "Execution History: (none)"

        # Format disclosed details from LayeredExposure fields' _revealed dicts
        disclosed_lines = ["Previously Disclosed Details:"]
        has_disclosed = False
        for fname, fval in self:
            if not isinstance(fval, LayeredExposure):
                continue
            for idx, detail in fval._revealed.items():
                disclosed_lines.append(f"\n[{fname}[{idx}]]:\n{detail}")
                has_disclosed = True
        if has_disclosed:
            result['disclosed_details'] = "\n".join(disclosed_lines)

        return result

    def add_info(self, info: Step) -> int:
        """
        Add an execution step to history.

        Parameters
        ----------
        info : Step
            The step to add.

        Returns
        -------
        int
            Index of the added step.
        """
        return self.cognitive_history.add(info)

summary

summary() -> Dict[str, str]

Generate a summary dictionary with formatted strings for each field.

Returns a dictionary where each key is a field name and each value is a formatted string ready for prompt inclusion. Includes previously disclosed details from all LayeredExposure fields.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, str] | Field name to formatted summary string mapping, with the keys listed below. |

- goal: "Goal: {goal}"
- tools: formatted tool list
- skills: formatted skill list with indices
- cognitive_history: formatted history with indices
- disclosed_details: previously disclosed details (if any)

Source code in bridgic/amphibious/_context.py
def summary(self) -> Dict[str, str]:
    """
    Generate a summary dictionary with formatted strings for each field.

    Returns a dictionary where each key is a field name and each value is
    a formatted string ready for prompt inclusion. Includes previously
    disclosed details from all LayeredExposure fields.

    Returns
    -------
    Dict[str, str]
        Field name to formatted summary string mapping:
        - goal: "Goal: {goal}"
        - tools: formatted tool list
        - skills: formatted skill list with indices
        - cognitive_history: formatted history with indices
        - disclosed_details: previously disclosed details (if any)
    """
    result = super().summary()

    # Format goal
    result['goal'] = f"Goal: {self.goal}"

    # Format tools (EntireExposure - no detail queries)
    if len(self.tools) > 0:
        lines = ["Available Tools:"]
        for tool_summary in self.tools.summary():
            lines.append(f"  {tool_summary}")
        result['tools'] = "\n".join(lines)

    # Format skills (LayeredExposure - with indices for detail queries)
    if len(self.skills) > 0:
        lines = ["Available Skills (request details via details: {field: 'skills', index: N}):"]
        for i, skill_summary in enumerate(self.skills.summary()):
            lines.append(f"  [{i}] {skill_summary}")
        result['skills'] = "\n".join(lines)

    # Format history (layered memory architecture)
    if len(self.cognitive_history) > 0:
        total = len(self.cognitive_history)
        working_start = max(0, total - self.cognitive_history.working_memory_size)
        short_term_start = max(0, working_start - self.cognitive_history.short_term_size)

        lines = ["Execution History:"]
        if self.cognitive_history.compressed_summary:
            lines.append("  (Older history compressed into summary)")
        if short_term_start < working_start:
            lines.append(f"  (Steps [{short_term_start}-{working_start-1}]: summary only, query details via details)")

        # history.summary() already returns formatted layered output
        for summary_line in self.cognitive_history.summary():
            lines.append(f"  {summary_line}")

        result['cognitive_history'] = "\n".join(lines)
    else:
        result['cognitive_history'] = "Execution History: (none)"

    # Format disclosed details from LayeredExposure fields' _revealed dicts
    disclosed_lines = ["Previously Disclosed Details:"]
    has_disclosed = False
    for fname, fval in self:
        if not isinstance(fval, LayeredExposure):
            continue
        for idx, detail in fval._revealed.items():
            disclosed_lines.append(f"\n[{fname}[{idx}]]:\n{detail}")
            has_disclosed = True
    if has_disclosed:
        result['disclosed_details'] = "\n".join(disclosed_lines)

    return result
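
The disclosed-details entry is the least obvious part of the summary, so here is a small sketch of how that string is assembled. It assumes the revealed details are available as a plain mapping of field name to `{index: detail}`. The function name `format_disclosed` and the sample data are hypothetical, and the real method reads each LayeredExposure field's `_revealed` dict instead.

```python
from typing import Dict


def format_disclosed(revealed_by_field: Dict[str, Dict[int, str]]) -> Dict[str, str]:
    """Build the 'disclosed_details' entry, or an empty dict if nothing was revealed."""
    lines = ["Previously Disclosed Details:"]
    has_disclosed = False
    for field_name, revealed in revealed_by_field.items():
        for idx, detail in revealed.items():
            # Each entry is labelled with the field and index it came from.
            lines.append(f"\n[{field_name}[{idx}]]:\n{detail}")
            has_disclosed = True
    return {"disclosed_details": "\n".join(lines)} if has_disclosed else {}


result = format_disclosed({
    "skills": {0: "Full SKILL.md text"},
    "cognitive_history": {},
})
empty = format_disclosed({"skills": {}})
```

The key is omitted entirely when no details have been revealed, so prompt templates consuming the summary should treat `disclosed_details` as optional.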

add_info

add_info(info: Step) -> int

Add an execution step to history.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| info | Step | The step to add. | required |

Returns:

| Type | Description |
| --- | --- |
| int | Index of the added step. |

Source code in bridgic/amphibious/_context.py
def add_info(self, info: Step) -> int:
    """
    Add an execution step to history.

    Parameters
    ----------
    info : Step
        The step to add.

    Returns
    -------
    int
        Index of the added step.
    """
    return self.cognitive_history.add(info)

CognitiveWorker

Bases: GraphAutoma

Cognitive worker: pure thinking unit of an agent.

A CognitiveWorker represents one thinking cycle and is responsible for "what to think and how to think". Observation and action execution are handled by AmphibiousAutoma as shared infrastructure.

Subclass and override template methods to customize behavior.

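Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| llm | Optional[BaseLlm] | LLM used for thinking. Can be None if the agent will inject one via set_llm(). | None |
| enable_rehearsal | bool | Enable rehearsal policy (predict tool execution outcomes). | False |
| enable_reflection | bool | Enable reflection policy (assess information quality). | False |
| verbose | Optional[bool] | Enable logging of the thinking process. None inherits the setting from AmphibiousAutoma. | None |
| verbose_prompt | Optional[bool] | Enable logging of full prompts sent to the LLM. None inherits the setting from AmphibiousAutoma. | None |
| output_schema | Optional[Type[BaseModel]] | Instance-level override of the class attribute output_schema when provided. | None |
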
Class Attributes

output_schema : Optional[Type[BaseModel]]
    If set, the worker produces a typed Pydantic instance directly using the output_schema as the LLM constraint. The agent's action() phase is skipped entirely. await think_unit returns the typed instance. Policy rounds (rehearsal/reflection) still run if enabled.

Template Methods (override in subclasses)

thinking() -> str
    Return the thinking prompt (how to decide next steps). Must be implemented.

build_messages(think_prompt, tools_description, output_instructions, context_info)
    Assemble the final messages for the thinking phase. Returns List[Message].

observation(context, default_observation) -> Union[str, _DELEGATE]
    Enhance or customize observation. Default returns _DELEGATE. Return _DELEGATE to delegate observation to AmphibiousAutoma.

before_action(decision_result, context) -> Any
    Verify/adjust the decision before execution. Called by AmphibiousAutoma._action().

Examples:

>>> class ReactWorker(CognitiveWorker):
...     def __init__(self, llm):
...         super().__init__(llm, enable_rehearsal=True)
...
...     async def thinking(self):
...         return "Plan ONE immediate next step with appropriate tools."
...
>>> class PlannerWorker(CognitiveWorker):
...     output_schema = PlanningResult  # skip tool loop, return typed instance
...
...     async def thinking(self):
...         return "Analyze the goal and produce a phased execution plan."
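
The policy flags change the shape of the model the LLM is asked to fill in, and the class source builds that model through a cached factory. The sketch below illustrates the cache-by-flags pattern only. It deliberately swaps Pydantic's `create_model` for the standard library's `dataclasses.make_dataclass` so that it runs without third-party packages, it omits the output_schema part of the cache key, and `create_think_model` and `_cache` are hypothetical names.

```python
from dataclasses import field, fields, make_dataclass
from typing import Dict, List, Optional, Tuple

# Cache keyed by the flag combination, one generated class per combination.
_cache: Dict[Tuple[bool, bool, bool], type] = {}


def create_think_model(
    enable_rehearsal: bool = False,
    enable_reflection: bool = False,
    enable_acquiring: bool = True,
) -> type:
    key = (enable_rehearsal, enable_reflection, enable_acquiring)
    if key in _cache:
        return _cache[key]

    # The output field is always present; the others depend on the flags.
    spec = [("output", List[dict], field(default_factory=list))]
    if enable_acquiring:
        spec.append(("details", List[dict], field(default_factory=list)))
    if enable_rehearsal:
        spec.append(("rehearsal", Optional[str], field(default=None)))
    if enable_reflection:
        spec.append(("reflection", Optional[str], field(default=None)))

    model = make_dataclass("ThinkModel", spec)
    _cache[key] = model
    return model


RehearsalModel = create_think_model(enable_rehearsal=True)
field_names = [f.name for f in fields(RehearsalModel)]
```

Calling the factory again with the same flags returns the same class object, so the generated type is built once per flag combination rather than once per worker instance.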
Source code in bridgic/amphibious/_cognitive_worker.py
class CognitiveWorker(GraphAutoma):
    """
    Cognitive worker: pure thinking unit of an agent.

    A CognitiveWorker represents one thinking cycle and is responsible for
    "what to think and how to think". Observation and action execution are
    handled by AmphibiousAutoma as shared infrastructure.

    Subclass and override template methods to customize behavior.

    Parameters
    ----------
    llm : Optional[BaseLlm]
        LLM used for thinking. Can be None if the agent will inject one via set_llm().
    enable_rehearsal : bool, optional
        Enable rehearsal policy (predict tool execution outcomes). Default is False.
    enable_reflection : bool, optional
        Enable reflection policy (assess information quality). Default is False.
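    verbose : Optional[bool], optional
        Enable logging of thinking process. Default is None (inherit from AmphibiousAutoma).
    verbose_prompt : Optional[bool], optional
        Enable logging of full prompts sent to LLM. Default is None (inherit from AmphibiousAutoma).
    output_schema : Optional[Type[BaseModel]], optional
        Instance-level override of the class attribute ``output_schema``. Default is None.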

    Class Attributes
    ----------------
    output_schema : Optional[Type[BaseModel]]
        If set, the worker produces a typed Pydantic instance directly using the
        output_schema as the LLM constraint. The agent's action() phase is skipped
        entirely. ``await think_unit`` returns the typed instance.
        Policy rounds (rehearsal/reflection) still run if enabled.

    Template Methods (override in subclasses)
    -----------------------------------------
    thinking() -> str
        Return the thinking prompt (how to decide next steps). Must be implemented.

    build_messages(think_prompt, tools_description, output_instructions, context_info)
        Assemble the final messages for the thinking phase. Returns List[Message].

    observation(context, default_observation) -> Union[str, _DELEGATE]
        Enhance or customize observation. Default returns _DELEGATE.
        Return _DELEGATE to delegate observation to AmphibiousAutoma.

    before_action(decision_result, context) -> Any
        Verify/adjust the decision before execution. Called by AmphibiousAutoma._action().

    Examples
    --------
    >>> class ReactWorker(CognitiveWorker):
    ...     def __init__(self, llm):
    ...         super().__init__(llm, enable_rehearsal=True)
    ...
    ...     async def thinking(self):
    ...         return "Plan ONE immediate next step with appropriate tools."
    ...
    >>> class PlannerWorker(CognitiveWorker):
    ...     output_schema = PlanningResult  # skip tool loop, return typed instance
    ...
    ...     async def thinking(self):
    ...         return "Analyze the goal and produce a phased execution plan."
    """

    # Class-level cache: (enable_rehearsal, enable_reflection, enable_acquiring, output_schema) → model
    _think_model_cache: Dict[Tuple, Type[BaseModel]] = {}

    # Subclasses set this to a Pydantic model to produce typed output directly
    output_schema: Optional[Type[BaseModel]] = None

    def __init__(
        self,
        llm: Optional[BaseLlm] = None,
        enable_rehearsal: bool = False,
        enable_reflection: bool = False,
        verbose: Optional[bool] = None,
        verbose_prompt: Optional[bool] = None,
        output_schema: Optional[Type[BaseModel]] = None,
    ):
        super().__init__()
        self._llm = llm

        # Policy flags
        self.enable_rehearsal = enable_rehearsal
        self.enable_reflection = enable_reflection

        # Instance-level output_schema overrides the class attribute when provided
        if output_schema is not None:
            self.output_schema = output_schema

        # Logging runtime (None = inherit from AmphibiousAutoma)
        self._verbose = verbose
        self._verbose_prompt = verbose_prompt

        # Usage stats
        self.spent_tokens = 0
        self.spent_time = 0

    def set_llm(self, llm: BaseLlm) -> None:
        """
        Set the LLM used for thinking and tool selection.

        Parameters
        ----------
        llm : BaseLlm
            LLM instance to use. Replaces any previously set LLM.
        """
        self._llm = llm

    @classmethod
    def _create_think_model(
        cls,
        enable_rehearsal: bool = False,
        enable_reflection: bool = False,
        enable_acquiring: bool = True,
        output_schema: Optional[Type[BaseModel]] = None,
    ) -> Type[BaseModel]:
        """
        Unified factory for all ThinkModel variants.

        Builds and caches a Pydantic model with _ThinkBase as its base, adding
        fields dynamically:
        - output: List[StepToolCall] when output_schema is None, else Optional[output_schema]
        - details: only when enable_acquiring=True
        - rehearsal: only when enable_rehearsal=True
        - reflection: only when enable_reflection=True

        All variants are cached by (enable_rehearsal, enable_reflection,
        enable_acquiring, output_schema).
        """
        key = (enable_rehearsal, enable_reflection, enable_acquiring, output_schema)
        if key in cls._think_model_cache:
            return cls._think_model_cache[key]

        extra_fields: Dict[str, Any] = {}

        # output field — type depends on output_schema
        if output_schema is None:
            extra_fields['output'] = (
                Annotated[List[StepToolCall], BeforeValidator(_coerce_none_to_list)],
                Field(
                    default_factory=list,
                    description="Tool calls to execute as [{tool, tool_arguments: [{name: 'param_name', value: 'param_value'}]}]"
                )
            )
        else:
            extra_fields['output'] = (
                Optional[output_schema],
                Field(default=None, description="Structured result matching the required output schema.")
            )

        # Acquiring: lets the LLM request details of progressively disclosed items before deciding.
        if enable_acquiring:
            extra_fields['details'] = (
                Annotated[List[DetailRequest], BeforeValidator(_coerce_none_to_list)],
                Field(
                    default_factory=list,
                    description="Request details before deciding. Example: [{field: 'cognitive_history', index: 0}]"
                )
            )

        # Rehearsal: lets the LLM predict the outcome of the planned operation before deciding.
        if enable_rehearsal:
            extra_fields['rehearsal'] = (
                Optional[str],
                Field(
                    default=None,
                    description="(Optional) Predict what will happen if you execute the planned tools. What results will they return? Any potential issues?"
                )
            )

        # Reflection: lets the LLM assess the current information before deciding.
        if enable_reflection:
            extra_fields['reflection'] = (
                Optional[str],
                Field(
                    default=None,
                    description="(Optional) Assess information quality. Is it sufficient? Any contradictions or gaps?"
                )
            )

        model = create_model('ThinkModel', __base__=_ThinkBase, **extra_fields)
        cls._think_model_cache[key] = model
        return model

    @property
    def _ThinkResultModel(self) -> Type[BaseModel]:
        """Model for normal rounds: acquiring open, all enabled policies active."""
        return self._create_think_model(
            enable_rehearsal=self.enable_rehearsal,
            enable_reflection=self.enable_reflection,
            enable_acquiring=True,
            output_schema=None,
        )

    @property
    def _ThinkDecisionModel(self) -> Type[BaseModel]:
        """Model for forced-decision rounds: all policy operators closed."""
        return self._create_think_model(
            enable_rehearsal=False,
            enable_reflection=False,
            enable_acquiring=False,
            output_schema=self.output_schema,
        )


    ############################################################################
    # Core methods
    ############################################################################

    @worker(is_start=True, is_output=True)
    async def _thinking(self, context: CognitiveContext) -> Any:
        """
        Thinking phase: decide what to do next (thinking + tool selection in one call).

        Reads observation from context.observation (set by AmphibiousAutoma.run() before
        calling arun). Returns the decision directly; AmphibiousAutoma.run() reads the
        arun() return value (no side-channel via _last_decision).
        """
        if not isinstance(context, CognitiveContext):
            raise TypeError(
                f"Expected CognitiveContext, got {type(context).__name__}. "
                "CognitiveWorker requires CognitiveContext or its subclass."
            )
        if self._llm is None:
            raise RuntimeError(
                "CognitiveWorker has no LLM set. Either pass llm= in __init__ "
                "or use set_llm() before running."
            )

        observation = context.observation
        return await self._run_thinking(observation, context)

    async def _run_thinking(self, observation: Optional[str], context: CognitiveContext) -> Any:
        """Thinking phase with cognitive policies support. Returns the final decision."""

        ###############################
        # Call the LLM to get the decision
        ###############################

        # Get the custom thinking prompt from the worker's thinking() method.
        think_prompt = await self.thinking()

        # Accumulated policy outputs injected into subsequent rounds' prompts
        policy_context = []

        # Per-operator open flags — each fires at most once then closes
        # acquiring is a built-in framework capability, disabled when output_schema is set
        acquiring_open = (self.output_schema is None)
        rehearsal_open = self.enable_rehearsal
        reflection_open = self.enable_reflection

        while True:
            # Build messages
            messages = await self._build_messages(
                think_prompt=think_prompt,
                context=context,
                observation=observation,
                policy_context=policy_context,
                acquiring_open=acquiring_open,
                rehearsal_open=rehearsal_open,
                reflection_open=reflection_open,
            )
            # Build model from current per-operator state
            model = self._create_think_model(
                enable_rehearsal=rehearsal_open,
                enable_reflection=reflection_open,
                enable_acquiring=acquiring_open,
                output_schema=self.output_schema,
            )
            # LLM call
            think_result = await self._llm.astructured_output(
                messages=messages,
                constraint=PydanticModel(model=model)
            )
            self._log_prompt("Think", messages)

            ###############################
            # Analyze the LLM response and decide what to do next
            ###############################

            re_think = False  # Whether any operator was activated this round

            # Operator 1: acquiring — fetch external details (one-shot, only when non-empty)
            if acquiring_open:
                reqs = getattr(think_result, 'details', None) or []
                if reqs:
                    for req in reqs:
                        context.get_details(req.field, req.index)
                    acquiring_open = False  # close — never fires again
                    re_think = True

            # Operator 2: rehearsal — inject prediction into next round (one-shot, only when filled)
            if rehearsal_open:
                rehearsal_val = getattr(think_result, 'rehearsal', None)
                if rehearsal_val is not None:
                    policy_context.append(f"## Mental Simulation (Rehearsal):\n{rehearsal_val}")
                    rehearsal_open = False  # close
                    re_think = True

            # Operator 3: reflection — inject quality assessment into next round (one-shot, only when filled)
            if reflection_open:
                reflection_val = getattr(think_result, 'reflection', None)
                if reflection_val is not None:
                    policy_context.append(f"## Information Assessment (Reflection):\n{reflection_val}")
                    reflection_open = False  # close
                    re_think = True

            if re_think:
                continue  # Re-think with narrowed model + accumulated context

            break  # No operator activated — LLM gave a direct decision

        return think_result

    ############################################################################
    # Internal helpers
    ############################################################################

    def _log_prompt(self, stage: str, messages: List[Message]):
        """Log prompts with timestamp and caller location if verbose_prompt is enabled."""
        if not self._verbose_prompt:
            return
        import inspect
        from datetime import datetime
        from os.path import basename

        frame = inspect.currentframe()
        try:
            caller = frame.f_back if frame is not None else None
            if caller is not None:
                filename = basename(caller.f_code.co_filename)
                lineno = caller.f_lineno
            else:
                filename, lineno = "?", 0
        finally:
            del frame

        ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        total_tokens = sum(self._count_tokens(m.content) for m in messages)
        for i, msg in enumerate(messages):
            tokens = self._count_tokens(msg.content)
            printer.print(f"[{ts}] [{stage}] ({filename}:{lineno}) Message {i+1} ({msg.role}, {tokens} tokens):", color="cyan")
            printer.print(msg.content, color="gray")
        printer.print(f"[{ts}] [{stage}] ({filename}:{lineno}) Total: {total_tokens} tokens (cumulative: {self.spent_tokens})", color="yellow")

    @staticmethod
    def _generate_schema_example(schema: dict, defs: Optional[dict] = None) -> Any:
        """
        Recursively build a compact example value from a JSON Schema node.

        Handles $ref, anyOf (Optional), object, array, scalar types, enum, and const.
        Arrays are represented as a single-element list to keep examples concise.
        """
        if defs is None:
            defs = schema.get('$defs', {})

        # Resolve $ref
        if '$ref' in schema:
            ref_name = schema['$ref'].split('/')[-1]
            return CognitiveWorker._generate_schema_example(defs.get(ref_name, {}), defs)

        # anyOf: covers Optional[X] (anyOf: [{type: X}, {type: null}]); oneOf is not handled here
        if 'anyOf' in schema:
            non_null = [s for s in schema['anyOf'] if s.get('type') != 'null']
            if non_null:
                return CognitiveWorker._generate_schema_example(non_null[0], defs)
            return None

        # allOf — usually single-item wrapper
        if 'allOf' in schema:
            if len(schema['allOf']) == 1:
                return CognitiveWorker._generate_schema_example(schema['allOf'][0], defs)
            return {}

        # Enum — use first value
        if 'enum' in schema:
            return schema['enum'][0]

        # Const
        if 'const' in schema:
            return schema['const']

        schema_type = schema.get('type')

        if schema_type == 'object':
            props = schema.get('properties', {})
            result = {}
            for name, prop_schema in props.items():
                if 'default' in prop_schema:
                    result[name] = prop_schema['default']
                else:
                    result[name] = CognitiveWorker._generate_schema_example(prop_schema, defs)
            return result

        if schema_type == 'array':
            items = schema.get('items', {})
            item_example = CognitiveWorker._generate_schema_example(items, defs)
            return [item_example]

        if schema_type == 'string':
            return "..."
        if schema_type == 'integer':
            return schema.get('default', 0)
        if schema_type == 'number':
            return schema.get('default', 0.0)
        if schema_type == 'boolean':
            return schema.get('default', False)

        return None

    @staticmethod
    def _build_schema_example_prompt(model: Type[BaseModel]) -> str:
        """
        Build a compact inline example from a Pydantic model.

        Replaces the old full JSON Schema dump with a concise representative example,
        which is more token-efficient and equally effective for guiding the LLM.

        Example output for ``PlanningResult``:
            {"phases": [{"sub_goal": "...", "skill_name": "...", "max_steps": 20}]}
        """
        schema = model.model_json_schema()
        example = CognitiveWorker._generate_schema_example(schema)
        return json.dumps(example, ensure_ascii=False)

    def _output_fields_prompt(self, acquiring_open: bool, rehearsal_open: bool, reflection_open: bool) -> str:
        """Base fields: step_content, output, finish. Always present."""
        parts = []

        # Base fields
        parts.append(
            "# Output Fields\n"
            "- **step_content**: Your analysis and reasoning for this step\n"
            "- **finish**: Set True when the sub-task is fully complete (default: False)"
        )

        # Cognitive operators
        if acquiring_open:
            parts.append(
                "- **details**: Available fields: **skills**, **cognitive_history**. "
                "example: [{field: 'skills', index: 0}, ...]"
            )
        if rehearsal_open:
            parts.append("- **rehearsal**: string describing your simulation and predictions")
        if reflection_open:
            parts.append("- **reflection**: string describing your assessment conclusions")

        # Output schema
        if self.output_schema is not None:
            parts.append(
                "- **output**: Structured result — example: "
                f"{self._build_schema_example_prompt(self.output_schema)}"
            )
        else:
            parts.append(
                "- **output**: Tool calls to execute: "
                "[{tool, tool_arguments: [{name: 'param', value: 'value'}]}]\n"
            )
        return "\n".join(parts)

    @staticmethod
    def _acquiring_prompt() -> str:
        """Acquiring operator: when/why to use + details field format."""
        return (
            "# Context Acquiring\n"
            "If the context contains progressively disclosed information (e.g. skills, history steps) "
            "and you want to inspect the details, use the **details** field to request them. "
            "The framework will expand these items in the next round. "
            "Batch all requests in a single output. "
            "When using this field, leave step_content and output empty.\n\n"
            "## Field format:\n"
            "- **details**: [{field: \"skills\", index: 0}, ...]\n"
            "  Available fields: **skills** (view a skill's full workflow), "
            "**cognitive_history** (view the full result of a previous step)"
        )

    @staticmethod
    def _rehearsal_prompt() -> str:
        """Rehearsal operator: when/why to use + rehearsal field format."""
        return (
            "# Pre-Action Rehearsal (optional)\n"
            "To ensure the next action is accurate, you may mentally simulate it first: "
            "which tools you plan to call, what they are expected to return, and whether any issues may arise. "
            "When using this field, leave step_content and output empty; "
            "the framework will ask for an actual decision in the next round.\n\n"
            "## Field format:\n"
            "- **rehearsal**: \"(string describing your simulation and predictions)\""
        )

    @staticmethod
    def _reflection_prompt() -> str:
        """Reflection operator: when/why to use + reflection field format."""
        return (
            "# Pre-Action Reflection (optional)\n"
            "Before committing to a decision, evaluate whether the current information is "
            "sufficient and self-consistent. "
            "If you have doubts, fill the **reflection** field and leave step_content and output empty; "
            "the framework will ask for an actual decision in the next round.\n\n"
            "## Field format:\n"
            "- **reflection**: \"(string describing your assessment conclusions)\""
        )

    def _build_output_instructions(
        self,
        acquiring_open: bool,
        rehearsal_open: bool,
        reflection_open: bool,
    ) -> str:
        parts = []
        if acquiring_open:
            parts.append(self._acquiring_prompt())
        if rehearsal_open:
            parts.append(self._rehearsal_prompt())
        if reflection_open:
            parts.append(self._reflection_prompt())
        parts.append(self._output_fields_prompt(acquiring_open, rehearsal_open, reflection_open))
        return "\n\n".join(parts)

    async def _build_messages(
        self,
        think_prompt: str,
        context: CognitiveContext,
        observation: Optional[str] = None,
        policy_context: Optional[List[str]] = None,
        acquiring_open: bool = True,
        rehearsal_open: bool = False,
        reflection_open: bool = False,
    ) -> List[Message]:
        """Build messages for the thinking phase (thinking + tool selection in one call).

        Parameters
        ----------
        think_prompt : str
            The thinking prompt from the thinking() method.
        context : CognitiveContext
            The cognitive context.
        observation : Optional[str]
            Custom observation from user-overridden observation() method.
        policy_context : List[str]
            Policy outputs from previous rounds (rehearsal, reflection).
        acquiring_open : bool
            Whether the acquiring operator can still fire this round.
        rehearsal_open : bool
            Whether the rehearsal operator can still fire this round.
        reflection_open : bool
            Whether the reflection operator can still fire this round.
        """
        def _format_tools_details(tool_specs: List[ToolSpec]) -> str:
            """Format tool specs into detailed description string."""
            lines = []
            for tool in tool_specs:
                tool_lines = [f"• {tool.tool_name}: {tool.tool_description}"]

                if tool.tool_parameters:
                    props = tool.tool_parameters.get('properties', {})
                    required = tool.tool_parameters.get('required', [])

                    if props:
                        for name, info in props.items():
                            param_type = info.get('type', 'any')
                            param_desc = info.get('description', '')
                            is_required = name in required

                            req_mark = " [required]" if is_required else " [optional]"
                            param_line = f"  - {name} ({param_type}){req_mark}"
                            if param_desc:
                                param_line += f": {param_desc}"
                            tool_lines.append(param_line)

                lines.extend(tool_lines)

            return "\n".join(lines)

        # Cache the full summary dict once — both the capabilities block
        # below and the context-info block that follows derive from it.
        # Without caching, format_summary() would trigger a second summary()
        # traversal over every Exposure field on the context.
        summary_dict = context.summary()

        # 1. Build tool details and skills summary
        capabilities_parts = []
        _, tool_specs = context.get_field('tools')
        if tool_specs:
            tools_details = _format_tools_details(tool_specs)
            capabilities_parts.append(f"# Available Tools (with parameters):\n{tools_details}")
        if len(context.skills) > 0:
            skills_summary = summary_dict.get('skills')
            if skills_summary:
                capabilities_parts.append(f"# {skills_summary}")
        capabilities_description = "\n\n".join(capabilities_parts)

        # 2. Build context info about current status (reuse cached summary_dict)
        context_info = "\n".join(
            summary_dict[f]
            for f in summary_dict
            if f not in ('tools', 'skills') and summary_dict.get(f)
        )
        if observation is not None:
            context_info += f"\n\nObservation:\n{observation}"
        user_prompt_context = f"Based on the context below, decide your next action.\n\n{context_info}"

        # 3. Build output instructions dynamically based on per-operator state
        output_instructions = self._build_output_instructions(
            acquiring_open=acquiring_open,
            rehearsal_open=rehearsal_open,
            reflection_open=reflection_open,
        )

        # Call template method to assemble final messages
        messages = await self.build_messages(
            think_prompt=think_prompt.strip(),
            tools_description=capabilities_description,
            output_instructions=output_instructions,
            context_info=user_prompt_context
        )

        self.spent_tokens += sum(self._count_tokens(m.content) for m in messages)
        return messages

    def _count_tokens(self, text: str) -> int:
        """Estimate token count. Rough approximation: ~4 chars per token (typical for English/UTF-8)."""
        return (len(text) + 3) // 4

    ############################################################################
    # Template methods (overridden by users to customize behavior)
    ############################################################################

    async def observation(self, context: CognitiveContext) -> Any:
        """
        Enhance or customize the observation before thinking.

        Parameters
        ----------
        context : CognitiveContext
            The cognitive context.

        Returns
        -------
        Any
            ``_DELEGATE`` to delegate to ``AmphibiousAutoma.observation()``,
            or a string to use directly as the observation.

        Examples
        --------
        >>> async def observation(self, context):
        ...     return _DELEGATE  # Use agent-level observation (default)
        ...
        >>> async def observation(self, context):
        ...     return f"Current state: {context.goal}"  # Custom observation
        """
        return _DELEGATE

    async def thinking(self) -> str:
        """
        Define how to think about the next step(s). Must be implemented.

        The returned prompt is used to guide the LLM. This is the core template method.

        Returns
        -------
        str
            Thinking prompt for the LLM.

        Examples
        --------
        >>> async def thinking(self):
        ...     return "Plan ONE immediate next step."  # React-style
        ...
        >>> async def thinking(self):
        ...     return "Create a complete step-by-step plan."  # Plan-style
        """
        raise NotImplementedError("thinking() must be implemented")

    async def build_messages(
        self,
        think_prompt: str,
        tools_description: str,
        output_instructions: str,
        context_info: str,
    ) -> List[Message]:
        """
        Assemble the final messages for the thinking phase.

        Override this method to customize how the prompt components are structured
        across messages. This allows you to reorder, modify, or add to the message list.

        Parameters
        ----------
        think_prompt : str
            The thinking prompt from the thinking() method.
        tools_description : str
            Formatted description of available tools and skills.
        output_instructions : str
            Instructions for the output format (finish, steps/step_content, etc.).
        context_info : str
            Context information including goal, status, history, and fetched details.

        Returns
        -------
        List[Message]
            Messages to be sent to the LLM. Default structure:
            - Message 1 (system): think_prompt + tools_description (if non-empty) + output_instructions
            - Message 2 (user): context_info

        Examples
        --------
        >>> async def build_messages(self, think_prompt, tools_description,
        ...                          output_instructions, context_info):
        ...     # Custom: merge everything into a single system + user pair
        ...     extra = "EXTRA_INSTRUCTION: Always prefer cheapest option."
        ...     system = f"{think_prompt}\\n\\n{extra}\\n\\n{tools_description}\\n\\n{output_instructions}"
        ...     return [
        ...         Message.from_text(text=system, role="system"),
        ...         Message.from_text(text=context_info, role="user"),
        ...     ]
        """
        parts = [think_prompt]
        if tools_description:
            parts.append(tools_description)
        parts.append(output_instructions)
        system_content = "\n\n".join(parts)

        return [
            Message.from_text(text=system_content, role="system"),
            Message.from_text(text=context_info, role="user"),
        ]

    async def before_action(
        self,
        decision_result: Any,
        context: CognitiveContext
    ) -> Any:
        """
        Verify and optionally adjust the output before execution.

        Returns ``_DELEGATE`` by default, which delegates to the agent-level
        ``before_action()`` method. Override to intercept and modify tool calls
        at the worker level.

        Parameters
        ----------
        decision_result : Any
            The result of the decision.
        context : CognitiveContext
            Current cognitive context.

        Returns
        -------
        Any
            Verified/adjusted decision result, or ``_DELEGATE`` to delegate
            to the agent-level hook.

        Examples
        --------
        >>> async def before_action(self, decision_result, context):
        ...     # Filter out dangerous tools
        ...     return decision_result.filter(lambda x: x.tool_name not in ["delete", "drop"])
        """
        return _DELEGATE

    async def after_action(self, step_result: Any, ctx: "CognitiveContext") -> Any:
        """
        Worker-level post-action hook for side effects on the context.

        Returns ``_DELEGATE`` by default, which chains to the agent-level
        ``after_action()`` method. Override this hook to mutate custom
        context fields or perform side effects at the worker level.

        The return value is a *control signal*, not a data channel:
        - Return ``_DELEGATE`` to also invoke the agent-level ``after_action``.
        - Return anything else to suppress the agent-level hook.
        The returned value itself is discarded — the step_result stored in
        history is never replaced by this hook. Perform any mutation by
        updating ``ctx`` or ``step_result`` in place.

        Parameters
        ----------
        step_result : Any
            The result of the action step (typically a ``Step`` instance).
        ctx : CognitiveContext
            Current cognitive context.

        Returns
        -------
        Any
            ``_DELEGATE`` to chain to the agent-level hook, or any other
            value to suppress it.

        Examples
        --------
        >>> async def after_action(self, step_result, ctx):
        ...     # Update custom context fields (side effect only)
        ...     ctx.current_document = extract_document(step_result)
        ...     return _DELEGATE  # still let the agent-level hook run
        """
        return _DELEGATE

    ############################################################################
    # Entry point
    ############################################################################

    @classmethod
    def inline(
        cls,
        thinking_prompt: str,
        llm: Optional[BaseLlm] = None,
        enable_rehearsal: bool = False,
        enable_reflection: bool = False,
        verbose: Optional[bool] = None,
        verbose_prompt: Optional[bool] = None,
        output_schema: Optional[Type[BaseModel]] = None,
    ) -> "CognitiveWorker":
        """
        Create a simple CognitiveWorker from a thinking prompt string.

        Convenience factory for cases where you only need to customize the
        thinking prompt without overriding other methods.

        Parameters
        ----------
        thinking_prompt : str
            The thinking prompt to use.
        llm : Optional[BaseLlm]
            LLM instance to use.
        enable_rehearsal : bool
            Enable rehearsal policy.
        enable_reflection : bool
            Enable reflection policy.
        verbose : Optional[bool]
            Enable verbose logging.
        verbose_prompt : Optional[bool]
            Enable prompt logging.
        output_schema : Optional[Type[BaseModel]]
            If set, the worker produces a typed instance directly instead of
            going through the standard tool-call loop.

        Returns
        -------
        CognitiveWorker
            A worker instance with the specified thinking prompt.

        Examples
        --------
        >>> worker = CognitiveWorker.inline(
        ...     "Plan ONE immediate next step",
        ...     llm=llm,
        ...     enable_rehearsal=True
        ... )
        >>> planner = CognitiveWorker.inline(
        ...     "Analyze the goal and produce a phased plan.",
        ...     output_schema=PlanningResult,
        ... )
        """
        class _PromptWorker(cls):
            async def thinking(self):
                return thinking_prompt

        return _PromptWorker(
            llm=llm,
            enable_rehearsal=enable_rehearsal,
            enable_reflection=enable_reflection,
            verbose=verbose,
            verbose_prompt=verbose_prompt,
            output_schema=output_schema,
        )

    # Alias for inline() — preferred for readability in some contexts
    from_prompt = inline

    async def arun(
        self,
        *args: Any,
        feedback_data: Optional[Union[InteractionFeedback, List[InteractionFeedback]]] = None,
        **kwargs: Any,
    ) -> Any:
        """Execute the thinking phase. Observation must be pre-set in context.observation."""
        start_time = time.time()
        result = await super().arun(*args, feedback_data=feedback_data, **kwargs)
        self.spent_time += time.time() - start_time
        return result
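
The re-think loop in `_run_thinking` can be hard to follow inside the full listing. Below is a minimal sketch of the same control flow, assuming a synchronous scripted stand-in for the LLM. `FakeResult`, `run_thinking`, and `scripted_llm` are hypothetical names used only for illustration and are not part of the framework. It shows that each operator fires at most once and that the loop ends on the first round in which no operator fires.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class FakeResult:
    """Hypothetical stand-in for an instance of the dynamically built ThinkModel."""
    output: List[str] = field(default_factory=list)
    details: List[int] = field(default_factory=list)
    rehearsal: Optional[str] = None
    reflection: Optional[str] = None


def run_thinking(
    llm: Callable[[Dict[str, bool]], FakeResult],
    rehearsal: bool,
    reflection: bool,
) -> Tuple[FakeResult, int, List[str]]:
    """Loop until a round activates no operator; each operator fires at most once."""
    open_ops = {"acquiring": True, "rehearsal": rehearsal, "reflection": reflection}
    policy_context: List[str] = []
    rounds = 0
    while True:
        rounds += 1
        # The real worker awaits a structured-output LLM call at this point.
        result = llm(dict(open_ops))
        re_think = False
        if open_ops["acquiring"] and result.details:
            open_ops["acquiring"] = False  # one-shot: closes after firing
            re_think = True
        if open_ops["rehearsal"] and result.rehearsal is not None:
            policy_context.append(f"Rehearsal: {result.rehearsal}")
            open_ops["rehearsal"] = False
            re_think = True
        if open_ops["reflection"] and result.reflection is not None:
            policy_context.append(f"Reflection: {result.reflection}")
            open_ops["reflection"] = False
            re_think = True
        if not re_think:
            return result, rounds, policy_context


def scripted_llm(open_ops: Dict[str, bool]) -> FakeResult:
    """Hypothetical model: uses each open operator once, then decides."""
    if open_ops["acquiring"]:
        return FakeResult(details=[0])
    if open_ops["rehearsal"]:
        return FakeResult(rehearsal="search should return three hits")
    return FakeResult(output=["search"])


result, rounds, notes = run_thinking(scripted_llm, rehearsal=True, reflection=False)
print(rounds, result.output, notes)
```

Because every operator closes after it fires, the loop in this sketch runs at most one round per open operator plus one final decision round.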

set_llm

set_llm(llm: BaseLlm) -> None

Set the LLM used for thinking and tool selection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| llm | BaseLlm | LLM instance to use. Replaces any previously set LLM. | required |

Source code in bridgic/amphibious/_cognitive_worker.py
def set_llm(self, llm: BaseLlm) -> None:
    """
    Set the LLM used for thinking and tool selection.

    Parameters
    ----------
    llm : BaseLlm
        LLM instance to use. Replaces any previously set LLM.
    """
    self._llm = llm
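
As a rough illustration of why `set_llm` matters, the sketch below mirrors only the LLM-binding behaviour seen in the class source: the worker refuses to think until an LLM is bound, and a later `set_llm` call replaces the earlier one. `StubLlm` and `WorkerSketch` are hypothetical stand-ins, not framework classes.

```python
from typing import Optional


class StubLlm:
    """Hypothetical stand-in for a BaseLlm implementation."""

    def __init__(self, name: str) -> None:
        self.name = name


class WorkerSketch:
    """Mirrors only the LLM-binding behaviour of CognitiveWorker (illustrative)."""

    def __init__(self, llm: Optional[StubLlm] = None) -> None:
        self._llm = llm

    def set_llm(self, llm: StubLlm) -> None:
        self._llm = llm  # replaces any previously set LLM

    def think(self) -> str:
        # Same guard as in _thinking(): fail fast when no LLM is bound.
        if self._llm is None:
            raise RuntimeError("CognitiveWorker has no LLM set.")
        return f"thinking with {self._llm.name}"


worker = WorkerSketch()
try:
    worker.think()
    failed = False
except RuntimeError:
    failed = True

worker.set_llm(StubLlm("primary"))
first = worker.think()
worker.set_llm(StubLlm("fallback"))
second = worker.think()
print(failed, first, second)
```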

observation

async
observation(context: CognitiveContext) -> Any

Enhance or customize the observation before thinking.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | CognitiveContext | The cognitive context. | required |

Returns:

| Type | Description |
| --- | --- |
| Any | `_DELEGATE` to delegate to `AmphibiousAutoma.observation()`, or a string to use directly as the observation. |

Examples:

>>> async def observation(self, context):
...     return _DELEGATE  # Use agent-level observation (default)
...
>>> async def observation(self, context):
...     return f"Current state: {context.goal}"  # Custom observation
Source code in bridgic/amphibious/_cognitive_worker.py
async def observation(self, context: CognitiveContext) -> Any:
    """
    Enhance or customize the observation before thinking.

    Parameters
    ----------
    context : CognitiveContext
        The cognitive context.

    Returns
    -------
    Any
        ``_DELEGATE`` to delegate to ``AmphibiousAutoma.observation()``,
        or a string to use directly as the observation.

    Examples
    --------
    >>> async def observation(self, context):
    ...     return _DELEGATE  # Use agent-level observation (default)
    ...
    >>> async def observation(self, context):
    ...     return f"Current state: {context.goal}"  # Custom observation
    """
    return _DELEGATE
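
The delegation contract described above can be pictured with a small sketch. It assumes the sentinel is compared by identity. The `_DELEGATE` object, the `agent_observation` fallback, and `resolve_observation` below are stand-ins written for this example, so the framework's actual resolution logic may differ.

```python
import asyncio
from typing import Any, Dict

_DELEGATE = object()  # stand-in sentinel; the framework defines its own


class DefaultWorker:
    async def observation(self, context: Dict[str, str]) -> Any:
        return _DELEGATE  # defer to the agent-level observation


class CustomWorker:
    async def observation(self, context: Dict[str, str]) -> Any:
        return f"Current state: {context['goal']}"


async def agent_observation(context: Dict[str, str]) -> str:
    """Hypothetical agent-level fallback."""
    return f"Agent view of: {context['goal']}"


async def resolve_observation(worker: Any, context: Dict[str, str]) -> str:
    """Use the worker's observation unless it returns the sentinel."""
    value = await worker.observation(context)
    if value is _DELEGATE:  # identity check, so no real string can collide
        return await agent_observation(context)
    return value


ctx = {"goal": "book a flight"}
delegated = asyncio.run(resolve_observation(DefaultWorker(), ctx))
custom = asyncio.run(resolve_observation(CustomWorker(), ctx))
print(delegated)
print(custom)
```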

thinking

async
thinking() -> str

Define how to think about the next step(s). Must be implemented.

The returned prompt is used to guide the LLM. This is the core template method.

Returns:

| Type | Description |
| --- | --- |
| str | Thinking prompt for the LLM. |

Examples:

>>> async def thinking(self):
...     return "Plan ONE immediate next step."  # React-style
...
>>> async def thinking(self):
...     return "Create a complete step-by-step plan."  # Plan-style
Source code in bridgic/amphibious/_cognitive_worker.py
async def thinking(self) -> str:
    """
    Define how to think about the next step(s). Must be implemented.

    The returned prompt is used to guide the LLM. This is the core template method.

    Returns
    -------
    str
        Thinking prompt for the LLM.

    Examples
    --------
    >>> async def thinking(self):
    ...     return "Plan ONE immediate next step."  # React-style
    ...
    >>> async def thinking(self):
    ...     return "Create a complete step-by-step plan."  # Plan-style
    """
    raise NotImplementedError("thinking() must be implemented")
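
As a rough illustration of the template-method idea, the sketch below uses a hypothetical `BaseWorker` with a `prompt()` driver method. Neither name comes from the library; they only show a fixed flow calling an overridable `thinking()` hook that fails loudly when left unimplemented.

```python
import asyncio


class BaseWorker:
    """Hypothetical base class; not the framework's CognitiveWorker."""

    async def thinking(self) -> str:
        raise NotImplementedError("thinking() must be implemented")

    async def prompt(self) -> str:
        # Fixed flow: fetch the subclass-provided prompt and normalize it.
        return (await self.thinking()).strip()


class ReactWorker(BaseWorker):
    async def thinking(self) -> str:
        return "Plan ONE immediate next step."


react_prompt = asyncio.run(ReactWorker().prompt())

try:
    asyncio.run(BaseWorker().prompt())
    base_failed = False
except NotImplementedError:
    base_failed = True
```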

build_messages

async
build_messages(
    think_prompt: str,
    tools_description: str,
    output_instructions: str,
    context_info: str,
) -> List[Message]

Assemble the final messages for the thinking phase.

Override this method to customize how the prompt components are structured across messages. This allows you to reorder, modify, or add to the message list.

Parameters:

Name Type Description Default
think_prompt str

The thinking prompt from the thinking() method.

required
tools_description str

Formatted description of available tools and skills.

required
output_instructions str

Instructions for the output format (finish, steps/step_content, etc.).

required
context_info str

Context information including goal, status, history, and fetched details.

required

Returns:

Type Description
List[Message]

Messages to be sent to the LLM. Default structure:

  • Message 1 (system): think_prompt + tools_description (if non-empty) + output_instructions
  • Message 2 (user): context_info

Examples:

>>> async def build_messages(self, think_prompt, tools_description,
...                          output_instructions, context_info):
...     # Custom: merge everything into a single system + user pair
...     extra = "EXTRA_INSTRUCTION: Always prefer cheapest option."
...     system = f"{think_prompt}\n\n{extra}\n\n{tools_description}\n\n{output_instructions}"
...     return [
...         Message.from_text(text=system, role="system"),
...         Message.from_text(text=context_info, role="user"),
...     ]
Source code in bridgic/amphibious/_cognitive_worker.py
async def build_messages(
    self,
    think_prompt: str,
    tools_description: str,
    output_instructions: str,
    context_info: str,
) -> List[Message]:
    """
    Assemble the final messages for the thinking phase.

    Override this method to customize how the prompt components are structured
    across messages. This allows you to reorder, modify, or add to the message list.

    Parameters
    ----------
    think_prompt : str
        The thinking prompt from the thinking() method.
    tools_description : str
        Formatted description of available tools and skills.
    output_instructions : str
        Instructions for the output format (finish, steps/step_content, etc.).
    context_info : str
        Context information including goal, status, history, and fetched details.

    Returns
    -------
    List[Message]
        Messages to be sent to the LLM. Default structure:
        - Message 1 (system): think_prompt + tools_description (if non-empty) + output_instructions
        - Message 2 (user): context_info

    Examples
    --------
    >>> async def build_messages(self, think_prompt, tools_description,
    ...                          output_instructions, context_info):
    ...     # Custom: merge everything into a single system + user pair
    ...     extra = "EXTRA_INSTRUCTION: Always prefer cheapest option."
    ...     system = f"{think_prompt}\\n\\n{extra}\\n\\n{tools_description}\\n\\n{output_instructions}"
    ...     return [
    ...         Message.from_text(text=system, role="system"),
    ...         Message.from_text(text=context_info, role="user"),
    ...     ]
    """
    parts = [think_prompt]
    if tools_description:
        parts.append(tools_description)
    parts.append(output_instructions)
    system_content = "\n\n".join(parts)

    return [
        Message.from_text(text=system_content, role="system"),
        Message.from_text(text=context_info, role="user"),
    ]
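
To make the default layout concrete, here is a framework-free sketch of the same assembly logic. The `Message` dataclass is a hypothetical stand-in for the library's message type (only `role` and `text` are modeled), and `build_default_messages` is an illustrative name.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Message:
    """Hypothetical stand-in for the framework's Message type."""
    role: str
    text: str


def build_default_messages(
    think_prompt: str,
    tools_description: str,
    output_instructions: str,
    context_info: str,
) -> List[Message]:
    """Join the prompt parts into one system message, then add the user message."""
    parts = [think_prompt]
    if tools_description:  # an empty description is skipped entirely
        parts.append(tools_description)
    parts.append(output_instructions)
    return [
        Message(role="system", text="\n\n".join(parts)),
        Message(role="user", text=context_info),
    ]


with_tools = build_default_messages("Think.", "Tools: search", "Reply in JSON.", "Goal: demo")
without_tools = build_default_messages("Think.", "", "Reply in JSON.", "Goal: demo")
```

Skipping the empty tools description avoids a stray blank section in the system prompt when a worker has no tools.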

before_action

async
before_action(
    decision_result: Any, context: CognitiveContext
) -> Any

Verify and optionally adjust the output before execution.

Returns _DELEGATE by default, which delegates to the agent-level before_action() method. Override to intercept and modify tool calls at the worker level.

Parameters:

Name Type Description Default
decision_result Any

The result of the decision.

required
context CognitiveContext

Current cognitive context.

required

Returns:

Type Description
Any

Verified/adjusted decision result, or _DELEGATE to delegate to the agent-level hook.

Examples:

>>> async def before_action(self, decision_result, context):
...     # Filter out dangerous tools
...     return decision_result.filter(lambda x: x.tool_name not in ["delete", "drop"])
Source code in bridgic/amphibious/_cognitive_worker.py
async def before_action(
    self,
    decision_result: Any,
    context: CognitiveContext
) -> Any:
    """
    Verify and optionally adjust the output before execution.

    Returns ``_DELEGATE`` by default, which delegates to the agent-level
    ``before_action()`` method. Override to intercept and modify tool calls
    at the worker level.

    Parameters
    ----------
    decision_result : Any
        The result of the decision.
    context : CognitiveContext
        Current cognitive context.

    Returns
    -------
    Any
        Verified/adjusted decision result, or ``_DELEGATE`` to delegate
        to the agent-level hook.

    Examples
    --------
    >>> async def before_action(self, decision_result, context):
    ...     # Filter out dangerous tools
    ...     return decision_result.filter(lambda x: x.tool_name not in ["delete", "drop"])
    """
    return _DELEGATE
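
A worker-level filter might look like the sketch below. It assumes the decision result is a list of (ToolCall, ToolSpec) pairs, which the agent-level before_action docstring describes as the typical shape. The `ToolCall` dataclass and `BLOCKED_TOOLS` set are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

# Hypothetical deny-list for this illustration.
BLOCKED_TOOLS = {"delete", "drop"}


@dataclass
class ToolCall:
    """Stand-in exposing only the fields used here."""
    id: str
    name: str
    arguments: Dict[str, Any] = field(default_factory=dict)


async def before_action(
    decision_result: List[Tuple[ToolCall, Any]], context: Any
) -> List[Tuple[ToolCall, Any]]:
    """Drop any (call, spec) pair whose tool name is on the deny-list."""
    return [
        (call, spec)
        for call, spec in decision_result
        if call.name not in BLOCKED_TOOLS
    ]


decision = [(ToolCall("1", "search"), None), (ToolCall("2", "delete"), None)]
allowed = asyncio.run(before_action(decision, context=None))
```

If a worker uses `output_schema`, the decision result is presumably a typed instance rather than a list, so a real hook would need to check the shape before filtering.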

after_action

async
after_action(
    step_result: Any, ctx: CognitiveContext
) -> Any

Worker-level post-action hook for side effects on the context.

Returns _DELEGATE by default, which chains to the agent-level after_action() method. Override this hook to mutate custom context fields or perform side effects at the worker level.

The return value is a control signal, not a data channel:

  • Return _DELEGATE to also invoke the agent-level after_action.
  • Return anything else to suppress the agent-level hook.

The returned value itself is discarded; the step_result stored in history is never replaced by this hook. Perform any mutation by updating ctx or step_result in place.

Parameters:

Name Type Description Default
step_result Any

The result of the action step (typically a Step instance).

required
ctx CognitiveContext

Current cognitive context.

required

Returns:

Type Description
Any

_DELEGATE to chain to the agent-level hook, or any other value to suppress it.

Examples:

>>> async def after_action(self, step_result, ctx):
...     # Update custom context fields (side effect only)
...     ctx.current_document = extract_document(step_result)
...     return _DELEGATE  # still let the agent-level hook run
Source code in bridgic/amphibious/_cognitive_worker.py
async def after_action(self, step_result: Any, ctx: "CognitiveContext") -> Any:
    """
    Worker-level post-action hook for side effects on the context.

    Returns ``_DELEGATE`` by default, which chains to the agent-level
    ``after_action()`` method. Override this hook to mutate custom
    context fields or perform side effects at the worker level.

    The return value is a *control signal*, not a data channel:
    - Return ``_DELEGATE`` to also invoke the agent-level ``after_action``.
    - Return anything else to suppress the agent-level hook.
    The returned value itself is discarded — the step_result stored in
    history is never replaced by this hook. Perform any mutation by
    updating ``ctx`` or ``step_result`` in place.

    Parameters
    ----------
    step_result : Any
        The result of the action step (typically a ``Step`` instance).
    ctx : CognitiveContext
        Current cognitive context.

    Returns
    -------
    Any
        ``_DELEGATE`` to chain to the agent-level hook, or any other
        value to suppress it.

    Examples
    --------
    >>> async def after_action(self, step_result, ctx):
    ...     # Update custom context fields (side effect only)
    ...     ctx.current_document = extract_document(step_result)
    ...     return _DELEGATE  # still let the agent-level hook run
    """
    return _DELEGATE
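
The control-signal semantics can be sketched as a small dispatcher. `run_after_action`, the hook functions, and the sentinel are all hypothetical stand-ins; the real dispatch code is not shown in this section.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical stand-in for the framework's _DELEGATE sentinel.
_DELEGATE = object()

Hook = Callable[[Any, Dict[str, Any]], Awaitable[Any]]


async def run_after_action(
    worker_hook: Hook, agent_hook: Hook, step_result: Any, ctx: Dict[str, Any]
) -> Any:
    """Run the worker hook, then the agent hook only if the worker delegates."""
    signal = await worker_hook(step_result, ctx)
    if signal is _DELEGATE:
        await agent_hook(step_result, ctx)
    # The hook's return value is dropped; the original step_result is kept.
    return step_result


agent_calls: List[Any] = []


async def agent_hook(step_result: Any, ctx: Dict[str, Any]) -> Any:
    agent_calls.append(step_result)


async def chaining_worker(step_result: Any, ctx: Dict[str, Any]) -> Any:
    ctx["current_document"] = step_result.upper()  # side effect on the context
    return _DELEGATE


async def suppressing_worker(step_result: Any, ctx: Dict[str, Any]) -> Any:
    return "replacement that is ignored"


ctx: Dict[str, Any] = {}
kept_a = asyncio.run(run_after_action(chaining_worker, agent_hook, "step-1", ctx))
kept_b = asyncio.run(run_after_action(suppressing_worker, agent_hook, "step-2", ctx))
```

Only the first call reaches the agent-level hook, and neither call changes the recorded step result.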

inline

classmethod
inline(
    thinking_prompt: str,
    llm: Optional[BaseLlm] = None,
    enable_rehearsal: bool = False,
    enable_reflection: bool = False,
    verbose: Optional[bool] = None,
    verbose_prompt: Optional[bool] = None,
    output_schema: Optional[Type[BaseModel]] = None,
) -> CognitiveWorker

Create a simple CognitiveWorker from a thinking prompt string.

Convenience factory for cases where you only need to customize the thinking prompt without overriding other methods.

Parameters:

Name Type Description Default
thinking_prompt str

The thinking prompt to use.

required
llm Optional[BaseLlm]

LLM instance to use.

None
enable_rehearsal bool

Enable rehearsal policy.

False
enable_reflection bool

Enable reflection policy.

False
verbose Optional[bool]

Enable verbose logging.

None
verbose_prompt Optional[bool]

Enable prompt logging.

None
output_schema Optional[Type[BaseModel]]

If set, the worker produces a typed instance directly instead of going through the standard tool-call loop.

None

Returns:

Type Description
CognitiveWorker

A worker instance with the specified thinking prompt.

Examples:

>>> worker = CognitiveWorker.inline(
...     "Plan ONE immediate next step",
...     llm=llm,
...     enable_rehearsal=True
... )
>>> planner = CognitiveWorker.inline(
...     "Analyze the goal and produce a phased plan.",
...     output_schema=PlanningResult,
... )
Source code in bridgic/amphibious/_cognitive_worker.py
@classmethod
def inline(
    cls,
    thinking_prompt: str,
    llm: Optional[BaseLlm] = None,
    enable_rehearsal: bool = False,
    enable_reflection: bool = False,
    verbose: Optional[bool] = None,
    verbose_prompt: Optional[bool] = None,
    output_schema: Optional[Type[BaseModel]] = None,
) -> "CognitiveWorker":
    """
    Create a simple CognitiveWorker from a thinking prompt string.

    Convenience factory for cases where you only need to customize the
    thinking prompt without overriding other methods.

    Parameters
    ----------
    thinking_prompt : str
        The thinking prompt to use.
    llm : Optional[BaseLlm]
        LLM instance to use.
    enable_rehearsal : bool
        Enable rehearsal policy.
    enable_reflection : bool
        Enable reflection policy.
    verbose : Optional[bool]
        Enable verbose logging.
    verbose_prompt : Optional[bool]
        Enable prompt logging.
    output_schema : Optional[Type[BaseModel]]
        If set, the worker produces a typed instance directly instead of
        going through the standard tool-call loop.

    Returns
    -------
    CognitiveWorker
        A worker instance with the specified thinking prompt.

    Examples
    --------
    >>> worker = CognitiveWorker.inline(
    ...     "Plan ONE immediate next step",
    ...     llm=llm,
    ...     enable_rehearsal=True
    ... )
    >>> planner = CognitiveWorker.inline(
    ...     "Analyze the goal and produce a phased plan.",
    ...     output_schema=PlanningResult,
    ... )
    """
    class _PromptWorker(cls):
        async def thinking(self):
            return thinking_prompt

    return _PromptWorker(
        llm=llm,
        enable_rehearsal=enable_rehearsal,
        enable_reflection=enable_reflection,
        verbose=verbose,
        verbose_prompt=verbose_prompt,
        output_schema=output_schema,
    )
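
The factory relies on a closure plus a locally defined subclass of `cls`. The sketch below reproduces that pattern with a hypothetical `Worker` class that models only one constructor option, so it runs without the framework.

```python
import asyncio


class Worker:
    """Hypothetical stand-in; models one constructor option only."""

    def __init__(self, enable_rehearsal: bool = False):
        self.enable_rehearsal = enable_rehearsal

    async def thinking(self) -> str:
        raise NotImplementedError("thinking() must be implemented")

    @classmethod
    def inline(cls, thinking_prompt: str, **options) -> "Worker":
        # Subclass `cls` (not Worker) so overrides on the caller's class survive.
        class _PromptWorker(cls):
            async def thinking(self) -> str:
                return thinking_prompt  # captured from the enclosing call

        return _PromptWorker(**options)


class AuditedWorker(Worker):
    """A user subclass; inline() called on it yields an AuditedWorker."""


worker = AuditedWorker.inline("Plan ONE immediate next step", enable_rehearsal=True)
prompt = asyncio.run(worker.thinking())
```

Because the generated class derives from `cls`, calling `inline()` on a customized subclass keeps that subclass's other hooks in place.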

arun

async
arun(
    *args: Any,
    feedback_data: Optional[
        Union[
            InteractionFeedback, List[InteractionFeedback]
        ]
    ] = None,
    **kwargs: Any
) -> Any

Execute the thinking phase. Observation must be pre-set in context.observation.

Source code in bridgic/amphibious/_cognitive_worker.py
async def arun(
    self,
    *args: Any,
    feedback_data: Optional[Union[InteractionFeedback, List[InteractionFeedback]]] = None,
    **kwargs: Any,
) -> Any:
    """Execute the thinking phase. Observation must be pre-set in context.observation."""
    start_time = time.time()
    result = await super().arun(*args, feedback_data=feedback_data, **kwargs)
    self.spent_time += time.time() - start_time
    return result
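
The wrapper above accumulates wall-clock time around the parent call. The following sketch shows the same idea with two deliberate departures, both assumptions of this example rather than the library's behavior: it uses `time.perf_counter()` instead of `time.time()`, and a `try/finally` so that failed runs are also counted. `BaseRunner` and `TimedRunner` are hypothetical names.

```python
import asyncio
import time


class BaseRunner:
    """Hypothetical parent whose arun() does the real work."""

    async def arun(self, delay: float) -> str:
        await asyncio.sleep(delay)
        return "done"


class TimedRunner(BaseRunner):
    def __init__(self) -> None:
        self.spent_time = 0.0  # accumulated across calls

    async def arun(self, delay: float) -> str:
        start = time.perf_counter()
        try:
            return await super().arun(delay)
        finally:
            # Runs on success and on failure, unlike a plain post-call update.
            self.spent_time += time.perf_counter() - start


runner = TimedRunner()
first_result = asyncio.run(runner.arun(0.01))
after_first = runner.spent_time
asyncio.run(runner.arun(0.01))
after_second = runner.spent_time
```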

AmphibiousAutoma

Bases: GraphAutoma, Generic[CognitiveContextT]

Base class for amphibious agents — dual-mode orchestration engine.

Supports three execution modes:

  • Agent mode (on_agent): LLM-driven observe-think-act cycles via think_unit
  • Workflow mode (on_workflow): deterministic step execution via yield
  • Amphiflow mode (on_workflow + on_agent): workflow-first, with automatic agent fallback when a step fails

Subclasses define behavior by implementing on_agent() and/or on_workflow(). Under RunMode.AUTO (the default) the runtime picks the mode from which template methods are overridden:

  • only on_agent overridden → RunMode.AGENT
  • only on_workflow overridden → RunMode.WORKFLOW
  • both overridden → RunMode.AMPHIFLOW
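
One plausible way to implement this kind of override detection is to compare the subclass's methods with the base class's by identity. The sketch below is an assumption about the technique, not the framework's actual code. `BaseAutoma`, `resolve_auto_mode`, and the reduced `RunMode` enum are hypothetical, and the error raised when nothing is overridden is this example's choice.

```python
from enum import Enum


class RunMode(Enum):
    """Hypothetical subset of the framework's RunMode values."""
    AGENT = "agent"
    WORKFLOW = "workflow"
    AMPHIFLOW = "amphiflow"


class BaseAutoma:
    """Stand-in base class whose template methods are no-ops."""

    async def on_agent(self, ctx):
        return None

    async def on_workflow(self, ctx):
        if False:  # keeps this an async generator stub
            yield


def resolve_auto_mode(agent: BaseAutoma) -> RunMode:
    """Pick a mode from which template methods the subclass overrides."""
    cls = type(agent)
    has_agent = cls.on_agent is not BaseAutoma.on_agent
    has_workflow = cls.on_workflow is not BaseAutoma.on_workflow
    if has_agent and has_workflow:
        return RunMode.AMPHIFLOW
    if has_workflow:
        return RunMode.WORKFLOW
    if has_agent:
        return RunMode.AGENT
    # Assumption: the source does not say what happens in this case.
    raise TypeError("override on_agent(), on_workflow(), or both")


class AgentOnly(BaseAutoma):
    async def on_agent(self, ctx):
        return None


class WorkflowOnly(BaseAutoma):
    async def on_workflow(self, ctx):
        yield "step"


class Both(BaseAutoma):
    async def on_agent(self, ctx):
        return None

    async def on_workflow(self, ctx):
        yield "step"
```

For example, `resolve_auto_mode(Both())` returns `RunMode.AMPHIFLOW`, matching the third rule in the list.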

Parameters:

Name Type Description Default
llm Optional[BaseLlm]

Default LLM for workers and auxiliary tasks (e.g., history compression). Individual workers can specify their own LLM.

None
name Optional[str]

Optional name for the agent instance.

None
verbose bool

Enable logging of execution summary (tokens, time). Default is False.

False

Examples:

>>> class MyAgent(AmphibiousAutoma[CognitiveContext]):
...     main_think = think_unit(CognitiveWorker.inline("Execute step"), max_attempts=20)
...     async def on_agent(self, ctx: CognitiveContext):
...         await self.main_think
...
>>> ctx = await MyAgent(llm=llm).arun(goal="Complete the task", tools=[...])
Source code in bridgic/amphibious/_amphibious_automa.py
class AmphibiousAutoma(GraphAutoma, Generic[CognitiveContextT]):
    """
    Base class for amphibious agents — dual-mode orchestration engine.

    Supports three execution modes:
    - **Agent mode** (``on_agent``): LLM-driven observe-think-act cycles via think_unit
    - **Workflow mode** (``on_workflow``): Deterministic step execution via yield
    - **Amphiflow mode** (``on_workflow`` + ``on_agent``): workflow-first with
      automatic agent fallback when a step fails

    Subclasses define behavior by implementing ``on_agent()`` and/or ``on_workflow()``.
    Under ``RunMode.AUTO`` (the default) the runtime picks the mode from which
    template methods are overridden:

    - only ``on_agent`` overridden → ``RunMode.AGENT``
    - only ``on_workflow`` overridden → ``RunMode.WORKFLOW``
    - both overridden → ``RunMode.AMPHIFLOW``

    Parameters
    ----------
    llm : Optional[BaseLlm]
        Default LLM for workers and auxiliary tasks (e.g., history compression).
        Individual workers can specify their own LLM.
    name : Optional[str]
        Optional name for the agent instance.
    verbose : bool
        Enable logging of execution summary (tokens, time). Default is False.

    Examples
    --------
    >>> class MyAgent(AmphibiousAutoma[CognitiveContext]):
    ...     main_think = think_unit(CognitiveWorker.inline("Execute step"), max_attempts=20)
    ...     async def on_agent(self, ctx: CognitiveContext):
    ...         await self.main_think
    ...
    >>> ctx = await MyAgent(llm=llm).arun(goal="Complete the task", tools=[...])
    """

    _context_class: Optional[Type[CognitiveContext]] = None

    #: Max think-unit attempts when a workflow step falls back to agent mode
    #: for per-step recovery (see ``_run_workflow``). Subclasses may override
    #: to give the fallback agent a longer or shorter runway.
    WORKFLOW_STEP_FALLBACK_MAX_ATTEMPTS: int = 5

    def __init_subclass__(cls, **kwargs) -> None:
        """Extract the CognitiveContext type from the generic parameter."""
        super().__init_subclass__(**kwargs)
        for base in getattr(cls, "__orig_bases__", []):
            origin = get_origin(base)
            if origin is not None:
                args = get_args(base)
                if args:
                    context_type = args[0]
                    if isinstance(context_type, type) and issubclass(context_type, CognitiveContext):
                        cls._context_class = context_type
                        return
        for base in cls.__bases__:
            if hasattr(base, "_context_class") and base._context_class is not None:
                cls._context_class = base._context_class
                break

        # Check if there are any initialization issues with the context generic class.
        if cls._context_class is None or not issubclass(cls._context_class, CognitiveContext):
            raise TypeError(
                f"{cls.__name__} must specify a CognitiveContext type via generic parameter, "
                f"e.g., class {cls.__name__}(AmphibiousAutoma[MyContext])"
            )

    def __init__(
        self,
        name: Optional[str] = None,
        thread_pool: Optional[ThreadPoolExecutor] = None,
        running_options: Optional[RunningOptions] = None,
        llm: Optional[Any] = None,
        verbose: bool = False,
    ):
        super().__init__(name=name, thread_pool=thread_pool, running_options=running_options)

        self._llm = llm
        self._current_context: Optional[CognitiveContextT] = None
        self._verbose = verbose

        # Trace capture
        self._agent_trace: Optional[AgentTrace] = None

        # Final answer (set automatically or via set_final_answer())
        self._final_answer: Optional[str] = None

        # Usage stats (reset per arun call)
        self.spent_tokens: int = 0
        self.spent_time: float = 0.0

        # Human-in-the-loop event handler registration
        self._register_human_input_handler()

    @property
    def llm(self) -> Optional[Any]:
        """Access the agent's default LLM."""
        return self._llm

    @property
    def context(self) -> Optional[CognitiveContextT]:
        """Access the current context."""
        return self._current_context

    @property
    def final_answer(self) -> Optional[str]:
        """The final answer produced by the last ``arun()`` call.

        Automatically captured from the ``step_content`` of the finishing step
        (agent mode) or the last executed step (workflow mode).
        Can be overridden explicitly via ``set_final_answer()``.
        """
        return self._final_answer

    def set_final_answer(self, answer: str) -> None:
        """Explicitly set the final answer.

        Call this inside ``on_agent()`` or ``on_workflow()`` to override
        the auto-captured value.

        Parameters
        ----------
        answer : str
            The final answer text.
        """
        self._final_answer = answer

    ############################################################################
    # Human-in-the-loop support
    ############################################################################

    def _register_human_input_handler(self) -> None:
        """Register the default event handler for human input requests.

        The handler calls ``human_input()`` via ``asyncio.ensure_future()``
        so it can be an async template method while being invoked from
        the synchronous event-handler callback that ``request_feedback_async``
        uses.
        """

        def _on_human_input(event: Event, feedback_sender):
            asyncio.ensure_future(
                self._handle_human_input(event, feedback_sender)
            )

        self.register_event_handler(HUMAN_INPUT_EVENT_TYPE, _on_human_input)

    async def _handle_human_input(self, event: Event, feedback_sender) -> None:
        """Internal dispatcher — calls the overridable ``human_input()`` template."""
        response = await self.human_input(event.data)
        feedback_sender.send(Feedback(data=response))

    async def request_human(self, prompt: str, *, timeout: Optional[float] = None) -> str:
        """Request input from the human operator.

        This is the primary entry point for code-level human-in-the-loop
        calls inside ``on_agent()`` (between think units) or from the
        ``request_human_tool`` closure.

        Parameters
        ----------
        prompt : str
            The question or message to present to the human.
        timeout : Optional[float]
            Seconds to wait before raising ``TimeoutError``. None means
            wait indefinitely.

        Returns
        -------
        str
            The human's response text.

        Raises
        ------
        TimeoutError
            If no response is received within ``timeout`` seconds.
        """
        event = Event(
            event_type=HUMAN_INPUT_EVENT_TYPE,
            data={"prompt": prompt, "timeout": timeout},
        )
        feedback = await self.request_feedback_async(event, timeout=timeout)
        return feedback.data

    ############################################################################
    # Template methods (override by user to customize the behavior)
    ############################################################################
    async def observation(self, ctx: CognitiveContextT) -> Optional[str]:
        """
        Agent-level default observation, shared across all workers.

        Called by ``_run()`` before each thinking phase. Workers can
        enhance this via their own ``observation()`` method.

        Parameters
        ----------
        ctx : CognitiveContextT
            The cognitive context.

        Returns
        -------
        Optional[str]
            Custom observation text, or None.
        """
        return None

    async def on_agent(self, ctx: CognitiveContextT) -> None:
        """
        Agent mode: LLM-driven orchestration logic.

        Override this method to define how think_units are orchestrated.
        Use standard Python control flow combined with ``await self.think_unit``
        calls.

        A subclass may override this, ``on_workflow``, or both. The default
        implementation is a no-op so that subclasses which only implement
        ``on_workflow`` remain instantiable.

        Parameters
        ----------
        ctx : CognitiveContextT
            The cognitive context, used for accessing state and conditions.

        Examples
        --------
        >>> async def on_agent(self, ctx: MyContext):
        ...     await self.main_think
        ...     await self.exec_think.until(lambda ctx: ctx.done, max_attempts=20)
        """
        return None

    async def on_workflow(self, ctx: CognitiveContextT) -> AsyncGenerator[Union[ActionCall, HumanCall, AgentCall], None]:
        """Workflow mode: deterministic execution as an async generator.

        Override this method to define a deterministic workflow. When overridden,
        ``arun()`` automatically routes to workflow mode instead of ``on_agent()``.

        Three yield types are supported:
        - ``ActionCall``  — deterministic single-tool execution
        - ``HumanCall``   — pause and request human input (returned as str via asend)
        - ``AgentCall``   — delegate a sub-task to LLM agent mode

        The generator exhausting signals workflow completion — no finish signal needed.
        Use ``result = yield ActionCall(...)`` to receive tool execution results via asend().

        Examples
        --------
        >>> async def on_workflow(self, ctx):
        ...     yield ActionCall("navigate_to", url="http://example.com")
        ...     result = yield ActionCall("click_element_by_ref", ref="42")
        ...     yield AgentCall(goal="Handle complex case", max_attempts=10)
        """
        if False:  # pragma: no cover — makes this a proper async generator stub
            yield

    async def before_action(
        self,
        decision_result: Any,
        ctx: CognitiveContextT,
    ) -> Any:
        """
        Agent-level before_action hook, shared across all workers.

        Called when a worker's ``before_action()`` returns ``_DELEGATE``.
        Override to intercept and modify tool calls at the agent level.

        Parameters
        ----------
        decision_result : Any
            The decision result (typically List[Tuple[ToolCall, ToolSpec]]).
        ctx : CognitiveContextT
            The cognitive context.

        Returns
        -------
        Any
            Verified/adjusted decision result.
        """
        return decision_result

    async def action_tool_call(self, tool_list: List[Tuple[ToolCall, ToolSpec]], context: CognitiveContextT) -> ActionResult:
        """
        Execute tool calls concurrently and collect results.

        Override this method to customize tool execution behavior
        (e.g., sequential execution, rate limiting, sandboxing).

        Parameters
        ----------
        tool_list : List[Tuple[ToolCall, ToolSpec]]
            Matched tool call / spec pairs to execute.
        context : CognitiveContextT
            The current cognitive context.

        Returns
        -------
        ActionResult
            Aggregated results with per-tool success/failure status.
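
        Examples
        --------
        One way an override might throttle execution is to gate each call
        with a semaphore. The sketch below is standalone and only illustrates
        the pattern: ``run_bounded``, ``Job``, and the plain-dict results are
        hypothetical stand-ins, not part of this framework.

        ```python
        import asyncio
        from typing import Any, Awaitable, Callable, Dict, List

        Job = Callable[[], Awaitable[Any]]  # hypothetical: one zero-argument tool invocation


        async def run_bounded(jobs: List[Job], limit: int = 2) -> List[Dict[str, Any]]:
            # At most `limit` jobs run at once; a failure is recorded, not raised,
            # so one bad tool does not cancel its siblings.
            gate = asyncio.Semaphore(limit)

            async def _one(job: Job) -> Dict[str, Any]:
                async with gate:
                    try:
                        return {"success": True, "result": await job(), "error": None}
                    except Exception as exc:
                        return {"success": False, "result": None, "error": str(exc)}

            # gather() preserves input order, so results line up with `jobs`.
            return list(await asyncio.gather(*(_one(job) for job in jobs)))
        ```

        Setting ``limit=1`` gives sequential execution with the same
        per-call error capture.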
        """

        async def _run_one(tool_call: ToolCall, tool_spec: ToolSpec) -> ActionStepResult:
            tool_worker = tool_spec.create_worker()
            sandbox = ConcurrentAutoma()
            worker_key = f"tool_{tool_call.name}_{tool_call.id}"
            sandbox.add_worker(
                key=worker_key,
                worker=tool_worker,
                args_mapping_rule=ArgsMappingRule.UNPACK,
            )
            try:
                results = await sandbox.arun(InOrder([tool_call.arguments]))
                result = results[0] if results else None
                return ActionStepResult(
                    tool_id=tool_call.id,
                    tool_name=tool_call.name,
                    tool_arguments=tool_call.arguments,
                    tool_result=result,
                    success=True,
                )
            except Exception as e:
                return ActionStepResult(
                    tool_id=tool_call.id,
                    tool_name=tool_call.name,
                    tool_arguments=tool_call.arguments,
                    tool_result=None,
                    success=False,
                    error=str(e),
                )

        step_results = await asyncio.gather(
            *(_run_one(tc, ts) for tc, ts in tool_list)
        )
        return ActionResult(results=list(step_results))

    async def action_custom_output(self, decision_result: Any, context: CognitiveContextT) -> Any:
        """
        Handle structured output from a worker with ``output_schema`` set.

        Called instead of ``action_tool_call()`` when the worker produces
        a typed Pydantic instance (via ``output_schema``) rather than tool calls.
        Override to post-process or validate structured output.

        Parameters
        ----------
        decision_result : Any
            The structured output instance produced by the worker.
        context : CognitiveContextT
            The current cognitive context.

        Returns
        -------
        Any
            The (optionally processed) result to store in execution history.
        """
        return decision_result

    async def after_action(self, step_result: Any, ctx: CognitiveContextT) -> None:
        """
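        Agent-level after_action hook, shared across all workers.

        Called after action execution, before the result is returned, when a
        worker's ``after_action()`` returns ``_DELEGATE`` or when no worker
        is involved.
        Override to update custom context fields based on tool results.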

        Parameters
        ----------
        step_result : Any
            The result of the action step.
        ctx : CognitiveContextT
            The current cognitive context.
        """
        pass

    async def human_input(self, data: Dict[str, Any]) -> str:
        """Template method invoked when the agent requests human input.

        Override this in your subclass to integrate with your UI or
        messaging layer.  The default implementation reads from stdin.

        Parameters
        ----------
        data : Dict[str, Any]
            Event payload — always contains ``"prompt"``; may contain
            ``"timeout"`` and other metadata added by the caller.

        Returns
        -------
        str
            The human's response text.
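
        Examples
        --------
        A rough sketch of a non-stdin override, assuming replies arrive on an
        ``asyncio.Queue`` filled by some UI or messaging layer.
        ``QueueBackedInput`` and its ``replies`` attribute are hypothetical;
        in practice the method would live on your agent subclass.

        ```python
        import asyncio
        from typing import Any, Dict


        class QueueBackedInput:
            # Hypothetical stand-in for an agent subclass; only human_input() matters.
            def __init__(self) -> None:
                self.replies: asyncio.Queue = asyncio.Queue()

            async def human_input(self, data: Dict[str, Any]) -> str:
                # "timeout" is optional in the payload; None waits indefinitely.
                timeout = data.get("timeout")
                return await asyncio.wait_for(self.replies.get(), timeout=timeout)
        ```

        If no reply arrives in time, ``asyncio.wait_for`` raises
        ``asyncio.TimeoutError``, which the caller can handle as it sees fit.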
        """
        prompt = data.get("prompt", "Human input required:")
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, input, f"\n[HumanInput] {prompt}\n> "
        )

    ############################################################################
    # Core methods
    ############################################################################
    async def _run(
        self,
        worker: CognitiveWorker,
        *,
        until: Optional[Union[Callable[..., bool], Callable[..., Awaitable[bool]]]] = None,
        max_attempts: int = 1,
        tools: Optional[List[str]] = None,
        skills: Optional[List[str]] = None,
        on_error: ErrorStrategy = ErrorStrategy.RAISE,
        max_retries: int = 0,
    ) -> None:
        """
        Execute a CognitiveWorker through observe-think-act cycle(s).

        Internal method used by think_unit descriptors and _run_workflow.
        Not intended for direct use by subclasses — use think_unit instead.

        Parameters
        ----------
        worker : CognitiveWorker
            The worker to execute.
        until : Optional callable
            Stop condition, called with the current context after each cycle.
            The step repeats until this returns True, the LLM signals finish,
            or ``max_attempts`` is reached. Supports sync and async callables.
        max_attempts : int
            Maximum execution attempts (default 1 = single shot).
        tools : Optional[List[str]]
            Tool filter: only these tools are visible to the worker.
        skills : Optional[List[str]]
            Skill filter: only these skills are visible to the worker.
        on_error : ErrorStrategy
            Error handling strategy.
        max_retries : int
            Max retries for RETRY strategy.
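
        Examples
        --------
        The interaction between ``until`` and ``max_attempts`` can be sketched
        in isolation. ``repeat_until`` is a hypothetical helper, and its
        ``until`` takes no arguments for brevity (the real callable receives
        the context).

        ```python
        import asyncio
        import inspect
        from typing import Any, Awaitable, Callable, Optional


        async def repeat_until(
            step: Callable[[], Awaitable[bool]],
            until: Optional[Callable[[], Any]] = None,
            max_attempts: int = 1,
        ) -> int:
            # Returns how many times `step` ran.
            runs = 0
            for _ in range(max_attempts):
                runs += 1
                if await step():  # the step itself signalled finish
                    break
                if until is not None:
                    verdict = until()
                    if inspect.isawaitable(verdict):  # async predicate
                        verdict = await verdict
                    if verdict:
                        break
            return runs
        ```

        The loop stops on whichever comes first: a finish signal, a truthy
        ``until`` result, or the attempt budget running out.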
        """
        async def _execute():
            if until is not None or max_attempts > 1:
                for _ in range(max_attempts):
                    finished = await self._run_once(
                        worker, tools=tools, skills=skills,
                        on_error=on_error, max_retries=max_retries,
                    )
                    if finished:
                        return
                    if until is not None:
                        cond_result = until(context)
                        # isawaitable() also covers Tasks/Futures, not just coroutine objects
                        if inspect.isawaitable(cond_result):
                            cond_result = await cond_result
                        if cond_result:
                            return
            else:
                await self._run_once(
                    worker, tools=tools, skills=skills,
                    on_error=on_error, max_retries=max_retries,
                )

        context = self._current_context
        if context is None:
            raise RuntimeError(
                "Cannot call _run(): no active context. "
                "_run() must be called within an on_agent() method."
            )

        await _execute()

    async def _run_once(
        self,
        worker: CognitiveWorker,
        *,
        tools: Optional[List[str]] = None,
        skills: Optional[List[str]] = None,
        on_error: ErrorStrategy = ErrorStrategy.RAISE,
        max_retries: int = 0,
    ) -> bool:
        """Execute a single observe-think-act cycle. Returns whether the worker signalled finish."""
        async def _run_observe_think_act(worker: CognitiveWorker, context: CognitiveContextT) -> bool:
            worker_name = worker.__class__.__name__

            # 1. Observe
            obs = await worker.observation(context)
            if obs is _DELEGATE:
                obs = await self.observation(context)
            context.observation = obs

            obs_str = str(obs) if obs is not None else "None"
            if len(obs_str) > 200:
                obs_str = obs_str[:200] + "..."
            self._log("Observe", f"{worker_name}: {obs_str}", color="green")

            # 2. Think
            decision = await worker.arun(context=context)
            step_str = getattr(decision, 'step_content', str(decision))
            finished = getattr(decision, 'finish', False)
            self._log("Think", f"{worker_name}: finish={finished}, step={step_str}", color="cyan")

            # 3. Act
            action_result = await self._action(decision, context, _worker=worker) if decision is not None else None
            if action_result is not None:
                formatted = action_result.model_dump_json(indent=4)
                self._log("Act", f"{worker_name}:\n{formatted}", color="purple")

            # Record trace step
            self._record_trace_step(worker, obs, decision, action_result, context)

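            # Auto-capture the final answer when the worker signals finish.
            # `finished` was read with getattr() above, so a None decision
            # (or one lacking these fields) does not raise here.
            final_content = getattr(decision, 'step_content', None)
            if finished and final_content:
                self._final_answer = final_content

            return bool(finished)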


        ########################
        # Initialize CognitiveWorker
        # runtime environment
        ########################
        context = self._current_context
        worker_label = worker.__class__.__name__

        # Init LLM
        if worker._llm is None and self._llm is not None:
            worker.set_llm(self._llm)

        # Init verbose
        injected_verbose = False
        if worker._verbose is None:
            worker._verbose = self._verbose
            injected_verbose = True

        # Init tools
        original_tools = None
        if tools is not None:
            original_tools = context.tools
            filtered_tools = CognitiveTools()
            for tool in original_tools.get_all():
                if tool.tool_name in tools:
                    filtered_tools.add(tool)
            context.tools = filtered_tools

        # Init skills (all bindings declared up-front so the `finally` block
        # can read them unconditionally regardless of whether filtering ran).
        original_skills: Optional[CognitiveSkills] = None
        filtered_skills: Optional[CognitiveSkills] = None
        filtered_to_orig: Dict[int, int] = {}
        if skills is not None:
            original_skills = context.skills
            filtered_skills = CognitiveSkills()
            orig_to_filtered: Dict[int, int] = {}
            for orig_idx, skill in enumerate(original_skills.get_all()):
                if skill.name in skills:
                    new_idx = len(filtered_skills)
                    filtered_skills.add(skill)
                    orig_to_filtered[orig_idx] = new_idx
                    filtered_to_orig[new_idx] = orig_idx
            for orig_idx, detail in original_skills._revealed.items():
                if orig_idx in orig_to_filtered:
                    filtered_skills._revealed[orig_to_filtered[orig_idx]] = detail
            context.skills = filtered_skills

        # Init spent status
        tokens_before = worker.spent_tokens

        ########################
        # Run CognitiveWorker
        ########################
        finished = False
        try:
            finished = await _run_observe_think_act(worker, context)
        except Exception as e:
            if on_error == ErrorStrategy.RAISE:
                raise RuntimeError(
                    f"Worker '{worker_label}' failed during "
                    f"observe-think-act cycle: {e}"
                ) from e
            elif on_error == ErrorStrategy.IGNORE:
                pass
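            elif on_error == ErrorStrategy.RETRY:
                # The first attempt has already failed, so this loop makes up to
                # ``max_retries + 1`` further attempts before giving up.
                for attempt in range(max_retries + 1):
                    try:
                        finished = await _run_observe_think_act(worker, context)
                        break
                    except Exception as retry_error:
                        if attempt == max_retries:
                            raise RuntimeError(
                                f"Worker '{worker_label}' failed after "
                                f"{max_retries + 1} retries: {retry_error}"
                            ) from retry_error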
        finally:
            # Record and restore the execution status of the worker
            self.spent_tokens += worker.spent_tokens - tokens_before
            if injected_verbose:
                worker._verbose = None
            if original_tools is not None:
                context.tools = original_tools
            if original_skills is not None:
                if filtered_skills is not None:
                    for filtered_idx, detail in filtered_skills._revealed.items():
                        orig_idx = filtered_to_orig.get(filtered_idx)
                        if orig_idx is not None:
                            original_skills._revealed[orig_idx] = detail
                context.skills = original_skills

        return finished

    @asynccontextmanager
    async def snapshot(self, *, goal: Optional[str] = None,
                   keep_revealed: Optional[Dict[str, List[int]]] = None,
                   **snapshot_fields):
        """
        Temporarily override context fields for the duration of the block.

        Parameters
        ----------
        goal : Optional[str]
            Temporary goal injected into the context so the LLM knows
            the purpose of this phase.
        keep_revealed : Optional[Dict[str, List[int]]]
            Passed to ``_AgentSnapshot`` for revealed state management.
        **snapshot_fields
            Additional context fields to temporarily override during this phase.
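
        Examples
        --------
        The save, override, restore idea behind this context manager can be
        sketched on a plain object. ``override_fields`` and ``demo`` are
        hypothetical helpers; the real ``_AgentSnapshot`` may do more (for
        example, managing revealed state), which is not modelled here.

        ```python
        import asyncio
        from contextlib import asynccontextmanager
        from types import SimpleNamespace


        @asynccontextmanager
        async def override_fields(target, **fields):
            # Save current values, apply overrides, and always restore on exit.
            saved = {name: getattr(target, name) for name in fields}
            for name, value in fields.items():
                setattr(target, name, value)
            try:
                yield target
            finally:
                for name, value in saved.items():
                    setattr(target, name, value)


        async def demo() -> list:
            ctx = SimpleNamespace(goal="book a flight")
            seen = [ctx.goal]
            async with override_fields(ctx, goal="log in first"):
                seen.append(ctx.goal)  # override is visible inside the block
            seen.append(ctx.goal)      # original value is back afterwards
            return seen
        ```

        Restoring in ``finally`` means the original values come back even if
        the body of the block raises.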
        """
        async with self._phase_context(keep_revealed=keep_revealed,
                                       goal=goal, **snapshot_fields):
            yield

    @asynccontextmanager
    async def _phase_context(self, *,
                             keep_revealed: Optional[Dict[str, List[int]]] = None,
                             **snapshot_fields):
        """Shared implementation for snapshot() context manager.

        Subclasses can override this to inject custom behavior around
        snapshot blocks (e.g., logging, metrics, extended trace capture).

        Parameters
        ----------
        keep_revealed : Optional[Dict[str, List[int]]]
            Revealed state management mode for ``_AgentSnapshot``.
        **snapshot_fields
            Context fields to temporarily override during this phase.
        """
        context = self._current_context
        fields = {k: v for k, v in snapshot_fields.items() if v is not None}

        if not fields and keep_revealed is None:
            raise ValueError(
                "snapshot(): no snapshot fields, keep_revealed, "
                "or goal provided. If no context state needs to be scoped, "
                "use _run() directly — the behavior is identical."
            )

        snap = _AgentSnapshot(context, fields, keep_revealed=keep_revealed)
        await snap.__aenter__()
        try:
            yield
        finally:
            await snap.__aexit__(None, None, None)

    async def _run_workflow(
        self,
        ctx: CognitiveContextT,
        *,
        will_fallback: bool = True,
        max_consecutive_fallbacks: int = 1,
    ) -> None:
        """Consume on_workflow() generator, executing each step.

        Uses asend() to return tool execution results (List[ToolResult]) back
        to the generator, enabling ``result = yield ActionCall(...)`` syntax.

        For each yielded item:
        - ``ActionCall``: run observe → log → act deterministically. If execution
          fails, fall back to agent mode for the current step.
        - ``HumanCall``: pause and await human input via ``request_human()``.
        - ``AgentCall``: delegate to ``_run()`` for LLM-driven execution.

        If consecutive failures reach ``max_consecutive_fallbacks``, abandon
        the workflow and delegate the remaining task to ``on_agent()``
        (full agent mode). With ``will_fallback=False``, a failed step re-raises
        its error instead of falling back.
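
        Examples
        --------
        The ``asend()`` round trip can be illustrated with a toy generator
        and driver. ``workflow`` and ``drive`` are hypothetical, and the
        string results stand in for real step execution.

        ```python
        import asyncio
        from typing import Any, AsyncGenerator, List


        async def workflow() -> AsyncGenerator[str, Any]:
            # Each yield hands a step to the driver and receives that step's result.
            first = yield "open"
            second = yield f"click after {first}"
            yield f"done after {second}"


        async def drive() -> List[str]:
            gen = workflow()
            seen: List[str] = []
            send_value = None
            try:
                while True:
                    try:
                        # asend(None) behaves like __anext__() and starts the generator.
                        item = await gen.asend(send_value)
                    except StopAsyncIteration:
                        break  # generator exhausted: workflow complete
                    seen.append(item)
                    send_value = f"result({item})"  # stand-in for executing the step
            finally:
                await gen.aclose()
            return seen
        ```

        The value passed to ``asend()`` becomes the result of the pending
        ``yield`` expression inside the generator.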
        """
        consecutive_failures = 0
        if not will_fallback:
            max_consecutive_fallbacks = 0
        step_index = 0
        failed_steps = []  # Track failed step info for error reporting

        gen = self.on_workflow(ctx)
        send_value = None  # First iteration uses None (equivalent to __anext__)

        try:
            while True:
                # Get next item from generator (send previous result back)
                try:
                    if send_value is None:
                        item = await gen.__anext__()
                    else:
                        item = await gen.asend(send_value)
                    send_value = None  # Reset for next iteration
                except StopAsyncIteration:
                    break

                # An AgentCall yielded from on_workflow runs in agent mode with a fresh, empty
                # history by default, scoped to the new goal. Set the AgentCall's tools,
                # skills, or history fields to customize what the agent sees.
                if isinstance(item, AgentCall):
                    call_worker = item.worker if item.worker is not None else CognitiveWorker.inline("Complete the goal.")
                    history = item.history if item.history is not None else CognitiveHistory()
                    async with self.snapshot(goal=item.goal, cognitive_history=history):
                        await self._run(call_worker, tools=item.tools, skills=item.skills, max_attempts=item.max_attempts)
                    consecutive_failures = 0
                    send_value = None
                    continue

                if isinstance(item, HumanCall):
                    self._log("Workflow", f"Requesting human input: {item.prompt}", color="yellow")
                    response = await self.request_human(item.prompt, timeout=item.timeout)
                    self._log("Workflow", f"Human responded: {response[:100]}{'...' if len(response) > 100 else ''}", color="green")
                    consecutive_failures = 0
                    send_value = response
                    continue

                # Deterministic execution (ActionCall)
                worker = item.worker
                decision = item.decision

                # 1. Observe
                if worker is not None:
                    obs = await worker.observation(ctx)
                    if obs is _DELEGATE:
                        obs = await self.observation(ctx)
                else:
                    obs = await self.observation(ctx)
                ctx.observation = obs

                obs_str = str(obs) if obs is not None else "None"
                if len(obs_str) > 200:
                    obs_str = obs_str[:200] + "..."
                self._log("Observe", f"workflow: {obs_str}", color="green")
                self._log("Think", f"workflow: {decision.step_content}", color="cyan")

                # 2. Act — with fallback on failure
                try:
                    action_result = await self._action(decision, ctx, _worker=worker)

                    # Check if any tool execution failed
                    inner = getattr(action_result, "result", None)
                    if isinstance(inner, ActionResult):
                        failed = [
                            r for r in inner.results
                            if not r.success
                        ]
                        if failed:
                            errors = "; ".join(
                                f"{r.tool_name}: {r.error}" for r in failed
                            )
                            raise RuntimeError(
                                f"Tool execution failed for step "
                                f"'{decision.step_content}': {errors}"
                            )

                    if action_result is not None:
                        formatted = action_result.model_dump_json(indent=4)
                        self._log("Act", f"workflow:\n{formatted}", color="purple")

                    # 3. Record trace step (if capturing)
                    self._record_trace_step(worker, obs, decision, action_result, ctx)
                    consecutive_failures = 0
                    step_index += 1

                    # 4. Build ToolResult list to send back via asend()
                    send_value = self._build_tool_results(action_result)

                except Exception as e:
                    if not will_fallback:
                        raise  # bare raise keeps the original traceback

                    consecutive_failures += 1
                    step_index += 1
                    failed_steps.append(f"Step {step_index}: {decision.step_content}: {e}")
                    self._log(
                        "Workflow",
                        f"[ERROR] Step {step_index} failed "
                        f"({consecutive_failures}/{max_consecutive_fallbacks}): {e}",
                        color="red",
                    )

                    # Check if consecutive failures have reached the threshold → full fallback
                    if consecutive_failures >= max_consecutive_fallbacks:
                        if not self._has_agent():
                            failed_summary = "\n".join(failed_steps)
                            raise RuntimeError(
                                f"Workflow degradation failed: consecutive failures reached "
                                f"{max_consecutive_fallbacks}, but on_agent() is not overridden "
                                f"so fallback cannot proceed.\n"
                                f"Failed steps:\n{failed_summary}"
                            )
                        self._log(
                            "Workflow",
                            f"[ERROR] Consecutive failures reached {max_consecutive_fallbacks}, "
                            f"falling back to full agent mode (on_agent).",
                            color="red",
                        )
                        await self.on_agent(ctx)
                        return

                    # Step-level fallback: construct a scoped goal and let agent fix it
                    fallback_goal = (
                        f"[Workflow fallback] Step {step_index} failed.\n"
                        f"Step intent: {decision.step_content}\n"
                        f"Error: {e}\n\n"
                        f"You must do TWO things:\n"
                        f"1. Resolve the error — fix whatever is blocking this step (e.g. login, navigate, wait for page load).\n"
                        f"2. Complete the original step intent: {decision.step_content}\n\n"
                        f"Set finish=True ONLY after both are done. "
                        f"Do NOT continue with subsequent steps."
                    )
                    self._log(
                        "Workflow",
                        f"Falling back to agent mode for step {step_index}: "
                        f"{decision.step_content}",
                        color="yellow",
                    )
                    async with self.snapshot(goal=fallback_goal):
                        await self._run(
                            worker if worker is not None else CognitiveWorker.inline("Complete the goal."),
                            max_attempts=self.WORKFLOW_STEP_FALLBACK_MAX_ATTEMPTS,
                        )
                    send_value = None  # No result to send back after fallback
        finally:
            await gen.aclose()

    @staticmethod
    def _build_tool_results(action_result: Optional[Step]) -> List[ToolResult]:
        """Convert an action Step into a List[ToolResult] for asend() back to the generator."""
        if action_result is None:
            return []
        inner = getattr(action_result, "result", None)
        if inner is not None and isinstance(inner, ActionResult):
            return [
                ToolResult(
                    tool_name=r.tool_name,
                    tool_arguments=r.tool_arguments,
                    result=r.tool_result,
                    success=r.success,
                    error=r.error,
                )
                for r in inner.results
            ]
        return []

    async def _action(
        self,
        decision: Any,
        ctx: CognitiveContextT,
        *,
        _worker: Optional[CognitiveWorker] = None,
    ) -> Step:
        """
        Execute the action phase based on the thinking decision.

        Routes to ``action_tool_call()`` for tool-call output or
        ``action_custom_output()`` for structured output (output_schema).
        Calls ``before_action()`` on both the worker and agent level
        (with delegation via ``_DELEGATE``).

        Parameters
        ----------
        decision : Any
            The thinking decision with 'output' field (List[StepToolCall] or BaseModel).
        ctx : CognitiveContextT
            The cognitive context.
        _worker : Optional[CognitiveWorker]
            The worker that produced this decision (used for before_action callback).
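
        Examples
        --------
        The argument coercion applied to tool calls can be approximated on
        plain dicts. ``coerce_arguments`` is a hypothetical helper, and the
        schema is assumed to be a JSON-Schema-like dict with a
        ``properties`` mapping.

        ```python
        from typing import Any, Dict


        def coerce_arguments(raw: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
            # Parameters missing from the schema are treated as "string".
            props = schema.get("properties", {})
            out: Dict[str, Any] = {}
            for name, value in raw.items():
                kind = props.get(name, {}).get("type", "string")
                if kind in ("integer", "number"):
                    caster = int if kind == "integer" else float
                    try:
                        value = caster(value)
                    except (ValueError, TypeError):
                        pass  # keep the original value if it cannot be converted
                elif kind == "boolean" and isinstance(value, str):
                    value = value.strip().lower() in ("true", "1", "yes")
                out[name] = value
            return out
        ```

        Values that cannot be converted are passed through unchanged, so the
        tool itself decides how to reject them.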
        """ 
        def _is_list_step_tool_call(d: Any) -> bool:
            # Get the declared type of the output field
            if not isinstance(d, BaseModel):
                return False
            fi = type(d).model_fields.get('output')
            if fi is None:
                return False
            ann = fi.annotation
            if get_origin(ann) is Annotated:
                ann = get_args(ann)[0]

            # if the type is not List[StepToolCall], return False
            if ann is None:
                return False
            origin = get_origin(ann)
            if origin is list:
                args = get_args(ann)
                return len(args) == 1 and args[0] is StepToolCall
            return False

        def _convert_decision_to_tool_calls(calls: List, ctx: CognitiveContextT) -> List[ToolCall]:
            """Convert a list of StepToolCall into ToolCall objects with type-coerced arguments."""
            _, tool_specs = ctx.get_field('tools')
            tool_calls = []

            for idx, call in enumerate(calls):
                tool_spec = next((s for s in tool_specs if s.tool_name == call.tool), None)
                param_types: Dict[str, str] = {}
                if tool_spec and tool_spec.tool_parameters:
                    for name, info in tool_spec.tool_parameters.get('properties', {}).items():
                        param_types[name] = info.get('type', 'string')

                arguments: Dict[str, Any] = {}
                for arg in call.tool_arguments:
                    value: Any = arg.value
                    param_type = param_types.get(arg.name, 'string')
                    if param_type == 'integer':
                        try:
                            value = int(value)
                        except (ValueError, TypeError):
                            pass
                    elif param_type == 'number':
                        try:
                            value = float(value)
                        except (ValueError, TypeError):
                            pass
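                    elif param_type == 'boolean':
                        # Only strings need parsing; other values are coerced directly.
                        if isinstance(value, str):
                            value = value.strip().lower() in ('true', '1', 'yes')
                        else:
                            value = bool(value)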
                    arguments[arg.name] = value

                tool_calls.append(ToolCall(id=f"call_{idx}", name=call.tool, arguments=arguments))

            return tool_calls

        def _match_tool_calls(tool_calls: List[ToolCall], ctx: CognitiveContextT) -> List[Tuple[ToolCall, ToolSpec]]:
            """Match each ToolCall to its ToolSpec by name."""
            _, tool_specs = ctx.get_field('tools')
            matched: List[Tuple[ToolCall, ToolSpec]] = []
            for tc in tool_calls:
                for spec in tool_specs:
                    if tc.name == spec.tool_name:
                        if tc.arguments.get("__args__") is not None:
                            props = list(spec.tool_parameters.get('properties', {}).keys())
                            args = tc.arguments.get("__args__")
                            if isinstance(args, list):
                                tc.arguments = dict(zip(props, args))
                            else:
                                tc.arguments = {props[0]: args} if props else {}
                        matched.append((tc, spec))
                        break
            return matched

        ########################
        # Analysis of the decision output
        ########################
        # Get the output structure of the decision
        output = getattr(decision, 'output', None)

        # If the output is a list of StepToolCall
        if _is_list_step_tool_call(decision):
            calls = output
            tool_calls = _convert_decision_to_tool_calls(calls, ctx)
            decision_result = _match_tool_calls(tool_calls, ctx)

        # If the output is a BaseModel
        else:
            decision_result = output


        ########################
        # Execution of the action based on the decision result
        ########################
        # before_action delegation: worker → agent (if _DELEGATE)
        original_decision_result = decision_result
        if _worker is not None:
            decision_result = await _worker.before_action(decision_result, ctx)
            if decision_result is _DELEGATE:
                decision_result = await self.before_action(original_decision_result, ctx)
        else:
            decision_result = await self.before_action(decision_result, ctx)

        # Execute the action based on the (possibly delegated) decision result
        result = None
        if _is_list_step_tool_call(decision):
            if not calls:
                result = Step(
                    content=decision.step_content,
                    result=None,
                    metadata={"tool_calls": []}
                )
                ctx.add_info(result)
            else:
                action_result = await self.action_tool_call(decision_result, ctx)
                result = Step(
                    content=decision.step_content,
                    result=action_result,
                    metadata={}
                )
                ctx.add_info(result)
        else:
            action_result = await self.action_custom_output(decision_result, ctx)
            result = Step(content=decision.step_content, result=action_result, metadata={})
            ctx.add_info(result)

        # after_action delegation: worker → agent (if _DELEGATE)
        if _worker is not None:
            delegate = await _worker.after_action(result, ctx)
            if delegate is _DELEGATE:
                await self.after_action(result, ctx)
        else:
            await self.after_action(result, ctx)

        return result

    ############################################################################
    # Internal Helper methods
    ############################################################################

    def _log(self, stage: str, message: str, data: Any = None, color: str = "white"):
        """Log formatted message with timestamp and caller location.

        Format: ``[HH:MM:SS.mmm] [Stage] (file:line) message``

        Only prints when ``self._verbose`` is True.
        """
        if not self._verbose:
            return
        import inspect
        from datetime import datetime
        from os.path import basename

        frame = inspect.currentframe()
        try:
            caller = frame.f_back if frame is not None else None
            if caller is not None:
                filename = basename(caller.f_code.co_filename)
                lineno = caller.f_lineno
            else:
                filename, lineno = "?", 0
        finally:
            del frame

        ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        line = f"[{ts}] [{stage}] ({filename}:{lineno}) {message}"
        printer.print(line, color=color)
        if data is not None:
            printer.print(str(data), color="gray")

    def _record_trace_step(self, worker: Optional[CognitiveWorker], obs: Any, decision: Any, action_result: Optional[Step], context: Any) -> None:
        """Record a trace step to the workflow builder (if capture is active).

        Detects the output type from the action_result:
        - Tool calls (ActionResult with results) → TOOL_CALLS
        - Structured BaseModel output → STRUCTURED
        - Everything else (content only, no action) → CONTENT_ONLY
        """
        if self._agent_trace is None:
            return

        tool_calls = []
        output_type = StepOutputType.CONTENT_ONLY
        structured_output = None
        structured_output_class = None

        if action_result is not None and isinstance(action_result, Step):
            result_obj = action_result.result
            if isinstance(result_obj, ActionResult):
                # Tool call output
                output_type = StepOutputType.TOOL_CALLS
                for r in result_obj.results:
                    tool_calls.append({
                        "tool_name": r.tool_name,
                        "tool_arguments": r.tool_arguments,
                        "tool_result": r.tool_result,
                        "success": r.success,
                        "error": r.error,
                    })
            elif result_obj is not None and isinstance(result_obj, BaseModel):
                # Structured BaseModel output
                output_type = StepOutputType.STRUCTURED
                structured_output = result_obj.model_dump()
                structured_output_class = (
                    f"{result_obj.__class__.__module__}.{result_obj.__class__.__qualname__}"
                )
            elif result_obj is not None:
                # Non-BaseModel custom output — store as structured with no class
                output_type = StepOutputType.STRUCTURED
                try:
                    structured_output = {"__value__": result_obj}
                except Exception:
                    structured_output = {"__value__": str(result_obj)}
            # else: result_obj is None → CONTENT_ONLY (default)

        self._agent_trace.record_step({
            "name": worker.__class__.__name__ if worker is not None else "workflow",
            "step_content": getattr(decision, "step_content", ""),
            "tool_calls": tool_calls,
            "observation": str(obs) if obs is not None else None,
            "observation_hash": observation_fingerprint(obs),
            "output_type": output_type.value,
            "structured_output": structured_output,
            "structured_output_class": structured_output_class,
        })

    def _has_workflow(self) -> bool:
        """Check whether the subclass has overridden on_workflow()."""
        return type(self).on_workflow is not AmphibiousAutoma.on_workflow

    def _has_agent(self) -> bool:
        """Check whether the subclass has overridden on_agent()."""
        return type(self).on_agent is not AmphibiousAutoma.on_agent

    def _resolve_mode(self, mode: RunMode) -> RunMode:
        """Resolve ``RunMode.AUTO`` to a concrete mode based on overridden template methods.

        Resolution rules:
        - both ``on_agent`` and ``on_workflow`` overridden → ``RunMode.AMPHIFLOW``
        - only ``on_workflow`` overridden → ``RunMode.WORKFLOW``
        - only ``on_agent`` overridden → ``RunMode.AGENT``
        - neither overridden → ``RuntimeError``

        Non-AUTO modes are returned unchanged.
        """
        if mode is not RunMode.AUTO:
            return mode
        has_agent = self._has_agent()
        has_workflow = self._has_workflow()
        if has_agent and has_workflow:
            return RunMode.AMPHIFLOW
        if has_workflow:
            return RunMode.WORKFLOW
        if has_agent:
            return RunMode.AGENT
        raise RuntimeError(
            f"{type(self).__name__} must override on_agent() or on_workflow()."
        )

    ############################################################################
    # Entry point
    ############################################################################
    @worker(is_start=True)
    async def router(self, mode: RunMode, max_consecutive_fallbacks: int) -> str:
        """
        Router worker: dispatches to the correct execution mode.

        ``RunMode.AUTO`` is resolved upstream in ``arun()``, so this worker
        always receives a concrete mode.
        """
        if mode is RunMode.AGENT:
            self._log("Router", "Ferrying to AGENT mode", color="green")
            self.ferry_to("_cognition")
        elif mode is RunMode.WORKFLOW:
            self._log("Router", "Ferrying to WORKFLOW mode", color="green")
            self.ferry_to("_workflow")
        elif mode is RunMode.AMPHIFLOW:
            self._log("Router", f"Ferrying to AMPHIFLOW mode, max_consecutive_fallbacks={max_consecutive_fallbacks}", color="green")
            self.ferry_to("_amphiflow", max_consecutive_fallbacks=max_consecutive_fallbacks)
        else:
            raise RuntimeError(f"Unsupported run mode: {mode!r}")

    @worker(is_output=True)
    async def _cognition(self) -> str:
        """
        Agent mode entry point: delegates to the user-defined on_agent() method.
        """
        await self.on_agent(self._current_context)
        return self._final_answer or self._current_context.summary()

    @worker(is_output=True)
    async def _workflow(self) -> str:
        """
        Workflow mode entry point: runs on_workflow() without agent fallback.
        """
        await self._run_workflow(self._current_context, will_fallback=False)
        return self._final_answer or self._current_context.summary()

    @worker(is_output=True)
    async def _amphiflow(self, max_consecutive_fallbacks: int) -> str:
        """
        Amphiflow mode entry point: runs on_workflow() with agent fallback support.
        """
        await self._run_workflow(self._current_context, will_fallback=True, max_consecutive_fallbacks=max_consecutive_fallbacks)
        return self._final_answer or self._current_context.summary()

    async def arun(
        self,
        *,
        context: Optional[CognitiveContextT] = None,
        trace_running: bool = False,
        mode: Optional[RunMode] = RunMode.AUTO,
        max_consecutive_fallbacks: int = 1,
        **kwargs
    ) -> str:
        """
        Run the agent.

        Routes to one of the execution modes:
        1. Agent mode — LLM-driven ``on_agent()`` path.
        2. Workflow mode — deterministic ``on_workflow()`` path (no fallback).
        3. Amphiflow mode — ``on_workflow()`` with automatic agent fallback.
        4. Auto mode (default) — resolved from which template methods the
           subclass overrides: both → AMPHIFLOW, only ``on_workflow`` → WORKFLOW,
           only ``on_agent`` → AGENT.

        Context initialization has two paths:
        1. Pre-created: ``arun(context=my_ctx)``
        2. Auto-created: ``arun(goal="...", tools=[...], skills=[...])``

        Parameters
        ----------
        context : Optional[CognitiveContextT]
            Pre-created context object. If provided, uses this context directly.
        trace_running : bool
            If True, enables trace capture via AgentTrace during execution.
        mode : Optional[RunMode]
            Execution mode. ``RunMode.AGENT`` forces agent mode,
            ``RunMode.WORKFLOW`` forces workflow mode (no fallback),
            ``RunMode.AMPHIFLOW`` forces workflow with agent fallback,
            ``RunMode.AUTO`` (default) auto-detects from which template methods
            are overridden.
        max_consecutive_fallbacks : int
            Maximum consecutive workflow step failures before switching
            to full agent mode. Only applies to amphiflow mode. Default is 1.
        **kwargs
            Arguments passed to CognitiveContext constructor when ``context``
            is not provided (e.g., ``goal``, ``tools``, ``skills``).

        Returns
        -------
        str
            The final answer if one was set or captured during the run;
            otherwise the summary of the context after execution.
        """
        def _build_trace(automa: "AmphibiousAutoma") -> Dict[str, Any]:
            """Build a trace dict from the workflow builder of the last run."""
            import time as _time
            metadata = {
                "automa_class": f"{automa.__class__.__module__}.{automa.__class__.__qualname__}",
                "context_class": (
                    f"{automa._context_class.__module__}.{automa._context_class.__qualname__}"
                    if automa._context_class else None
                ),
                "timestamp": _time.time(),
                "spent_tokens": automa.spent_tokens,
                "spent_time": automa.spent_time,
            }
            return automa._agent_trace.build(metadata=metadata)

        async def _run_and_report(context: CognitiveContextT) -> str:
            """Run the agent, measure time, and log summary."""
            start_time = time.time()
            result = await GraphAutoma.arun(
                self, resolved_mode,
                max_consecutive_fallbacks=max_consecutive_fallbacks,
            )
            self.spent_time = time.time() - start_time

            if self._verbose:
                agent_name = self.name or self.__class__.__name__
                separator = "=" * 50
                printer.print(separator, color="cyan")
                printer.print(
                    f"  {agent_name} | Completed\n"
                    f"Tokens: {self.spent_tokens} | "
                    f"Time: {self.spent_time:.2f}s",
                    color="cyan"
                )
                printer.print(separator, color="cyan")

            return result

        ########################
        # Pre-initialize status
        ########################
        self.spent_tokens = 0
        self.spent_time = 0.0
        self._final_answer = None

        resolved_mode = self._resolve_mode(mode if mode is not None else RunMode.AUTO)
        if self._llm is None and resolved_mode in (RunMode.AGENT, RunMode.AMPHIFLOW):
            raise RuntimeError(
                f"AmphibiousAutoma must be initialized with an LLM for "
                f"{resolved_mode.value} mode."
            )

        if trace_running:
            self._agent_trace = AgentTrace()
        else:
            self._agent_trace = None

        ########################
        # Initialize context
        ########################
        if context is not None:
            if not isinstance(context, self._context_class):
                raise ValueError(
                    f"Context must be an instance of {self._context_class.__name__}, "
                    f"got {type(context).__name__}"
                )

        # Separate Exposure fields (tools, skills, etc.) from regular constructor args
        exposure_fields = self._context_class._exposure_fields
        if exposure_fields is None:
            exposure_fields = self._context_class._detect_exposure_fields()
            self._context_class._exposure_fields = exposure_fields

        # Create the context
        constructor_kwargs = {}
        exposure_items = {}  # {field_name: list_of_items}
        # List/tuple values for Exposure fields are added item by item below;
        # everything else is passed to the context constructor.
        for key, value in kwargs.items():
            if key in exposure_fields and isinstance(value, (list, tuple)):
                exposure_items[key] = value
            elif key in exposure_fields and isinstance(value, Exposure):
                constructor_kwargs[key] = value
            else:
                constructor_kwargs[key] = value

        if context is None:
            context = self._context_class(**constructor_kwargs)  # Create the context
        for field_name, items in exposure_items.items():  # Add items to Exposure fields
            attr = getattr(context, field_name)
            for item in items:
                attr.add(item)

        # Inject built-in tools (e.g. request_human) so on_agent execution can
        # trigger framework-level capabilities autonomously.
        existing_tool_names = {t.tool_name for t in context.tools.get_all()}
        for builtin in _BUILTIN_TOOLS:
            if builtin.tool_name not in existing_tool_names:
                context.tools.add(builtin)

        # Set the LLM to the context
        if self._llm is not None:
            context.set_llm(self._llm)
        self._current_context = context


        ########################
        # Run the amphibious automa
        ########################
        token = current_agent.set(self)
        try:
            result = await _run_and_report(context=context)
            if trace_running and self._agent_trace:
                _build_trace(self)
            return result
        finally:
            current_agent.reset(token)
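
The context-initialization step in arun() above splits its keyword arguments into two groups before building the context. The following is a minimal standalone sketch of that split, not the library code itself: `split_context_kwargs` is a hypothetical helper name, and the field names in the usage lines are illustrative.

```python
from typing import Any, Dict, Iterable, List, Tuple


def split_context_kwargs(
    kwargs: Dict[str, Any], exposure_fields: Iterable[str]
) -> Tuple[Dict[str, Any], Dict[str, List[Any]]]:
    """Separate list-like Exposure inputs from plain constructor arguments."""
    fields = set(exposure_fields)
    constructor_kwargs: Dict[str, Any] = {}
    exposure_items: Dict[str, List[Any]] = {}
    for key, value in kwargs.items():
        if key in fields and isinstance(value, (list, tuple)):
            # Added to the context one item at a time after construction.
            exposure_items[key] = list(value)
        else:
            # Passed straight to the context constructor.
            constructor_kwargs[key] = value
    return constructor_kwargs, exposure_items


# Hypothetical inputs for illustration only.
ctor, items = split_context_kwargs(
    {"goal": "Summarize the report", "tools": ["search", "open_page"]},
    exposure_fields={"tools", "skills"},
)
```

Here `goal` ends up in the constructor arguments, while the `tools` list is held back so each entry can be added through the Exposure field's `add()` method.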

llm property

llm: Optional[Any]

Access the agent's default LLM.

context property

context: Optional[CognitiveContextT]

Access the current context.

final_answer property

final_answer: Optional[str]

The final answer produced by the last arun() call.

Automatically captured from the step_content of the finishing step (agent mode) or the last executed step (workflow mode). Can be overridden explicitly via set_final_answer().

set_final_answer

set_final_answer(answer: str) -> None

Explicitly set the final answer.

Call this inside on_agent() or on_workflow() to override the auto-captured value.

Parameters:

- answer (str, required): The final answer text.

Source code in bridgic/amphibious/_amphibious_automa.py
def set_final_answer(self, answer: str) -> None:
    """Explicitly set the final answer.

    Call this inside ``on_agent()`` or ``on_workflow()`` to override
    the auto-captured value.

    Parameters
    ----------
    answer : str
        The final answer text.
    """
    self._final_answer = answer
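
A minimal sketch of how an explicitly set answer takes precedence over the fallback summary. `FinalAnswerHolder`, its `summary()` and its `result()` are hypothetical stand-ins rather than the real AmphibiousAutoma; only the `_final_answer or summary()` precedence mirrors the entry-point workers shown earlier.

```python
from typing import Optional


class FinalAnswerHolder:
    """Hypothetical stand-in that mirrors only the final-answer bookkeeping."""

    def __init__(self) -> None:
        self._final_answer: Optional[str] = None

    def set_final_answer(self, answer: str) -> None:
        # Same one-line assignment as the documented method.
        self._final_answer = answer

    def summary(self) -> str:
        # Placeholder for the context summary used as the fallback.
        return "context summary"

    def result(self) -> str:
        # A non-empty explicit answer wins; otherwise fall back to the summary.
        return self._final_answer or self.summary()


holder = FinalAnswerHolder()
fallback = holder.result()
holder.set_final_answer("42 items processed")
explicit = holder.result()
```

Because the precedence uses `or`, an empty string passed to `set_final_answer()` would also fall back to the summary in this sketch.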

request_human

async
request_human(
    prompt: str, *, timeout: Optional[float] = None
) -> str

Request input from the human operator.

This is the primary entry point for code-level human-in-the-loop calls inside on_agent() (between think units) or from the request_human_tool closure.

Parameters:

- prompt (str, required): The question or message to present to the human.
- timeout (Optional[float], default None): Seconds to wait before raising TimeoutError. None means wait indefinitely.

Returns:

- str: The human's response text.

Raises:

- TimeoutError: If no response is received within timeout seconds.

Source code in bridgic/amphibious/_amphibious_automa.py
async def request_human(self, prompt: str, *, timeout: Optional[float] = None) -> str:
    """Request input from the human operator.

    This is the primary entry point for code-level human-in-the-loop
    calls inside ``on_agent()`` (between think units) or from the
    ``request_human_tool`` closure.

    Parameters
    ----------
    prompt : str
        The question or message to present to the human.
    timeout : Optional[float]
        Seconds to wait before raising ``TimeoutError``. None means
        wait indefinitely.

    Returns
    -------
    str
        The human's response text.

    Raises
    ------
    TimeoutError
        If no response is received within ``timeout`` seconds.
    """
    event = Event(
        event_type=HUMAN_INPUT_EVENT_TYPE,
        data={"prompt": prompt, "timeout": timeout},
    )
    feedback = await self.request_feedback_async(event, timeout=timeout)
    return feedback.data
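
A caller-side sketch of handling the documented TimeoutError. `fake_request_human` and `ask_with_default` are hypothetical; the stand-in simulates a human who never replies and does not reproduce the framework's event mechanism.

```python
import asyncio
from typing import Optional


async def fake_request_human(prompt: str, *, timeout: Optional[float] = None) -> str:
    """Hypothetical stand-in: waits for a reply that never arrives."""
    never_answered: asyncio.Future = asyncio.get_running_loop().create_future()
    try:
        return await asyncio.wait_for(never_answered, timeout=timeout)
    except asyncio.TimeoutError:
        # Normalize to the built-in TimeoutError named in the docs.
        raise TimeoutError(f"no reply to {prompt!r} within {timeout}s")


async def ask_with_default(prompt: str, default: str, timeout: float) -> str:
    # Fall back to a default when the human stays silent.
    try:
        return await fake_request_human(prompt, timeout=timeout)
    except TimeoutError:
        return default


answer = asyncio.run(ask_with_default("Proceed with deletion?", "no", timeout=0.01))
```

Choosing a conservative default (here "no") keeps an unattended run from taking a destructive action just because nobody answered in time.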

observation

async
observation(ctx: CognitiveContextT) -> Optional[str]

Agent-level default observation, shared across all workers.

Called by _run() before each thinking phase. Workers can enhance this via their own observation() method.

Parameters:

- ctx (CognitiveContextT, required): The cognitive context.

Returns:

- Optional[str]: Custom observation text, or None.

Source code in bridgic/amphibious/_amphibious_automa.py
async def observation(self, ctx: CognitiveContextT) -> Optional[str]:
    """
    Agent-level default observation, shared across all workers.

    Called by ``_run()`` before each thinking phase. Workers can
    enhance this via their own ``observation()`` method.

    Parameters
    ----------
    ctx : CognitiveContextT
        The cognitive context.

    Returns
    -------
    Optional[str]
        Custom observation text, or None.
    """
    return None
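
A minimal sketch of an agent-level observation override. `BrowserContext`, its `open_tabs` field and `BrowserAgent` are hypothetical stand-ins, so the snippet runs without the framework; a real override would live on an AmphibiousAutoma subclass.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BrowserContext:
    """Hypothetical context with one custom field."""
    open_tabs: List[str] = field(default_factory=list)


class BrowserAgent:
    """Hypothetical stand-in for an AmphibiousAutoma subclass."""

    async def observation(self, ctx: BrowserContext) -> Optional[str]:
        # Returning None matches the default implementation.
        if not ctx.open_tabs:
            return None
        return "Open tabs: " + ", ".join(ctx.open_tabs)


agent = BrowserAgent()
empty_obs = asyncio.run(agent.observation(BrowserContext()))
tabs_obs = asyncio.run(agent.observation(BrowserContext(open_tabs=["inbox", "calendar"])))
```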

on_agent

async
on_agent(ctx: CognitiveContextT) -> None

Agent mode: LLM-driven orchestration logic.

Override this method to define how think_units are orchestrated. Use standard Python control flow combined with await self.think_unit calls.

A subclass may override this, on_workflow, or both. The default implementation is a no-op so that subclasses which only implement on_workflow remain instantiable.

Parameters:

- ctx (CognitiveContextT, required): The cognitive context, used for accessing state and conditions.

Examples:

>>> async def on_agent(self, ctx: MyContext):
...     await self.main_think
...     await self.exec_think.until(lambda ctx: ctx.done, max_attempts=20)
Source code in bridgic/amphibious/_amphibious_automa.py
async def on_agent(self, ctx: CognitiveContextT) -> None:
    """
    Agent mode: LLM-driven orchestration logic.

    Override this method to define how think_units are orchestrated.
    Use standard Python control flow combined with ``await self.think_unit``
    calls.

    A subclass may override this, ``on_workflow``, or both. The default
    implementation is a no-op so that subclasses which only implement
    ``on_workflow`` remain instantiable.

    Parameters
    ----------
    ctx : CognitiveContextT
        The cognitive context, used for accessing state and conditions.

    Examples
    --------
    >>> async def on_agent(self, ctx: MyContext):
    ...     await self.main_think
    ...     await self.exec_think.until(lambda ctx: ctx.done, max_attempts=20)
    """
    return None

on_workflow

async
on_workflow(
    ctx: CognitiveContextT,
) -> AsyncGenerator[
    Union[ActionCall, HumanCall, AgentCall], None
]

Workflow mode: deterministic execution as an async generator.

Override this method to define a deterministic workflow. With the default RunMode.AUTO, arun() routes to workflow mode when only on_workflow() is overridden, and to amphiflow mode (workflow with agent fallback) when on_agent() is overridden as well.

Three yield types are supported:

- ActionCall: deterministic single-tool execution
- HumanCall: pause and request human input (returned as str via asend)
- AgentCall: delegate a sub-task to LLM agent mode

Exhausting the generator signals workflow completion; no explicit finish signal is needed. Use result = yield ActionCall(...) to receive tool execution results via asend().

Examples:

>>> async def on_workflow(self, ctx):
...     yield ActionCall("navigate_to", url="http://example.com")
...     result = yield ActionCall("click_element_by_ref", ref="42")
...     yield AgentCall(goal="Handle complex case", max_attempts=10)
Source code in bridgic/amphibious/_amphibious_automa.py
async def on_workflow(self, ctx: CognitiveContextT) -> AsyncGenerator[Union[ActionCall, HumanCall, AgentCall], None]:
    """Workflow mode: deterministic execution as an async generator.

    Override this method to define a deterministic workflow. With the default
    ``RunMode.AUTO``, ``arun()`` routes to workflow mode when only
    ``on_workflow()`` is overridden, and to amphiflow mode when ``on_agent()``
    is overridden as well.

    Three yield types are supported:
    - ``ActionCall``  — deterministic single-tool execution
    - ``HumanCall``   — pause and request human input (returned as str via asend)
    - ``AgentCall``   — delegate a sub-task to LLM agent mode

    The generator exhausting signals workflow completion — no finish signal needed.
    Use ``result = yield ActionCall(...)`` to receive tool execution results via asend().

    Examples
    --------
    >>> async def on_workflow(self, ctx):
    ...     yield ActionCall("navigate_to", url="http://example.com")
    ...     result = yield ActionCall("click_element_by_ref", ref="42")
    ...     yield AgentCall(goal="Handle complex case", max_attempts=10)
    """
    if False:  # pragma: no cover — makes this a proper async generator stub
        yield
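
The `result = yield ...` pattern relies on the standard async-generator `asend()` protocol. Below is a toy driver that shows the mechanics in isolation. `FakeActionCall`, `demo_workflow` and `drive` are hypothetical, and the framework's real runner is not shown in this section and may differ.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, Dict, List


@dataclass
class FakeActionCall:
    """Hypothetical stand-in for ActionCall: a tool name plus arguments."""
    tool_name: str
    arguments: Dict[str, Any] = field(default_factory=dict)


async def demo_workflow() -> AsyncGenerator[FakeActionCall, Any]:
    # The value sent back by the driver becomes the result of the yield expression.
    page = yield FakeActionCall("navigate_to", {"url": "http://example.com"})
    yield FakeActionCall("click_element_by_ref", {"ref": "42", "seen": page})
    # Falling off the end signals completion; no explicit finish step is needed.


async def drive(workflow: AsyncGenerator[FakeActionCall, Any]) -> List[str]:
    """Toy driver: 'executes' each yielded call and feeds the result back."""
    executed: List[str] = []
    try:
        call = await workflow.asend(None)  # Prime the generator up to its first yield.
        while True:
            executed.append(call.tool_name)
            result = f"ok:{call.tool_name}"  # Pretend tool result.
            call = await workflow.asend(result)
    except StopAsyncIteration:
        return executed


executed_tools = asyncio.run(drive(demo_workflow()))
```

The first `asend(None)` is required because a freshly created async generator cannot accept a non-None value until it has reached its first `yield`.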

before_action

async
before_action(
    decision_result: Any, ctx: CognitiveContextT
) -> Any

Agent-level before_action hook, shared across all workers.

Called when a worker's before_action() returns _DELEGATE. Override to intercept and modify tool calls at the agent level.

Parameters:

- decision_result (Any, required): The decision result (typically List[Tuple[ToolCall, ToolSpec]]).
- ctx (CognitiveContextT, required): The cognitive context.

Returns:

- Any: Verified/adjusted decision result.

Source code in bridgic/amphibious/_amphibious_automa.py
async def before_action(
    self,
    decision_result: Any,
    ctx: CognitiveContextT,
) -> Any:
    """
    Agent-level before_action hook, shared across all workers.

    Called when a worker's ``before_action()`` returns ``_DELEGATE``.
    Override to intercept and modify tool calls at the agent level.

    Parameters
    ----------
    decision_result : Any
        The decision result (typically List[Tuple[ToolCall, ToolSpec]]).
    ctx : CognitiveContextT
        The cognitive context.

    Returns
    -------
    Any
        Verified/adjusted decision result.
    """
    return decision_result
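
A sketch of an override that drops disallowed tool calls before they run, assuming the decision result is a list of (tool call, tool spec) pairs as the parameter description suggests. `FakeToolCall`, `BLOCKED_TOOLS` and the string specs are hypothetical stand-ins.

```python
import asyncio
from typing import Any, Dict, NamedTuple


class FakeToolCall(NamedTuple):
    """Hypothetical stand-in for ToolCall."""
    name: str
    arguments: Dict[str, Any]


BLOCKED_TOOLS = {"delete_file"}  # Hypothetical policy for this sketch.


async def before_action(decision_result: Any, ctx: Any) -> Any:
    # Non-list results (e.g. structured output) pass through untouched.
    if not isinstance(decision_result, list):
        return decision_result
    return [
        (call, spec)
        for call, spec in decision_result
        if call.name not in BLOCKED_TOOLS
    ]


decision = [
    (FakeToolCall("read_file", {"path": "notes.txt"}), "read-spec"),
    (FakeToolCall("delete_file", {"path": "notes.txt"}), "delete-spec"),
]
kept = asyncio.run(before_action(decision, ctx=None))
```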

action_tool_call

async
action_tool_call(
    tool_list: List[Tuple[ToolCall, ToolSpec]],
    context: CognitiveContextT,
) -> ActionResult

Execute tool calls concurrently and collect results.

Override this method to customize tool execution behavior (e.g., sequential execution, rate limiting, sandboxing).

Parameters:

- tool_list (List[Tuple[ToolCall, ToolSpec]], required): Matched tool call / spec pairs to execute.
- context (CognitiveContextT, required): The current cognitive context.

Returns:

- ActionResult: Aggregated results with per-tool success/failure status.

Source code in bridgic/amphibious/_amphibious_automa.py
async def action_tool_call(self, tool_list: List[Tuple[ToolCall, ToolSpec]], context: CognitiveContextT) -> ActionResult:
    """
    Execute tool calls concurrently and collect results.

    Override this method to customize tool execution behavior
    (e.g., sequential execution, rate limiting, sandboxing).

    Parameters
    ----------
    tool_list : List[Tuple[ToolCall, ToolSpec]]
        Matched tool call / spec pairs to execute.
    context : CognitiveContextT
        The current cognitive context.

    Returns
    -------
    ActionResult
        Aggregated results with per-tool success/failure status.
    """

    async def _run_one(tool_call: ToolCall, tool_spec: ToolSpec) -> ActionStepResult:
        tool_worker = tool_spec.create_worker()
        sandbox = ConcurrentAutoma()
        worker_key = f"tool_{tool_call.name}_{tool_call.id}"
        sandbox.add_worker(
            key=worker_key,
            worker=tool_worker,
            args_mapping_rule=ArgsMappingRule.UNPACK,
        )
        try:
            results = await sandbox.arun(InOrder([tool_call.arguments]))
            result = results[0] if results else None
            return ActionStepResult(
                tool_id=tool_call.id,
                tool_name=tool_call.name,
                tool_arguments=tool_call.arguments,
                tool_result=result,
                success=True,
            )
        except Exception as e:
            return ActionStepResult(
                tool_id=tool_call.id,
                tool_name=tool_call.name,
                tool_arguments=tool_call.arguments,
                tool_result=None,
                success=False,
                error=str(e),
            )

    step_results = await asyncio.gather(
        *(_run_one(tc, ts) for tc, ts in tool_list)
    )
    return ActionResult(results=list(step_results))
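
The listing above wraps each tool in its own try/except and then gathers the coroutines, so one failing tool does not abort the batch. A framework-free sketch of that pattern follows. `ToolOutcome`, `run_one`, `run_all` and the two toy tools are hypothetical names, and plain coroutine functions replace the sandboxed workers.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple


@dataclass
class ToolOutcome:
    """Hypothetical stand-in for ActionStepResult."""
    tool_name: str
    tool_result: Any = None
    success: bool = True
    error: Optional[str] = None


ToolFunc = Callable[..., Awaitable[Any]]


async def run_one(name: str, func: ToolFunc, arguments: Dict[str, Any]) -> ToolOutcome:
    try:
        return ToolOutcome(name, tool_result=await func(**arguments))
    except Exception as exc:
        # A failing tool becomes a failed outcome instead of aborting the batch.
        return ToolOutcome(name, success=False, error=str(exc))


async def run_all(calls: List[Tuple[str, ToolFunc, Dict[str, Any]]]) -> List[ToolOutcome]:
    # gather() runs the coroutines concurrently and returns results in input order.
    return list(await asyncio.gather(*(run_one(n, f, a) for n, f, a in calls)))


async def add(a: int, b: int) -> int:
    return a + b


async def explode() -> None:
    raise ValueError("boom")


outcomes = asyncio.run(run_all([("add", add, {"a": 1, "b": 2}), ("explode", explode, {})]))
```

For sequential execution, an override could replace the `gather()` call with a plain loop that awaits `run_one()` for each entry in turn.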

action_custom_output

async
action_custom_output(
    decision_result: Any, context: CognitiveContextT
) -> Any

Handle structured output from a worker with output_schema set.

Called instead of action_tool_call() when the worker produces a typed Pydantic instance (via output_schema) rather than tool calls. Override to post-process or validate structured output.

Parameters:

- decision_result (Any, required): The structured output instance produced by the worker.
- context (CognitiveContextT, required): The current cognitive context.

Returns:

- Any: The (optionally processed) result to store in execution history.

Source code in bridgic/amphibious/_amphibious_automa.py
async def action_custom_output(self, decision_result: Any, context: CognitiveContextT) -> Any:
    """
    Handle structured output from a worker with ``output_schema`` set.

    Called instead of ``action_tool_call()`` when the worker produces
    a typed Pydantic instance (via ``output_schema``) rather than tool calls.
    Override to post-process or validate structured output.

    Parameters
    ----------
    decision_result : Any
        The structured output instance produced by the worker.
    context : CognitiveContextT
        The current cognitive context.

    Returns
    -------
    Any
        The (optionally processed) result to store in execution history.
    """
    return decision_result

after_action

async
after_action(
    step_result: Any, ctx: CognitiveContextT
) -> None

Agent-level after_action hook.

Called after action execution and before the result is returned. Override to update custom context fields based on tool results.

Parameters:

- step_result (Any, required): The result of the action step.
- ctx (CognitiveContextT, required): The current cognitive context.

Source code in bridgic/amphibious/_amphibious_automa.py
async def after_action(self, step_result: Any, ctx: CognitiveContextT) -> None:
    """
    Agent-level after_action hook.

    Called after action execution and before the result is returned.
    Override to update custom context fields based on tool results.

    Parameters
    ----------
    step_result : Any
        The result of the action step.
    ctx : CognitiveContextT
        The current cognitive context.
    """
    pass
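
A sketch of an after_action override that records failed tools in a custom context field. It assumes the step result exposes a `results` list whose items carry `tool_name` and `success`; that shape is an assumption, and all class names here are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class FakeStepResult:
    """Hypothetical per-tool result."""
    tool_name: str
    success: bool


@dataclass
class FakeActionResult:
    """Hypothetical aggregate of per-tool results."""
    results: List[FakeStepResult]


@dataclass
class TrackingContext:
    """Hypothetical context with a custom bookkeeping field."""
    failed_tools: List[str] = field(default_factory=list)


async def after_action(step_result: Any, ctx: TrackingContext) -> None:
    # Results without a `results` attribute are ignored.
    for item in getattr(step_result, "results", []):
        if not item.success:
            ctx.failed_tools.append(item.tool_name)


tracking = TrackingContext()
batch = FakeActionResult([FakeStepResult("search", True), FakeStepResult("click", False)])
asyncio.run(after_action(batch, tracking))
```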

human_input

async
human_input(data: Dict[str, Any]) -> str

Template method invoked when the agent requests human input.

Override this in your subclass to integrate with your UI or messaging layer. The default implementation reads from stdin.

Parameters:

- data (Dict[str, Any], required): Event payload. Always contains "prompt"; may contain "timeout" and other metadata added by the caller.

Returns:

- str: The human's response text.

Source code in bridgic/amphibious/_amphibious_automa.py
async def human_input(self, data: Dict[str, Any]) -> str:
    """Template method invoked when the agent requests human input.

    Override this in your subclass to integrate with your UI or
    messaging layer.  The default implementation reads from stdin.

    Parameters
    ----------
    data : Dict[str, Any]
        Event payload — always contains ``"prompt"``; may contain
        ``"timeout"`` and other metadata added by the caller.

    Returns
    -------
    str
        The human's response text.
    """
    prompt = data.get("prompt", "Human input required:")
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, input, f"\n[HumanInput] {prompt}\n> "
    )
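
A sketch of replacing the stdin default with a queue, as a UI or messaging integration might do. `QueueBackedInput` and `demo` are hypothetical; a real override would be a method on an AmphibiousAutoma subclass, and the queue would be fed by the UI layer.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class QueueBackedInput:
    """Hypothetical stand-in for a subclass overriding human_input()."""

    def __init__(self) -> None:
        self.prompts: List[str] = []
        self.replies: asyncio.Queue = asyncio.Queue()

    async def human_input(self, data: Dict[str, Any]) -> str:
        # Record the prompt (a UI would display it), then wait for a queued reply.
        self.prompts.append(data.get("prompt", "Human input required:"))
        return await asyncio.wait_for(self.replies.get(), timeout=data.get("timeout"))


async def demo() -> Tuple[List[str], str]:
    agent = QueueBackedInput()
    await agent.replies.put("yes")  # Simulates the operator answering.
    reply = await agent.human_input({"prompt": "Continue?", "timeout": 1.0})
    return agent.prompts, reply


prompts, reply = asyncio.run(demo())
```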

snapshot

async
snapshot(
    *,
    goal: Optional[str] = None,
    keep_revealed: Optional[Dict[str, List[int]]] = None,
    **snapshot_fields
)

Temporarily override context fields for the duration of the block.

Parameters:

- goal (Optional[str], default None): Temporary goal injected into the context so the LLM knows the purpose of this phase.
- keep_revealed (Optional[Dict[str, List[int]]], default None): Passed to _AgentSnapshot for revealed state management.
- **snapshot_fields (default {}): Additional context fields to temporarily override during this phase.

Source code in bridgic/amphibious/_amphibious_automa.py
@asynccontextmanager
async def snapshot(self, *, goal: Optional[str] = None,
               keep_revealed: Optional[Dict[str, List[int]]] = None,
               **snapshot_fields):
    """
    Temporarily override context fields for the duration of the block.

    Parameters
    ----------
    goal : Optional[str]
        Temporary goal injected into the context so the LLM knows
        the purpose of this phase.
    keep_revealed : Optional[Dict[str, List[int]]]
        Passed to ``_AgentSnapshot`` for revealed state management.
    **snapshot_fields
        Additional context fields to temporarily override during this phase.
    """
    async with self._phase_context(keep_revealed=keep_revealed,
                                   goal=goal, **snapshot_fields):
        yield
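
The internals of `_phase_context` are not shown here, so the following is only a generic sketch of the override-then-restore pattern that an async context manager like this one typically follows. `temporary_fields` and the SimpleNamespace context are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import Any, AsyncIterator, Tuple


@asynccontextmanager
async def temporary_fields(ctx: Any, **overrides: Any) -> AsyncIterator[None]:
    # Remember the current values, then apply the temporary ones.
    saved = {name: getattr(ctx, name) for name in overrides}
    for name, value in overrides.items():
        setattr(ctx, name, value)
    try:
        yield
    finally:
        # Restore the originals even if the block raises.
        for name, value in saved.items():
            setattr(ctx, name, value)


async def demo() -> Tuple[str, str]:
    ctx = SimpleNamespace(goal="Book a flight")
    async with temporary_fields(ctx, goal="Find the login form"):
        inside = ctx.goal
    return inside, ctx.goal


inside_goal, restored_goal = asyncio.run(demo())
```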

router

async
router(
    mode: RunMode, max_consecutive_fallbacks: int
) -> str

Router worker: dispatches to the correct execution mode.

RunMode.AUTO is resolved upstream in arun(), so this worker always receives a concrete mode.

Source code in bridgic/amphibious/_amphibious_automa.py
@worker(is_start=True)
async def router(self, mode: RunMode, max_consecutive_fallbacks: int) -> str:
    """
    Router worker: dispatches to the correct execution mode.

    ``RunMode.AUTO`` is resolved upstream in ``arun()``, so this worker
    always receives a concrete mode.
    """
    if mode is RunMode.AGENT:
        self._log("Router", "Ferrying to AGENT mode", color="green")
        self.ferry_to("_cognition")
    elif mode is RunMode.WORKFLOW:
        self._log("Router", "Ferrying to WORKFLOW mode", color="green")
        self.ferry_to("_workflow")
    elif mode is RunMode.AMPHIFLOW:
        self._log("Router", f"Ferrying to AMPHIFLOW mode, max_consecutive_fallbacks={max_consecutive_fallbacks}", color="green")
        self.ferry_to("_amphiflow", max_consecutive_fallbacks=max_consecutive_fallbacks)
    else:
        raise RuntimeError(f"Unsupported run mode: {mode!r}")

arun

async
arun(
    *,
    context: Optional[CognitiveContextT] = None,
    trace_running: bool = False,
    mode: Optional[RunMode] = AUTO,
    max_consecutive_fallbacks: int = 1,
    **kwargs
) -> str

Run the agent.

Routes to one of the execution modes:

1. Agent mode: LLM-driven on_agent() path.
2. Workflow mode: deterministic on_workflow() path (no fallback).
3. Amphiflow mode: on_workflow() with automatic agent fallback.
4. Auto mode (default): resolved from which template methods the subclass overrides. Both → AMPHIFLOW, only on_workflow → WORKFLOW, only on_agent → AGENT.

Context initialization has two paths:

1. Pre-created: arun(context=my_ctx)
2. Auto-created: arun(goal="...", tools=[...], skills=[...])

Parameters:

- context (Optional[CognitiveContextT], default None): Pre-created context object. If provided, this context is used directly.
- trace_running (bool, default False): If True, enables trace capture via AgentTrace during execution.
- mode (Optional[RunMode], default AUTO): Execution mode. RunMode.AGENT forces agent mode, RunMode.WORKFLOW forces workflow mode (no fallback), RunMode.AMPHIFLOW forces workflow with agent fallback, and RunMode.AUTO auto-detects from which template methods are overridden. None is treated as RunMode.AUTO.
- max_consecutive_fallbacks (int, default 1): Maximum consecutive workflow step failures before switching to full agent mode. Only applies to amphiflow mode.
- **kwargs (default {}): Arguments passed to the CognitiveContext constructor when context is not provided (e.g., goal, tools, skills).

Returns:

- str: The final answer if one was set or captured during the run; otherwise the summary of the context after execution.
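
Auto mode depends on detecting which template methods a subclass overrides. The sketch below reproduces that identity check on stand-in classes. `Mode`, `BaseAgent`, `WorkflowOnly` and `Both` are hypothetical, and the enum values are placeholders rather than the real RunMode values.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for RunMode; values are placeholders."""
    AGENT = "agent"
    WORKFLOW = "workflow"
    AMPHIFLOW = "amphiflow"


class BaseAgent:
    """Hypothetical base class with no-op template methods."""

    async def on_agent(self, ctx):
        return None

    async def on_workflow(self, ctx):
        if False:
            yield

    def resolve_auto(self) -> Mode:
        # Overridden means the class attribute is no longer the base function object.
        has_agent = type(self).on_agent is not BaseAgent.on_agent
        has_workflow = type(self).on_workflow is not BaseAgent.on_workflow
        if has_agent and has_workflow:
            return Mode.AMPHIFLOW
        if has_workflow:
            return Mode.WORKFLOW
        if has_agent:
            return Mode.AGENT
        raise RuntimeError(f"{type(self).__name__} must override on_agent() or on_workflow().")


class WorkflowOnly(BaseAgent):
    async def on_workflow(self, ctx):
        yield "step"


class Both(WorkflowOnly):
    async def on_agent(self, ctx):
        return None


resolved = (WorkflowOnly().resolve_auto(), Both().resolve_auto())
```

Note that `Both` inherits its `on_workflow` from `WorkflowOnly`, and the check still counts it as overridden because the comparison is against the base class's function.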

Source code in bridgic/amphibious/_amphibious_automa.py
async def arun(
    self,
    *,
    context: Optional[CognitiveContextT] = None,
    trace_running: bool = False,
    mode: Optional[RunMode] = RunMode.AUTO,
    max_consecutive_fallbacks: int = 1,
    **kwargs
) -> str:
    """
    Run the agent.

    Routes to one of the execution modes:
    1. Agent mode — LLM-driven ``on_agent()`` path.
    2. Workflow mode — deterministic ``on_workflow()`` path (no fallback).
    3. Amphiflow mode — ``on_workflow()`` with automatic agent fallback.
    4. Auto mode (default) — resolved from which template methods the
       subclass overrides: both → AMPHIFLOW, only ``on_workflow`` → WORKFLOW,
       only ``on_agent`` → AGENT.

    Context initialization has two paths:
    1. Pre-created: ``arun(context=my_ctx)``
    2. Auto-created: ``arun(goal="...", tools=[...], skills=[...])``

    Parameters
    ----------
    context : Optional[CognitiveContextT]
        Pre-created context object. If provided, uses this context directly.
    trace_running : bool
        If True, enables trace capture via AgentTrace during execution.
    mode : Optional[RunMode]
        Execution mode. ``RunMode.AGENT`` forces agent mode,
        ``RunMode.WORKFLOW`` forces workflow mode (no fallback),
        ``RunMode.AMPHIFLOW`` forces workflow with agent fallback,
        ``RunMode.AUTO`` (default) auto-detects from which template methods
        are overridden.
    max_consecutive_fallbacks : int
        Maximum consecutive workflow step failures before switching
        to full agent mode. Only applies to amphiflow mode. Default is 1.
    **kwargs
        Arguments passed to CognitiveContext constructor when ``context``
        is not provided (e.g., ``goal``, ``tools``, ``skills``).

    Returns
    -------
    str
        Summary of the context after execution.
    """
    def _build_trace(automa: "AmphibiousAutoma") -> Dict[str, Any]:
        """Build a trace dict from the workflow builder of the last run."""
        import time as _time
        metadata = {
            "automa_class": f"{automa.__class__.__module__}.{automa.__class__.__qualname__}",
            "context_class": (
                f"{automa._context_class.__module__}.{automa._context_class.__qualname__}"
                if automa._context_class else None
            ),
            "timestamp": _time.time(),
            "spent_tokens": automa.spent_tokens,
            "spent_time": automa.spent_time,
        }
        return automa._agent_trace.build(metadata=metadata)

    async def _run_and_report(context: CognitiveContextT) -> str:
        """Run the agent, measure time, and log summary."""
        start_time = time.time()
        result = await GraphAutoma.arun(
            self, resolved_mode,
            max_consecutive_fallbacks=max_consecutive_fallbacks,
        )
        self.spent_time = time.time() - start_time

        if self._verbose:
            agent_name = self.name or self.__class__.__name__
            separator = "=" * 50
            printer.print(separator, color="cyan")
            printer.print(
                f"  {agent_name} | Completed\n"
                f"Tokens: {self.spent_tokens} | "
                f"Time: {self.spent_time:.2f}s",
                color="cyan"
            )
            printer.print(separator, color="cyan")

        return result

    ########################
    # Pre-initialize status
    ########################
    self.spent_tokens = 0
    self.spent_time = 0.0
    self._final_answer = None

    resolved_mode = self._resolve_mode(mode if mode is not None else RunMode.AUTO)
    if self._llm is None and resolved_mode in (RunMode.AGENT, RunMode.AMPHIFLOW):
        raise RuntimeError(
            f"AmphibiousAutoma must be initialized with an LLM for "
            f"{resolved_mode.value} mode."
        )

    if trace_running:
        self._agent_trace = AgentTrace()
    else:
        self._agent_trace = None

    ########################
    # Initialize context
    ########################
    if context is not None:
        if not isinstance(context, self._context_class):
            raise ValueError(
                f"Context must be an instance of {self._context_class.__name__}, "
                f"got {type(context).__name__}"
            )

    # Separate Exposure fields (tools, skills, etc.) from regular constructor args
    exposure_fields = self._context_class._exposure_fields
    if exposure_fields is None:
        exposure_fields = self._context_class._detect_exposure_fields()
        self._context_class._exposure_fields = exposure_fields

    # Create the context
    constructor_kwargs = {}
    exposure_items = {}  # {field_name: list_of_items}
    for key, value in kwargs.items():  # Add fields to the context that can directly be added to the context
        if key in exposure_fields and isinstance(value, (list, tuple)):
            exposure_items[key] = value
        elif key in exposure_fields and isinstance(value, Exposure):
            constructor_kwargs[key] = value
        else:
            constructor_kwargs[key] = value

    if context is None:
        context = self._context_class(**constructor_kwargs)  # Create the context
    for field_name, items in exposure_items.items():  # Add items to Exposure fields
        attr = getattr(context, field_name)
        for item in items:
            attr.add(item)

    # Inject built-in tools (e.g. request_human) so on_agent execution can
    # trigger framework-level capabilities autonomously.
    existing_tool_names = {t.tool_name for t in context.tools.get_all()}
    for builtin in _BUILTIN_TOOLS:
        if builtin.tool_name not in existing_tool_names:
            context.tools.add(builtin)

    # Set the LLM to the context
    if self._llm is not None:
        context.set_llm(self._llm)
    self._current_context = context


    ########################
    # Run the amphibious automa
    ########################
    token = current_agent.set(self)
    try:
        result = await _run_and_report(context=context)
        if trace_running and self._agent_trace:
            _build_trace(self)
        return result
    finally:
        current_agent.reset(token)

AgentTrace

Flat trace recorder that captures each observe-think-act cycle.

record_step() appends step data to the execution path. build() returns the collected steps as a structured dict. save() / load() provide JSON serialization.
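To make the record/build/save/load cycle concrete, here is a simplified sketch. `MiniTrace` and `OutputKind` are hypothetical stand-ins: unlike the real class, this version keeps steps as plain dicts instead of building TraceStep models, so it runs without the framework or Pydantic.

```python
import json
import os
import tempfile
from enum import Enum
from typing import Any, Dict, List, Optional


class OutputKind(str, Enum):
    """Hypothetical stand-in for StepOutputType."""
    TOOL_CALLS = "tool_calls"
    CONTENT_ONLY = "content_only"


def to_serializable(data: Any) -> Any:
    """Recursively reduce dicts, lists, and enums to JSON-friendly values."""
    if isinstance(data, Enum):
        return data.value
    if isinstance(data, dict):
        return {k: to_serializable(v) for k, v in data.items()}
    if isinstance(data, list):
        return [to_serializable(item) for item in data]
    return data


class MiniTrace:
    """Flat recorder: append steps, then dump them with metadata."""

    def __init__(self) -> None:
        self._steps: List[dict] = []

    def record_step(self, step_data: dict) -> None:
        self._steps.append(step_data)

    def build(self, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        return {"steps": list(self._steps), "metadata": metadata or {}}

    def save(self, path: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        with open(path, "w", encoding="utf-8") as f:
            json.dump(to_serializable(self.build(metadata)), f, indent=2, ensure_ascii=False)

    @staticmethod
    def load(path: str) -> Dict[str, Any]:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)


def round_trip() -> Dict[str, Any]:
    """Record one step, save it to a temporary file, and load it back."""
    trace = MiniTrace()
    trace.record_step({
        "name": "main_think",
        "step_content": "Open the page",
        "output_type": OutputKind.TOOL_CALLS,
    })
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "trace.json")
        trace.save(path, metadata={"run": 1})
        return MiniTrace.load(path)
```

As in the real class, `load()` returns whatever `json.load` produces, so callers get plain dicts and strings back rather than model or enum instances.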

Source code in bridgic/amphibious/_amphibious_automa.py
class AgentTrace:
    """Flat trace recorder that captures each observe-think-act cycle.

    ``record_step()`` appends step data to the execution path.
    ``build()`` returns the collected steps as a structured dict.
    ``save()`` / ``load()`` provide JSON serialization.
    """

    def __init__(self):
        self._steps: List[dict] = []

    def record_step(self, step_data: dict) -> None:
        """Append a trace step to the execution path."""
        self._steps.append(step_data)

    def build(self, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Return collected trace data as a structured dict."""
        steps = [
            TraceStep(
                name=s["name"],
                step_content=s.get("step_content", ""),
                tool_calls=[
                    RecordedToolCall(**tc) for tc in s.get("tool_calls", [])
                ],
                observation=s.get("observation"),
                observation_hash=s.get("observation_hash"),
                output_type=StepOutputType(s.get("output_type", StepOutputType.TOOL_CALLS)),
                structured_output=s.get("structured_output"),
                structured_output_class=s.get("structured_output_class"),
            )
            for s in self._steps
        ]

        return {
            "steps": steps,
            "metadata": metadata or {},
        }

    def save(self, path: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Serialize the trace to a JSON file."""
        trace_data = self.build(metadata=metadata)
        serializable = self._to_serializable(trace_data)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(serializable, f, indent=2, ensure_ascii=False, default=str)

    @staticmethod
    def load(path: str) -> Dict[str, Any]:
        """Deserialize a trace from a JSON file."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _to_serializable(self, data: Any) -> Any:
        """Recursively convert Pydantic models and enums to plain dicts/values."""
        from enum import Enum
        if isinstance(data, BaseModel):
            return self._to_serializable(data.model_dump())
        if isinstance(data, dict):
            return {k: self._to_serializable(v) for k, v in data.items()}
        if isinstance(data, list):
            return [self._to_serializable(item) for item in data]
        if isinstance(data, Enum):
            return data.value
        return data

record_step

record_step(step_data: dict) -> None

Append a trace step to the execution path.

Source code in bridgic/amphibious/_amphibious_automa.py
def record_step(self, step_data: dict) -> None:
    """Append a trace step to the execution path."""
    self._steps.append(step_data)

build

build(
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]

Return collected trace data as a structured dict.

Source code in bridgic/amphibious/_amphibious_automa.py
def build(self, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return collected trace data as a structured dict."""
    steps = [
        TraceStep(
            name=s["name"],
            step_content=s.get("step_content", ""),
            tool_calls=[
                RecordedToolCall(**tc) for tc in s.get("tool_calls", [])
            ],
            observation=s.get("observation"),
            observation_hash=s.get("observation_hash"),
            output_type=StepOutputType(s.get("output_type", StepOutputType.TOOL_CALLS)),
            structured_output=s.get("structured_output"),
            structured_output_class=s.get("structured_output_class"),
        )
        for s in self._steps
    ]

    return {
        "steps": steps,
        "metadata": metadata or {},
    }

save

save(
    path: str, metadata: Optional[Dict[str, Any]] = None
) -> None

Serialize the trace to a JSON file.

Source code in bridgic/amphibious/_amphibious_automa.py
def save(self, path: str, metadata: Optional[Dict[str, Any]] = None) -> None:
    """Serialize the trace to a JSON file."""
    trace_data = self.build(metadata=metadata)
    serializable = self._to_serializable(trace_data)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(serializable, f, indent=2, ensure_ascii=False, default=str)

load

staticmethod
load(path: str) -> Dict[str, Any]

Deserialize a trace from a JSON file.

Source code in bridgic/amphibious/_amphibious_automa.py
@staticmethod
def load(path: str) -> Dict[str, Any]:
    """Deserialize a trace from a JSON file."""
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

ThinkUnitDescriptor

Python descriptor enabling class-level think unit declaration.

When accessed on an instance (self.main_think), returns a _BoundThinkUnit that is awaitable and supports .until().

When accessed on the class, returns the descriptor itself.
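The class-versus-instance behavior relies on the standard descriptor protocol. The sketch below reproduces it with hypothetical `UnitDescriptor` and `BoundUnit` classes; it shows only the access pattern and the awaitable wrapper, not the framework's worker execution or `.until()` support.

```python
import asyncio
from typing import Any, Optional


class BoundUnit:
    """Awaitable object pairing an owner instance with its descriptor."""

    def __init__(self, owner: Any, descriptor: "UnitDescriptor") -> None:
        self._owner = owner
        self._descriptor = descriptor

    async def _run(self) -> str:
        return f"{type(self._owner).__name__} ran {self._descriptor.label}"

    def __await__(self):
        # Delegating to a coroutine makes `await bound_unit` work.
        return self._run().__await__()


class UnitDescriptor:
    def __init__(self, label: str) -> None:
        self.label = label

    def __get__(self, obj: Any, objtype: Optional[type] = None) -> Any:
        if obj is None:
            return self  # class-level access returns the descriptor
        return BoundUnit(obj, self)  # instance access returns a fresh bound unit


class DemoAgent:
    main_think = UnitDescriptor("plan next step")

    async def on_agent(self) -> str:
        return await self.main_think


result = asyncio.run(DemoAgent().on_agent())
```

Because `__get__` builds a new bound object on every instance access, per-run state can live on the bound object while the configuration stays on the class.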

Source code in bridgic/amphibious/_amphibious_automa.py
class ThinkUnitDescriptor:
    """Python descriptor enabling class-level think unit declaration.

    When accessed on an instance (``self.main_think``), returns a
    ``_BoundThinkUnit`` that is awaitable and supports ``.until()``.

    When accessed on the class, returns the descriptor itself.
    """

    def __init__(
        self,
        worker: CognitiveWorker,
        *,
        until: Optional[Union[Callable[..., bool], Callable[..., Awaitable[bool]]]] = None,
        max_attempts: int = 1,
        tools: Optional[List[str]] = None,
        skills: Optional[List[str]] = None,
        on_error: ErrorStrategy = ErrorStrategy.RAISE,
        max_retries: int = 0,
    ):
        self._worker_template = worker
        self._until = until
        self._max_attempts = max_attempts
        self._tools = tools
        self._skills = skills
        self._on_error = on_error
        self._max_retries = max_retries

    def __get__(self, obj: Any, objtype: Optional[type] = None) -> Any:
        if obj is None:
            return self  # Class-level access returns the descriptor
        return _BoundThinkUnit(obj, self)

RunMode

Bases: str, Enum

The mode of the run.

Used by: _amphibious_automa.py (arun, router)
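Because RunMode mixes in str, its members compare equal to their string values and serialize cleanly. The sketch below uses a local copy of the enum and a hypothetical `parse_mode` helper to show how a mode read from configuration text might be converted.

```python
import json
from enum import Enum
from typing import Optional


class RunMode(str, Enum):
    """Local copy of the enum so the sketch runs without the framework."""
    AGENT = "agent"
    WORKFLOW = "workflow"
    AMPHIFLOW = "amphiflow"
    AUTO = "auto"


def parse_mode(raw: Optional[str]) -> RunMode:
    """Convert a config string to a RunMode, defaulting to AUTO."""
    if raw is None:
        return RunMode.AUTO
    # Lookup by value raises ValueError for unknown mode names.
    return RunMode(raw.strip().lower())


encoded = json.dumps({"mode": RunMode.AMPHIFLOW})
```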

Source code in bridgic/amphibious/_type.py
class RunMode(str, Enum):
    """The mode of the run.

    Used by: _amphibious_automa.py (arun, router)
    """
    AGENT = "agent"
    WORKFLOW = "workflow"
    AMPHIFLOW = "amphiflow"
    AUTO = "auto"

DetailRequest

Bases: BaseModel

Request for detailed information about a specific item in a LayeredExposure field.

Used by: _cognitive_worker.py (acquiring policy)

Source code in bridgic/amphibious/_type.py
class DetailRequest(BaseModel):
    """Request for detailed information about a specific item in a LayeredExposure field.

    Used by: _cognitive_worker.py (acquiring policy)
    """
    model_config = ConfigDict(
        extra="forbid",
        json_schema_extra={
            "required": ["field", "index"],
            "additionalProperties": False,
        }
    )
    field: str = Field(description="Name of the field to get details from (e.g., 'cognitive_history', 'skills')")
    index: int = Field(description="0-based index of the item to get details for")

ToolArgument

Bases: BaseModel

A single tool argument as name-value pair.

Used by: _cognitive_worker.py (StepToolCall), _amphibious_automa.py (action phase)

Source code in bridgic/amphibious/_type.py
class ToolArgument(BaseModel):
    """A single tool argument as name-value pair.

    Used by: _cognitive_worker.py (StepToolCall), _amphibious_automa.py (action phase)
    """
    model_config = ConfigDict(
        extra="forbid",
        json_schema_extra={
            "required": ["name", "value"],
            "additionalProperties": False,
        }
    )
    name: str = Field(description="Parameter name")
    value: str = Field(description="Parameter value as string")

    @field_validator('value', mode='before')
    @classmethod
    def coerce_to_str(cls, v: Any) -> str:
        return str(v) if not isinstance(v, str) else v

StepToolCall

Bases: BaseModel

A single tool call specification.

Used by: _cognitive_worker.py (ThinkModel output), _amphibious_automa.py (action phase)

Source code in bridgic/amphibious/_type.py
class StepToolCall(BaseModel):
    """A single tool call specification.

    Used by: _cognitive_worker.py (ThinkModel output), _amphibious_automa.py (action phase)
    """
    model_config = ConfigDict(
        extra="forbid",
        json_schema_extra={
            "required": ["tool", "tool_arguments"],
            "additionalProperties": False,
        }
    )
    tool: str = Field(description="Name of the tool to call")
    tool_arguments: List[ToolArgument] = Field(
        description="Arguments as list of name-value pairs, e.g., [{name: 'city', value: 'Beijing'}]"
    )

WorkflowDecision

Bases: BaseModel

Single-step deterministic decision for workflow mode.

Used by: _amphibious_automa.py (_run_workflow, ActionCall)

Source code in bridgic/amphibious/_type.py
class WorkflowDecision(BaseModel):
    """
    Single-step deterministic decision for workflow mode.

    Used by: _amphibious_automa.py (_run_workflow, ActionCall)
    """
    model_config = ConfigDict(extra="forbid")

    step_content: str = ""
    output: List[StepToolCall] = Field(default_factory=list)

ActionCall dataclass

Yielded by on_workflow() for deterministic single-tool execution.

Replaces the removed step() shorthand function. Each instance wraps exactly one tool call together with an optional CognitiveWorker override.

Used by: _amphibious_automa.py (_run_workflow)

Usage::

    yield ActionCall("navigate_to", url="http://example.com")
    yield ActionCall("click_element_by_ref", description="Click submit", ref="e42")
    result = yield ActionCall("fill_field", name="user", value="john", worker=my_worker)
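The constructor in the source listing converts every keyword argument into a name-value pair whose value is passed through str(). The sketch below mirrors that conversion with plain dicts; `to_name_value_pairs` is a hypothetical helper, not part of the library.

```python
from typing import Any, Dict, List


def to_name_value_pairs(tool_args: Dict[str, Any]) -> List[Dict[str, str]]:
    """Turn keyword arguments into name-value pairs with string values."""
    return [{"name": key, "value": str(value)} for key, value in tool_args.items()]


pairs = to_name_value_pairs({"url": "http://example.com", "retries": 3, "force": True})
```

Non-string values lose their type in this step (3 becomes "3" and True becomes "True"), so the receiving side has to parse them back.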

Source code in bridgic/amphibious/_type.py
@dataclass(init=False)
class ActionCall:
    """Yielded by on_workflow() for deterministic single-tool execution.

    Replaces the removed ``step()`` shorthand function. Each instance wraps
    exactly one tool call together with an optional CognitiveWorker override.

    Used by: _amphibious_automa.py (_run_workflow)

    Usage::
        yield ActionCall("navigate_to", url="http://example.com")
        yield ActionCall("click_element_by_ref", description="Click submit", ref="e42")
        result = yield ActionCall("fill_field", name="user", value="john", worker=my_worker)
    """
    tool_name: str
    description: str
    worker: Optional[Any]
    tool_args: Dict[str, Any]
    decision: WorkflowDecision = field(repr=False)

    def __init__(
        self,
        tool_name: str,
        *,
        description: str = "",
        worker: Optional[Any] = None,
        **tool_args: Any,
    ) -> None:
        self.tool_name = tool_name
        self.description = description
        self.worker = worker
        self.tool_args = tool_args
        self.decision = WorkflowDecision(
            step_content=description,
            output=[StepToolCall(
                tool=tool_name,
                tool_arguments=[
                    ToolArgument(name=k, value=str(v)) for k, v in tool_args.items()
                ],
            )],
        )

HumanCall dataclass

Yielded by on_workflow() to pause execution and request human input.

Execution is suspended until the human provides a response, which is returned to the generator via asend() as a plain string.

Used by: _amphibious_automa.py (_run_workflow)

Usage::

    feedback = yield HumanCall(prompt="Please verify the result (yes/no):")
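The mechanism behind this is an async generator that receives values through asend(). The sketch below shows a minimal driver loop under stated assumptions: `HumanRequest` is a hypothetical stand-in for HumanCall, and the human is simulated by a list of canned answers. The framework's own `_run_workflow` is not shown on this page and will do considerably more.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional


@dataclass
class HumanRequest:
    """Hypothetical stand-in for HumanCall."""
    prompt: str = ""
    timeout: Optional[float] = None


async def workflow() -> AsyncGenerator[HumanRequest, str]:
    answer = yield HumanRequest(prompt="Please verify the result (yes/no):")
    if answer.strip().lower() != "yes":
        yield HumanRequest(prompt="What should be changed?")


async def drive(answers: List[str]) -> List[str]:
    """Run the generator, feeding one canned answer per request."""
    prompts: List[str] = []
    remaining = iter(answers)
    gen = workflow()
    try:
        request = await gen.__anext__()  # advance to the first yield
        while True:
            prompts.append(request.prompt)
            # The value sent here becomes the result of the yield expression.
            request = await gen.asend(next(remaining, ""))
    except StopAsyncIteration:
        pass  # the workflow finished
    return prompts


confirmed = asyncio.run(drive(["yes"]))
rejected = asyncio.run(drive(["no", "Fix the title"]))
```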

Source code in bridgic/amphibious/_type.py
@dataclass
class HumanCall:
    """Yielded by on_workflow() to pause execution and request human input.

    Execution is suspended until the human provides a response, which is
    returned to the generator via asend() as a plain string.

    Used by: _amphibious_automa.py (_run_workflow)

    Usage::
        feedback = yield HumanCall(prompt="Please verify the result (yes/no):")
    """
    prompt: str = ""
    timeout: Optional[float] = None

AgentCall dataclass

Yielded by on_workflow() to fall back to agent mode for a sub-task.

By default a clean context is used (fresh history, full tool set). Provide explicit values to override specific context fields.
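The "None means use the full set" convention for the tools and skills filters can be sketched as follows. `select_by_name` is a hypothetical helper, and silently skipping unknown names is an assumption of this example rather than documented framework behavior.

```python
from typing import Dict, List, Optional


def select_by_name(
    available: Dict[str, object],
    names: Optional[List[str]],
) -> Dict[str, object]:
    """Apply an optional name filter; None keeps everything."""
    if names is None:
        return dict(available)
    # Assumption: names that are not available are skipped, not reported.
    return {name: available[name] for name in names if name in available}


all_tools = {"click": "click-tool", "type": "type-tool", "scroll": "scroll-tool"}
unfiltered = select_by_name(all_tools, None)
filtered = select_by_name(all_tools, ["type", "missing"])
```

Note that an empty list is different from None: it selects nothing rather than everything.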

Used by: _amphibious_automa.py (_run_workflow)

Usage::

yield AgentCall(goal="Handle the login popup", max_attempts=5)
Source code in bridgic/amphibious/_type.py
@dataclass
class AgentCall:
    """Yielded by on_workflow() to fall back to agent mode for a sub-task.

    By default a clean context is used (fresh history, full tool set).
    Provide explicit values to override specific context fields.

    Used by: _amphibious_automa.py (_run_workflow)

    Usage::

        yield AgentCall(goal="Handle the login popup", max_attempts=5)
    """
    goal: str = ""
    tools: Optional[List[str]] = None   # Tool-name filter; None → use context's full tool set
    skills: Optional[List[str]] = None  # Skill-name filter; None → use context's full skill set
    history: Optional[Any] = None       # Optional[CognitiveHistory]; None → fresh CognitiveHistory()
    max_attempts: int = 1
    worker: Optional[Any] = None        # Optional[CognitiveWorker]; None → use framework default

ErrorStrategy

Bases: Enum

Error handling strategy for worker execution via _run().

Used by: _amphibious_automa.py (_run method, ThinkUnitDescriptor)
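One way to picture the three strategies is a small async wrapper. This is a sketch under assumptions, not the framework's `_run()` logic: `run_with_strategy` and `make_flaky` are hypothetical, and it is assumed that RETRY allows one initial attempt plus up to max_retries further attempts before re-raising.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable


class ErrorStrategy(Enum):
    """Local copy of the enum so the sketch runs without the framework."""
    RAISE = "raise"
    IGNORE = "ignore"
    RETRY = "retry"


async def run_with_strategy(
    fn: Callable[[], Awaitable[Any]],
    strategy: ErrorStrategy,
    max_retries: int = 0,
) -> Any:
    # Assumption: only RETRY gets extra attempts.
    attempts = 1 + (max_retries if strategy is ErrorStrategy.RETRY else 0)
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception:
            if strategy is ErrorStrategy.IGNORE:
                return None  # swallow the error
            if attempt == attempts - 1:
                raise  # RAISE, or RETRY with no attempts left
    return None


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Build a coroutine function that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ValueError("transient failure")
        return f"ok after {state['calls']} calls"

    return flaky


retried = asyncio.run(run_with_strategy(make_flaky(2), ErrorStrategy.RETRY, max_retries=2))
ignored = asyncio.run(run_with_strategy(make_flaky(1), ErrorStrategy.IGNORE))
```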

Source code in bridgic/amphibious/_type.py
class ErrorStrategy(Enum):
    """Error handling strategy for worker execution via ``_run()``.

    Used by: _amphibious_automa.py (_run method), _amphibious_automa.py (ThinkUnitDescriptor)
    """
    RAISE = "raise"    # Re-raise exceptions (default)
    IGNORE = "ignore"  # Silently ignore exceptions
    RETRY = "retry"    # Retry up to max_retries times

ActionResult

Bases: BaseModel

Overall result of the action phase (one or more tool executions).

Used by: _amphibious_automa.py (action_tool_call, _record_trace_step)

Source code in bridgic/amphibious/_type.py
class ActionResult(BaseModel):
    """Overall result of the action phase (one or more tool executions).

    Used by: _amphibious_automa.py (action_tool_call, _record_trace_step)
    """
    model_config = ConfigDict(
        extra="forbid",
        json_schema_extra={
            "required": ["results"],
            "additionalProperties": False,
        }
    )
    results: List[ActionStepResult]

ActionStepResult

Bases: BaseModel

Result of executing one tool in the action phase.

Used by: _amphibious_automa.py (action_tool_call)

Source code in bridgic/amphibious/_type.py
class ActionStepResult(BaseModel):
    """Result of executing one tool in the action phase.

    Used by: _amphibious_automa.py (action_tool_call)
    """
    model_config = ConfigDict(
        extra="forbid",
        json_schema_extra={
            "required": ["tool_id", "tool_name", "tool_arguments", "tool_result", "success"],
            "additionalProperties": False,
        }
    )
    tool_id: str
    tool_name: str
    tool_arguments: Dict[str, Any]
    tool_result: Any
    success: bool = True
    error: Optional[str] = None

ToolResult dataclass

Single tool execution result returned to workflow generator via asend().

Used by: _amphibious_automa.py (_run_workflow), on_workflow user code

Source code in bridgic/amphibious/_type.py
@dataclass
class ToolResult:
    """Single tool execution result returned to workflow generator via asend().

    Used by: _amphibious_automa.py (_run_workflow), on_workflow user code
    """
    tool_name: str
    tool_arguments: Dict[str, Any]
    result: Any
    success: bool = True
    error: Optional[str] = None

TraceStep

Bases: BaseModel

Record of one observe-think-act cycle.

Used by: _amphibious_automa.py (AgentTrace.build)

Source code in bridgic/amphibious/_type.py
class TraceStep(BaseModel):
    """Record of one observe-think-act cycle.

    Used by: _amphibious_automa.py (AgentTrace.build)
    """
    model_config = ConfigDict(extra="forbid")

    name: str
    step_content: str
    tool_calls: List[RecordedToolCall] = Field(default_factory=list)
    observation: Optional[str] = None
    observation_hash: Optional[str] = None
    output_type: StepOutputType = StepOutputType.TOOL_CALLS
    structured_output: Optional[Dict[str, Any]] = None
    structured_output_class: Optional[str] = None

RecordedToolCall

Bases: BaseModel

A complete record of one tool invocation.

Used by: _amphibious_automa.py (AgentTrace.build)

Source code in bridgic/amphibious/_type.py
class RecordedToolCall(BaseModel):
    """A complete record of one tool invocation.

    Used by: _amphibious_automa.py (AgentTrace.build)
    """
    model_config = ConfigDict(extra="forbid")

    tool_name: str
    tool_arguments: Dict[str, Any]
    tool_result: Any
    success: bool = True
    error: Optional[str] = None

StepOutputType

Bases: str, Enum

Discriminator for the kind of output a trace step produced.

Used by: _amphibious_automa.py (AgentTrace, _record_trace_step)

Source code in bridgic/amphibious/_type.py
class StepOutputType(str, Enum):
    """Discriminator for the kind of output a trace step produced.

    Used by: _amphibious_automa.py (AgentTrace, _record_trace_step)
    """
    TOOL_CALLS = "tool_calls"
    STRUCTURED = "structured"
    CONTENT_ONLY = "content_only"

think_unit

think_unit(
    worker: CognitiveWorker,
    *,
    until: Optional[
        Union[
            Callable[..., bool],
            Callable[..., Awaitable[bool]],
        ]
    ] = None,
    max_attempts: int = 1,
    tools: Optional[List[str]] = None,
    skills: Optional[List[str]] = None,
    on_error: ErrorStrategy = RAISE,
    max_retries: int = 0
) -> ThinkUnitDescriptor

Declare a think unit for use in on_agent().

Factory function that returns a ThinkUnitDescriptor. Use as a class variable::

class MyAgent(AmphibiousAutoma[MyContext]):
    main_think = think_unit(
        CognitiveWorker.inline("Plan ONE immediate next step"),
        max_attempts=80,
        on_error=ErrorStrategy.RAISE,
    )

    async def on_agent(self, ctx):
        await self.main_think

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| worker | CognitiveWorker | The worker template. A fresh clone is created for each execution. | required |
| until | Optional callable (sync or async, returning bool) | Loop condition: the unit repeats until this returns True or the LLM signals that it is finished. | None |
| max_attempts | int | Maximum execution attempts (default 1 = single shot). | 1 |
| tools | Optional[List[str]] | Tool filter: only these tools are visible to the worker. | None |
| skills | Optional[List[str]] | Skill filter: only these skills are visible to the worker. | None |
| on_error | ErrorStrategy | Error handling strategy (default: RAISE). | RAISE |
| max_retries | int | Maximum number of retries when on_error is RETRY. | 0 |
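Since `until` may be either a plain function or a coroutine function, a loop that honors it has to handle both. The sketch below shows one way to do that with `inspect.isawaitable`. `repeat_until`, the state dict, and the choice to evaluate the predicate after each attempt are assumptions of this example; the arguments the framework actually passes to `until` are not documented on this page.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

Predicate = Callable[[dict], Union[bool, Awaitable[bool]]]


async def repeat_until(
    step: Callable[[dict], Awaitable[None]],
    until: Predicate,
    state: dict,
    max_attempts: int = 1,
) -> int:
    """Run `step` until `until(state)` is true or attempts run out.

    Returns the number of attempts that were executed.
    """
    for attempt in range(1, max_attempts + 1):
        await step(state)
        outcome = until(state)
        if inspect.isawaitable(outcome):
            outcome = await outcome  # async predicate
        if outcome:
            return attempt
    return max_attempts


async def increment(state: dict) -> None:
    state["count"] = state.get("count", 0) + 1


def sync_done(state: dict) -> bool:
    return state["count"] >= 3


async def async_done(state: dict) -> bool:
    return state["count"] >= 2


sync_attempts = asyncio.run(repeat_until(increment, sync_done, {}, max_attempts=10))
async_attempts = asyncio.run(repeat_until(increment, async_done, {}, max_attempts=10))
capped_attempts = asyncio.run(repeat_until(increment, sync_done, {}, max_attempts=1))
```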
Source code in bridgic/amphibious/_amphibious_automa.py
def think_unit(
    worker: CognitiveWorker,
    *,
    until: Optional[Union[Callable[..., bool], Callable[..., Awaitable[bool]]]] = None,
    max_attempts: int = 1,
    tools: Optional[List[str]] = None,
    skills: Optional[List[str]] = None,
    on_error: ErrorStrategy = ErrorStrategy.RAISE,
    max_retries: int = 0,
) -> ThinkUnitDescriptor:
    """Declare a think unit for use in on_agent().

    Factory function that returns a ThinkUnitDescriptor. Use as a class variable::

        class MyAgent(AmphibiousAutoma[MyContext]):
            main_think = think_unit(
                CognitiveWorker.inline("Plan ONE immediate next step"),
                max_attempts=80,
                on_error=ErrorStrategy.RAISE,
            )

            async def on_agent(self, ctx):
                await self.main_think

    Parameters
    ----------
    worker : CognitiveWorker
        The worker template. A fresh clone is created for each execution.
    until : Optional callable
        Loop condition: repeats until this returns True or LLM signals finish.
    max_attempts : int
        Maximum execution attempts (default 1 = single shot).
    tools : Optional[List[str]]
        Tool filter: only these tools are visible to the worker.
    skills : Optional[List[str]]
        Skill filter: only these skills are visible to the worker.
    on_error : ErrorStrategy
        Error handling strategy (default: RAISE).
    max_retries : int
        Max retries for RETRY strategy.
    """
    return ThinkUnitDescriptor(
        worker,
        until=until,
        max_attempts=max_attempts,
        tools=tools,
        skills=skills,
        on_error=on_error,
        max_retries=max_retries,
    )

create_project

create_project(
    name: str,
    base_dir: Optional[str] = None,
    task: Optional[str] = None,
) -> Path

Create a new amphibious automa project with standard directory structure.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Project directory name. | required |
| base_dir | Optional[str] | Parent directory. Defaults to current working directory. | None |
| task | Optional[str] | Initial task description for task.md. | None |

Returns:

| Type | Description |
| --- | --- |
| Path | Path to the created project directory. |
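The directory layout produced by the scaffold can be reproduced in a few lines of pathlib. This sketch follows the source listing for the directories and the existence check, but `create_layout` is a hypothetical stand-in: the real templates for task.md and the Python files are not shown on this page, so only a plain-text task.md is written here.

```python
import tempfile
from pathlib import Path
from typing import List, Optional


def create_layout(
    name: str,
    base_dir: Optional[str] = None,
    task: Optional[str] = None,
) -> Path:
    """Create the project directory with its standard subdirectories."""
    base = Path(base_dir) if base_dir else Path.cwd()
    project_dir = base / name
    if project_dir.exists():
        raise FileExistsError(f"Directory already exists: {project_dir}")

    project_dir.mkdir(parents=True)
    for sub in ("skills", "result", "log"):
        (project_dir / sub).mkdir()

    # Assumption: plain text replaces the real task.md template.
    task_text = task or "Describe your task here."
    (project_dir / "task.md").write_text(task_text, encoding="utf-8")
    return project_dir


def demo() -> List[str]:
    """Create a project in a temporary directory and list its entries."""
    with tempfile.TemporaryDirectory() as tmp:
        project = create_layout("my_agent", base_dir=tmp, task="Summarize the inbox")
        return sorted(p.name for p in project.iterdir())
```

Calling the function twice with the same name and base directory raises FileExistsError, which protects an existing project from being overwritten.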

Source code in bridgic/amphibious/scaffold.py
def create_project(
    name: str,
    base_dir: Optional[str] = None,
    task: Optional[str] = None,
) -> Path:
    """Create a new amphibious automa project with standard directory structure.

    Parameters
    ----------
    name : str
        Project directory name.
    base_dir : str, optional
        Parent directory. Defaults to current working directory.
    task : str, optional
        Initial task description for task.md.

    Returns
    -------
    Path
        Path to the created project directory.
    """
    base = Path(base_dir) if base_dir else Path.cwd()
    project_dir = base / name

    if project_dir.exists():
        raise FileExistsError(f"Directory already exists: {project_dir}")

    # Create directory structure
    project_dir.mkdir(parents=True)
    (project_dir / "skills").mkdir()
    (project_dir / "result").mkdir()
    (project_dir / "log").mkdir()

    # Write template files
    task_text = task or "Describe your task here."
    _write(project_dir / "task.md", _TASK_MD.format(task=task_text))
    _write(project_dir / "config.py", _CONFIG_PY)
    _write(project_dir / "tools.py", _TOOLS_PY)
    _write(project_dir / "workers.py", _WORKERS_PY)
    _write(project_dir / "agents.py", _AGENTS_PY)

    return project_dir