Customizing the OTC Cycle¶
The observe-think-act (OTC) cycle is the heartbeat of every amphibious agent. At each stage, the framework provides hook methods you can override to inject custom behavior — whether it's enriching observations, reshaping LLM messages, intercepting tool calls, or post-processing outputs.
These hooks exist at two levels: Worker level (per-worker customization) and Agent level (shared across all workers). When a worker hook returns the special _DELEGATE sentinel, the framework falls through to the agent-level hook.
In this tutorial, we'll build a security audit agent that uses OTC hooks to inject system state, enforce safety rules, filter dangerous operations, and sanitize output.
Initialize¶
First, let's set up the LLM and define the security-themed tools our audit agent will use.
import os
model_name = os.environ.get("MODEL_NAME")
api_key = os.environ.get("API_KEY")
api_base = os.environ.get("BASE_URL")
from bridgic.llms.openai import OpenAILlm, OpenAIConfiguration
llm = OpenAILlm(
    api_key=api_key,
    api_base=api_base,
    timeout=120,
    configuration=OpenAIConfiguration(
        model=model_name,
        temperature=0.0,
        max_tokens=16384,
    ),
)
from bridgic.core.agentic.tool_specs import FunctionToolSpec
async def list_files(directory: str) -> str:
    """List files in a directory"""
    return f"Files in {directory}: config.yaml, app.log, secrets.env, data.db"

async def read_file(filepath: str) -> str:
    """Read the content of a file"""
    if "secret" in filepath.lower() or ".env" in filepath:
        return f"[SENSITIVE] Content of {filepath}: API_KEY=sk-xxx, DB_PASS=yyy"
    return f"Content of {filepath}: (normal file content)"

async def delete_file(filepath: str) -> str:
    """Delete a file from the system"""
    return f"Deleted {filepath}"

async def check_security_status() -> str:
    """Check system security status"""
    return "System status: 2 warnings, 0 critical, last scan: 1 hour ago"
list_files_tool = FunctionToolSpec.from_raw(list_files)
read_file_tool = FunctionToolSpec.from_raw(read_file)
delete_file_tool = FunctionToolSpec.from_raw(delete_file)
check_security_status_tool = FunctionToolSpec.from_raw(check_security_status)
We have four tools:
- list_files — lists files in a given directory. Returns a mix of normal and sensitive files.
- read_file — reads a file's content. If the file is sensitive (e.g., .env or secrets), it returns data containing API keys and passwords.
- delete_file — deletes a file. This is the dangerous operation we want to block.
- check_security_status — reports the system's current security posture.
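Because the tools are plain async functions, you can exercise them directly before wiring them into an agent. This snippet repeats the read_file stub from above so it runs standalone:

```python
import asyncio

async def read_file(filepath: str) -> str:
    """Read the content of a file (same stub as above)."""
    if "secret" in filepath.lower() or ".env" in filepath:
        return f"[SENSITIVE] Content of {filepath}: API_KEY=sk-xxx, DB_PASS=yyy"
    return f"Content of {filepath}: (normal file content)"

# A sensitive path triggers the [SENSITIVE] branch; a normal path does not.
print(asyncio.run(read_file("/var/app/secrets.env")))
# → [SENSITIVE] Content of /var/app/secrets.env: API_KEY=sk-xxx, DB_PASS=yyy
print(asyncio.run(read_file("/var/app/config.yaml")))
# → Content of /var/app/config.yaml: (normal file content)
```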
Part 1: Observe Phase — Injecting Custom Perception¶
The observation() hook lets you inject custom context before the LLM starts thinking. It exists at two levels:
- Worker-level observation(): gives a specific worker its own view of the environment — for example, injecting security policies or audit metadata.
- Agent-level observation(): provides shared context for all workers — for example, system-wide information like server name and region.
- When a worker returns _DELEGATE, the framework falls through to the agent's observation() instead.
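The fall-through can be sketched without the framework. Here _DELEGATE is a stand-in for the real sentinel, and resolve_observation is a hypothetical helper that mirrors, in simplified form, what the framework does between the two levels:

```python
import asyncio

_DELEGATE = object()  # stand-in for the framework's sentinel

class AgentHooks:
    async def observation(self, ctx):
        return "System: production-server-01, Region: us-east-1"

class DelegatingWorker:
    async def observation(self, ctx):
        return _DELEGATE  # no worker-specific view: fall through to the agent

async def resolve_observation(worker, agent, ctx):
    # The worker hook runs first; the agent hook only runs on _DELEGATE.
    result = await worker.observation(ctx)
    if result is _DELEGATE:
        result = await agent.observation(ctx)
    return result

print(asyncio.run(resolve_observation(DelegatingWorker(), AgentHooks(), None)))
# → System: production-server-01, Region: us-east-1
```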
Let's build a security worker that injects audit-specific context into the observe phase.
from bridgic.amphibious import (
    AmphibiousAutoma, CognitiveContext, CognitiveWorker,
    think_unit,
)

class SecurityWorker(CognitiveWorker):
    """A worker with a custom observation that injects security audit context."""

    async def thinking(self) -> str:
        return (
            "Analyze the system for security issues. "
            "Check files, review the security status, and report findings. "
            "NEVER delete files or access sensitive files like .env or secrets."
        )

    async def observation(self, context: CognitiveContext):
        """Inject security-specific context before thinking."""
        return (
            f"Current goal: {context.goal}\n"
            f"Security policy: Read-only audit mode. File deletion is PROHIBITED.\n"
            f"Audit timestamp: 2024-06-15T10:30:00Z"
        )
class SecurityAuditAgent(AmphibiousAutoma[CognitiveContext]):
    auditor = think_unit(SecurityWorker(), max_attempts=5)

    async def observation(self, ctx: CognitiveContext):
        """Agent-level observation: provides system-wide context for all workers."""
        return "System: production-server-01, Region: us-east-1, Uptime: 45 days"

    async def on_agent(self, ctx: CognitiveContext):
        await self.auditor
agent = SecurityAuditAgent(llm=llm, verbose=True)
result = await agent.arun(
    goal="Perform a security audit on the /var/app directory",
    tools=[list_files_tool, read_file_tool, delete_file_tool, check_security_status_tool],
)
print(result)
[17:03:11.426] [Router] (_amphibious_automa.py:1573) Auto-detecting execution mode [17:03:11.427] [Router] (_amphibious_automa.py:1579) Detected AGENT mode [17:03:11.428] [Observe] (_amphibious_automa.py:861) SecurityWorker: Current goal: Perform a security audit on the /var/app directory Security policy: Read-only audit mode. File deletion is PROHIBITED. Audit timestamp: 2024-06-15T10:30:00Z [17:03:17.463] [Think] (_amphibious_automa.py:867) SecurityWorker: finish=False, step=Starting security audit on /var/app directory. I will first list the files in the directory and check the system security status to understand what we're working with. Following the read-only audit policy - no file deletions will be performed. [17:03:17.465] [Act] (_amphibious_automa.py:873) SecurityWorker: { "content": "Starting security audit on /var/app directory. I will first list the files in the directory and check the system security status to understand what we're working with. Following the read-only audit policy - no file deletions will be performed.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "list_files", "tool_arguments": { "directory": "/var/app" }, "tool_result": "Files in /var/app: config.yaml, app.log, secrets.env, data.db", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "check_security_status", "tool_arguments": {}, "tool_result": "System status: 2 warnings, 0 critical, last scan: 1 hour ago", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:03:17.465] [Observe] (_amphibious_automa.py:861) SecurityWorker: Current goal: Perform a security audit on the /var/app directory Security policy: Read-only audit mode. File deletion is PROHIBITED. Audit timestamp: 2024-06-15T10:30:00Z [17:03:52.125] [Think] (_amphibious_automa.py:867) SecurityWorker: finish=False, step=Proceeding to analyze non-sensitive files (config.yaml, app.log) for security issues while adhering to read-only policy. 
The presence of secrets.env is flagged as a risk without accessing its contents. Data.db presence noted for permission checks via security status. [17:03:52.131] [Act] (_amphibious_automa.py:873) SecurityWorker: { "content": "Proceeding to analyze non-sensitive files (config.yaml, app.log) for security issues while adhering to read-only policy. The presence of secrets.env is flagged as a risk without accessing its contents. Data.db presence noted for permission checks via security status.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/config.yaml" }, "tool_result": "Content of /var/app/config.yaml: (normal file content)", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/app.log" }, "tool_result": "Content of /var/app/app.log: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:03:52.131] [Observe] (_amphibious_automa.py:861) SecurityWorker: Current goal: Perform a security audit on the /var/app directory Security policy: Read-only audit mode. File deletion is PROHIBITED. Audit timestamp: 2024-06-15T10:30:00Z [17:04:24.728] [Think] (_amphibious_automa.py:867) SecurityWorker: finish=True, step=Security audit findings: 1) secrets.env file present in /var/app (high risk - should not store secrets in app directory). 2) System security status shows 2 warnings (likely related to file permissions or configuration). 3) config.yaml and app.log contain no obvious security issues. 4) data.db exists - recommend verifying database permissions separately. No file deletions performed per read-only policy. [17:04:24.729] [Act] (_amphibious_automa.py:873) SecurityWorker: { "content": "Security audit findings: 1) secrets.env file present in /var/app (high risk - should not store secrets in app directory). 
2) System security status shows 2 warnings (likely related to file permissions or configuration). 3) config.yaml and app.log contain no obvious security issues. 4) data.db exists - recommend verifying database permissions separately. No file deletions performed per read-only policy.", "result": null, "metadata": { "tool_calls": [] }, "status": null } ================================================== SecurityAuditAgent-85348d15 | Completed Tokens: 1823 | Time: 73.30s ================================================== Security audit findings: 1) secrets.env file present in /var/app (high risk - should not store secrets in app directory). 2) System security status shows 2 warnings (likely related to file permissions or configuration). 3) config.yaml and app.log contain no obvious security issues. 4) data.db exists - recommend verifying database permissions separately. No file deletions performed per read-only policy.
Notice how the SecurityWorker injects its own observation — the security policy and audit timestamp — into the context before the LLM reasons about the task. Because the worker defines its own observation(), it does not fall through to the agent-level observation.
If you want a worker to fall back to the agent-level observation, return the _DELEGATE sentinel from its observation() hook, or simply don't override the hook at all.
This two-tier pattern lets you share common context across workers while allowing individual workers to override it when needed.
Part 2: Think Phase — Reshaping LLM Messages¶
The build_messages() hook controls how the prompt is assembled before being sent to the LLM. By overriding it, you can inject mandatory rules, restructure the message format, or add custom system instructions.
Let's create a worker that injects strict security rules into every LLM call.
from bridgic.amphibious._cognitive_worker import Message

class StrictSecurityWorker(CognitiveWorker):
    async def thinking(self) -> str:
        return "Perform a security audit of the specified directory. Respond in JSON format."

    async def build_messages(
        self,
        think_prompt: str,
        tools_description: str,
        output_instructions: str,
        context_info: str,
    ):
        """Inject mandatory security rules into the system message."""
        security_rules = (
            "\n\n=== MANDATORY SECURITY RULES ===\n"
            "1. NEVER call delete_file under any circumstances.\n"
            "2. NEVER read files ending in .env or containing 'secret'.\n"
            "3. Always check security status before accessing any file.\n"
            "4. Report all findings without exposing sensitive data.\n"
        )
        system_content = f"{think_prompt}{security_rules}\n\n{tools_description}\n\n{output_instructions}"
        return [
            Message.from_text(text=system_content, role="system"),
            Message.from_text(text=context_info, role="user"),
        ]
The build_messages() method receives four components:
- think_prompt — the thinking instruction from thinking().
- tools_description — auto-generated descriptions of available tools.
- output_instructions — formatting rules for the LLM's response.
- context_info — the assembled context (goal, observation, history, etc.).
By overriding this method, you have full control over the structure and content of the messages sent to the LLM. Here, we insert mandatory security rules that the LLM must follow — regardless of what the user's goal says.
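The assembly itself is ordinary string handling, so the rule injection can be unit-tested without an LLM call. A minimal sketch of the same assembly, with plain dicts standing in for the framework's Message objects:

```python
def build_messages(think_prompt, tools_description, output_instructions, context_info,
                   security_rules=""):
    # Same assembly order as the override above: rules land inside the system message.
    system_content = f"{think_prompt}{security_rules}\n\n{tools_description}\n\n{output_instructions}"
    return [
        {"role": "system", "content": system_content},
        {"role": "user", "content": context_info},
    ]

messages = build_messages(
    "Perform a security audit. Respond in JSON format.",
    "(tool descriptions)",
    "(output format rules)",
    "Goal: audit /var/app",
    security_rules="\n\n=== MANDATORY SECURITY RULES ===\n1. NEVER call delete_file.\n",
)
print(messages[0]["role"], "NEVER call delete_file" in messages[0]["content"])
# → system True
```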
Let's use this worker in an agent.
class StrictAuditAgent(AmphibiousAutoma[CognitiveContext]):
    auditor = think_unit(StrictSecurityWorker(verbose_prompt=True), max_attempts=5)

    async def on_agent(self, ctx: CognitiveContext):
        await self.auditor

agent = StrictAuditAgent(llm=llm, verbose=True)
result = await agent.arun(
    goal="Audit the /var/app directory for security issues",
    tools=[list_files_tool, read_file_tool, delete_file_tool, check_security_status_tool],
)
print(result)
[17:09:12.012] [Router] (_amphibious_automa.py:1573) Auto-detecting execution mode [17:09:12.012] [Router] (_amphibious_automa.py:1579) Detected AGENT mode [17:09:12.013] [Observe] (_amphibious_automa.py:861) StrictSecurityWorker: None [17:09:18.136] [Think] (_cognitive_worker.py:332) Message 1 (system, 390 tokens): Perform a security audit of the specified directory. Respond in JSON format. === MANDATORY SECURITY RULES === 1. NEVER call delete_file under any circumstances. 2. NEVER read files ending in .env or containing 'secret'. 3. Always check security status before accessing any file. 4. Report all findings without exposing sensitive data. # Available Tools (with parameters): • list_files: List files in a directory - directory (string) [required] • read_file: Read the content of a file - filepath (string) [required] • delete_file: Delete a file from the system - filepath (string) [required] • check_security_status: Check system security status # Context Acquiring If the context contains progressively disclosed information (e.g. skills, history steps) and you want to inspect the details, use the **details** field to request them. The framework will expand these items in the next round. Batch all requests in a single output. When using this field, leave step_content and output empty. ## Field format: - **details**: [{field: "skills", index: 0}, ...] Available fields: **skills** (view a skill's full workflow), **cognitive_history** (view the full result of a previous step) # Output Fields - **step_content**: Your analysis and reasoning for this step - **finish**: Set True when the sub-task is fully complete (default: False) - **details**: Available fields: **skills**, **cognitive_history**. example: [{field: 'skills', index: 0}, ...] - **output**: Tool calls to execute: [{tool, tool_arguments: [{name: 'param', value: 'value'}]}] [17:09:18.136] [Think] (_cognitive_worker.py:332) Message 2 (user, 34 tokens): Based on the context below, decide your next action. 
Goal: Audit the /var/app directory for security issues Execution History: (none) [17:09:18.136] [Think] (_cognitive_worker.py:332) Total: 424 tokens (cumulative: 424) [17:09:18.139] [Think] (_amphibious_automa.py:867) StrictSecurityWorker: finish=False, step=Starting security audit of /var/app directory. First, I need to check the system security status before accessing any files, as per mandatory security rule #3. Then I'll list the files in the directory to understand what we're working with. [17:09:18.142] [Act] (_amphibious_automa.py:873) StrictSecurityWorker: { "content": "Starting security audit of /var/app directory. First, I need to check the system security status before accessing any files, as per mandatory security rule #3. Then I'll list the files in the directory to understand what we're working with.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "check_security_status", "tool_arguments": {}, "tool_result": "System status: 2 warnings, 0 critical, last scan: 1 hour ago", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "list_files", "tool_arguments": { "directory": "/var/app" }, "tool_result": "Files in /var/app: config.yaml, app.log, secrets.env, data.db", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:09:18.142] [Observe] (_amphibious_automa.py:861) StrictSecurityWorker: None [17:09:56.469] [Think] (_cognitive_worker.py:332) Message 1 (system, 390 tokens): Perform a security audit of the specified directory. Respond in JSON format. === MANDATORY SECURITY RULES === 1. NEVER call delete_file under any circumstances. 2. NEVER read files ending in .env or containing 'secret'. 3. Always check security status before accessing any file. 4. Report all findings without exposing sensitive data. 
# Available Tools (with parameters): • list_files: List files in a directory - directory (string) [required] • read_file: Read the content of a file - filepath (string) [required] • delete_file: Delete a file from the system - filepath (string) [required] • check_security_status: Check system security status # Context Acquiring If the context contains progressively disclosed information (e.g. skills, history steps) and you want to inspect the details, use the **details** field to request them. The framework will expand these items in the next round. Batch all requests in a single output. When using this field, leave step_content and output empty. ## Field format: - **details**: [{field: "skills", index: 0}, ...] Available fields: **skills** (view a skill's full workflow), **cognitive_history** (view the full result of a previous step) # Output Fields - **step_content**: Your analysis and reasoning for this step - **finish**: Set True when the sub-task is fully complete (default: False) - **details**: Available fields: **skills**, **cognitive_history**. example: [{field: 'skills', index: 0}, ...] - **output**: Tool calls to execute: [{tool, tool_arguments: [{name: 'param', value: 'value'}]}] [17:09:56.469] [Think] (_cognitive_worker.py:332) Message 2 (user, 205 tokens): Based on the context below, decide your next action. Goal: Audit the /var/app directory for security issues Execution History: [Working Memory (0-0)] [0] Starting security audit of /var/app directory. First, I need to check the system security status before accessing any files, as per mandatory security rule #3. Then I'll list the files in the directory to understand what we're working with. 
Result: results=[ActionStepResult(tool_id='call_0', tool_name='check_security_status', tool_arguments={}, tool_result='System status: 2 warnings, 0 critical, last scan: 1 hour ago', success=True, error=None), ActionStepResult(tool_id='call_1', tool_name='list_files', tool_arguments={'directory': '/var/app'}, tool_result='Files in /var/app: config.yaml, app.log, secrets.env, data.db', success=True, error=None)] [17:09:56.469] [Think] (_cognitive_worker.py:332) Total: 595 tokens (cumulative: 1019) [17:09:56.472] [Think] (_amphibious_automa.py:867) StrictSecurityWorker: finish=False, step=Checking system security status again before accessing files (per rule #3). Will read allowed files (config.yaml, app.log, data.db) while excluding secrets.env (prohibited by rule #2). [17:09:56.477] [Act] (_amphibious_automa.py:873) StrictSecurityWorker: { "content": "Checking system security status again before accessing files (per rule #3). Will read allowed files (config.yaml, app.log, data.db) while excluding secrets.env (prohibited by rule #2).", "result": { "results": [ { "tool_id": "call_0", "tool_name": "check_security_status", "tool_arguments": {}, "tool_result": "System status: 2 warnings, 0 critical, last scan: 1 hour ago", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/config.yaml" }, "tool_result": "Content of /var/app/config.yaml: (normal file content)", "success": true, "error": null }, { "tool_id": "call_2", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/app.log" }, "tool_result": "Content of /var/app/app.log: (normal file content)", "success": true, "error": null }, { "tool_id": "call_3", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/data.db" }, "tool_result": "Content of /var/app/data.db: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:09:56.477] [Observe] (_amphibious_automa.py:861) 
StrictSecurityWorker: None [17:10:10.601] [Think] (_cognitive_worker.py:332) Message 1 (system, 390 tokens): Perform a security audit of the specified directory. Respond in JSON format. === MANDATORY SECURITY RULES === 1. NEVER call delete_file under any circumstances. 2. NEVER read files ending in .env or containing 'secret'. 3. Always check security status before accessing any file. 4. Report all findings without exposing sensitive data. # Available Tools (with parameters): • list_files: List files in a directory - directory (string) [required] • read_file: Read the content of a file - filepath (string) [required] • delete_file: Delete a file from the system - filepath (string) [required] • check_security_status: Check system security status # Context Acquiring If the context contains progressively disclosed information (e.g. skills, history steps) and you want to inspect the details, use the **details** field to request them. The framework will expand these items in the next round. Batch all requests in a single output. When using this field, leave step_content and output empty. ## Field format: - **details**: [{field: "skills", index: 0}, ...] Available fields: **skills** (view a skill's full workflow), **cognitive_history** (view the full result of a previous step) # Output Fields - **step_content**: Your analysis and reasoning for this step - **finish**: Set True when the sub-task is fully complete (default: False) - **details**: Available fields: **skills**, **cognitive_history**. example: [{field: 'skills', index: 0}, ...] - **output**: Tool calls to execute: [{tool, tool_arguments: [{name: 'param', value: 'value'}]}] [17:10:10.601] [Think] (_cognitive_worker.py:332) Message 2 (user, 382 tokens): Based on the context below, decide your next action. Goal: Audit the /var/app directory for security issues Execution History: [Working Memory (0-1)] [0] Starting security audit of /var/app directory. 
First, I need to check the system security status before accessing any files, as per mandatory security rule #3. Then I'll list the files in the directory to understand what we're working with. Result: results=[ActionStepResult(tool_id='call_0', tool_name='check_security_status', tool_arguments={}, tool_result='System status: 2 warnings, 0 critical, last scan: 1 hour ago', success=True, error=None), ActionStepResult(tool_id='call_1', tool_name='list_files', tool_arguments={'directory': '/var/app'}, tool_result='Files in /var/app: config.yaml, app.log, secrets.env, data.db', success=True, error=None)] [1] Checking system security status again before accessing files (per rule #3). Will read allowed files (config.yaml, app.log, data.db) while excluding secrets.env (prohibited by rule #2). Result: results=[ActionStepResult(tool_id='call_0', tool_name='check_security_status', tool_arguments={}, tool_result='System status: 2 warnings, 0 critical, last scan: 1 hour ago', success=True, error=None), ActionStepResult(tool_id='call_1', tool_name='read_file', tool_arguments={'filepath': '/var/app/config.yaml'}, tool_result='Content of /var/app/config.yaml: (normal file content)', success=True, error=None), ActionStepResult(tool_id='call_2', tool_name='read_file', tool_arguments={'filepath': '/var... [17:10:10.601] [Think] (_cognitive_worker.py:332) Total: 772 tokens (cumulative: 1791) [17:10:10.604] [Think] (_amphibious_automa.py:867) StrictSecurityWorker: finish=False, step=Continuing the security audit of /var/app. Based on the file listing, I found: config.yaml, app.log, secrets.env, and data.db. Per rule #2, I must NOT read secrets.env (contains 'secret' in filename). I've already read config.yaml in the previous step. Now I need to check security status again (rule #3) and read the remaining allowed files (app.log and data.db) to complete the audit. I will NOT use delete_file under any circumstances (rule #1). 
[17:10:10.608] [Act] (_amphibious_automa.py:873) StrictSecurityWorker: { "content": "Continuing the security audit of /var/app. Based on the file listing, I found: config.yaml, app.log, secrets.env, and data.db. Per rule #2, I must NOT read secrets.env (contains 'secret' in filename). I've already read config.yaml in the previous step. Now I need to check security status again (rule #3) and read the remaining allowed files (app.log and data.db) to complete the audit. I will NOT use delete_file under any circumstances (rule #1).", "result": { "results": [ { "tool_id": "call_0", "tool_name": "check_security_status", "tool_arguments": {}, "tool_result": "System status: 2 warnings, 0 critical, last scan: 1 hour ago", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/app.log" }, "tool_result": "Content of /var/app/app.log: (normal file content)", "success": true, "error": null }, { "tool_id": "call_2", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/data.db" }, "tool_result": "Content of /var/app/data.db: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:10:10.608] [Observe] (_amphibious_automa.py:861) StrictSecurityWorker: None [17:10:34.984] [Think] (_cognitive_worker.py:332) Message 1 (system, 390 tokens): Perform a security audit of the specified directory. Respond in JSON format. === MANDATORY SECURITY RULES === 1. NEVER call delete_file under any circumstances. 2. NEVER read files ending in .env or containing 'secret'. 3. Always check security status before accessing any file. 4. Report all findings without exposing sensitive data. 
# Available Tools (with parameters): • list_files: List files in a directory - directory (string) [required] • read_file: Read the content of a file - filepath (string) [required] • delete_file: Delete a file from the system - filepath (string) [required] • check_security_status: Check system security status # Context Acquiring If the context contains progressively disclosed information (e.g. skills, history steps) and you want to inspect the details, use the **details** field to request them. The framework will expand these items in the next round. Batch all requests in a single output. When using this field, leave step_content and output empty. ## Field format: - **details**: [{field: "skills", index: 0}, ...] Available fields: **skills** (view a skill's full workflow), **cognitive_history** (view the full result of a previous step) # Output Fields - **step_content**: Your analysis and reasoning for this step - **finish**: Set True when the sub-task is fully complete (default: False) - **details**: Available fields: **skills**, **cognitive_history**. example: [{field: 'skills', index: 0}, ...] - **output**: Tool calls to execute: [{tool, tool_arguments: [{name: 'param', value: 'value'}]}] [17:10:34.984] [Think] (_cognitive_worker.py:332) Message 2 (user, 625 tokens): Based on the context below, decide your next action. Goal: Audit the /var/app directory for security issues Execution History: [Working Memory (0-2)] [0] Starting security audit of /var/app directory. First, I need to check the system security status before accessing any files, as per mandatory security rule #3. Then I'll list the files in the directory to understand what we're working with. 
Result: results=[ActionStepResult(tool_id='call_0', tool_name='check_security_status', tool_arguments={}, tool_result='System status: 2 warnings, 0 critical, last scan: 1 hour ago', success=True, error=None), ActionStepResult(tool_id='call_1', tool_name='list_files', tool_arguments={'directory': '/var/app'}, tool_result='Files in /var/app: config.yaml, app.log, secrets.env, data.db', success=True, error=None)] [1] Checking system security status again before accessing files (per rule #3). Will read allowed files (config.yaml, app.log, data.db) while excluding secrets.env (prohibited by rule #2). Result: results=[ActionStepResult(tool_id='call_0', tool_name='check_security_status', tool_arguments={}, tool_result='System status: 2 warnings, 0 critical, last scan: 1 hour ago', success=True, error=None), ActionStepResult(tool_id='call_1', tool_name='read_file', tool_arguments={'filepath': '/var/app/config.yaml'}, tool_result='Content of /var/app/config.yaml: (normal file content)', success=True, error=None), ActionStepResult(tool_id='call_2', tool_name='read_file', tool_arguments={'filepath': '/var... [2] Continuing the security audit of /var/app. Based on the file listing, I found: config.yaml, app.log, secrets.env, and data.db. Per rule #2, I must NOT read secrets.env (contains 'secret' in filename). I've already read config.yaml in the previous step. Now I need to check security status again (rule #3) and read the remaining allowed files (app.log and data.db) to complete the audit. I will NOT use delete_file under any circumstances (rule #1). 
Result: results=[ActionStepResult(tool_id='call_0', tool_name='check_security_status', tool_arguments={}, tool_result='System status: 2 warnings, 0 critical, last scan: 1 hour ago', success=True, error=None), ActionStepResult(tool_id='call_1', tool_name='read_file', tool_arguments={'filepath': '/var/app/app.log'}, tool_result='Content of /var/app/app.log: (normal file content)', success=True, error=None), ActionStepResult(tool_id='call_2', tool_name='read_file', tool_arguments={'filepath': '/var/app/dat... [17:10:34.984] [Think] (_cognitive_worker.py:332) Total: 1015 tokens (cumulative: 2806) [17:10:34.988] [Think] (_amphibious_automa.py:867) StrictSecurityWorker: finish=True, step=Completed security audit of /var/app directory. All allowed files (config.yaml, app.log, data.db) were reviewed after verifying system security status each time (rule #3). secrets.env was excluded per rule #2. System shows 2 warnings but no critical issues. No sensitive data exposed in findings. [17:10:34.989] [Act] (_amphibious_automa.py:873) StrictSecurityWorker: { "content": "Completed security audit of /var/app directory. All allowed files (config.yaml, app.log, data.db) were reviewed after verifying system security status each time (rule #3). secrets.env was excluded per rule #2. System shows 2 warnings but no critical issues. No sensitive data exposed in findings.", "result": null, "metadata": { "tool_calls": [] }, "status": null } ================================================== StrictAuditAgent-733b778c | Completed Tokens: 2806 | Time: 82.98s ================================================== Completed security audit of /var/app directory. All allowed files (config.yaml, app.log, data.db) were reviewed after verifying system security status each time (rule #3). secrets.env was excluded per rule #2. System shows 2 warnings but no critical issues. No sensitive data exposed in findings.
Even though the delete_file tool is available, the injected security rules instruct the LLM to never call it. The build_messages() hook gives you a clean way to add policies at the prompt level — the LLM sees the rules as part of its instructions.
Part 3: Act Phase — Intercepting Tool Calls¶
The act phase provides hooks to intercept and modify tool execution. These hooks give you programmatic control over what actually happens when the LLM decides to call a tool.
before_action — Filtering Dangerous Calls¶
The before_action() hook runs after the LLM makes its decision but before any tools are executed. This is your last line of defense — you can block, modify, or filter tool calls regardless of what the LLM decided.
The decision_result parameter is a List[Tuple[ToolCall, ToolSpec]] — each element is a pair of:
- ToolCall — the LLM's request (name, arguments, id)
- ToolSpec — the matching tool specification (tool_name, tool_description, tool_parameters)
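The filtering itself is framework-agnostic. A sketch with lightweight dataclass stand-ins for ToolCall and ToolSpec shows the shape of decision_result and the blocklist check:

```python
from dataclasses import dataclass

@dataclass
class ToolCall:  # lightweight stand-in for the framework's ToolCall
    name: str
    arguments: dict
    id: str

@dataclass
class ToolSpec:  # lightweight stand-in for the framework's ToolSpec
    tool_name: str

def filter_decision(decision_result, blocked=frozenset({"delete_file"})):
    # Keep only the (ToolCall, ToolSpec) pairs whose tool is not blocklisted.
    return [(call, spec) for call, spec in decision_result
            if spec.tool_name not in blocked]

pairs = [
    (ToolCall("list_files", {"directory": "/var/app"}, "call_0"), ToolSpec("list_files")),
    (ToolCall("delete_file", {"filepath": "/var/app/app.log"}, "call_1"), ToolSpec("delete_file")),
]
survivors = filter_decision(pairs)
print([spec.tool_name for _, spec in survivors])
# → ['list_files']
```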
class SafeAuditAgent(AmphibiousAutoma[CognitiveContext]):
    auditor = think_unit(
        CognitiveWorker.inline("Audit the system and report security findings. Respond in JSON format."),
        max_attempts=5,
    )

    async def before_action(self, decision_result, ctx):
        """Filter out any dangerous tool calls before execution."""
        if isinstance(decision_result, list):
            blocked_tools = {"delete_file"}
            filtered = []
            for tool_call, tool_spec in decision_result:
                if tool_spec.tool_name in blocked_tools:
                    print(f"[BLOCKED] Prevented call to: {tool_spec.tool_name}")
                else:
                    filtered.append((tool_call, tool_spec))
            # Return the filtered list even if it is empty — never fall back
            # to the original decision, or the blocked calls would execute.
            return filtered
        return decision_result

    async def on_agent(self, ctx: CognitiveContext):
        await self.auditor

agent = SafeAuditAgent(llm=llm, verbose=True)
result = await agent.arun(
    goal="Audit /var/app and clean up unnecessary files",
    tools=[list_files_tool, read_file_tool, delete_file_tool, check_security_status_tool],
)
print(result)
[17:25:05.947] [Router] (_amphibious_automa.py:1573) Auto-detecting execution mode [17:25:05.947] [Router] (_amphibious_automa.py:1579) Detected AGENT mode [17:25:05.948] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:25:11.238] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Starting the security audit of /var/app directory. First, I need to explore what files exist in this directory and check the overall system security status to understand the current state before identifying unnecessary files to clean up. [17:25:11.241] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Starting the security audit of /var/app directory. First, I need to explore what files exist in this directory and check the overall system security status to understand the current state before identifying unnecessary files to clean up.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "list_files", "tool_arguments": { "directory": "/var/app" }, "tool_result": "Files in /var/app: config.yaml, app.log, secrets.env, data.db", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "check_security_status", "tool_arguments": {}, "tool_result": "System status: 2 warnings, 0 critical, last scan: 1 hour ago", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:25:11.241] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:25:37.530] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Analyze the files in /var/app to identify security risks and unnecessary files. The initial scan found config.yaml, app.log, secrets.env, and data.db. The system has 2 security warnings. Next, read each file's content to check for sensitive data exposure, misconfigurations, or redundant files. [17:25:37.533] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Analyze the files in /var/app to identify security risks and unnecessary files. 
The initial scan found config.yaml, app.log, secrets.env, and data.db. The system has 2 security warnings. Next, read each file's content to check for sensitive data exposure, misconfigurations, or redundant files.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/data.db" }, "tool_result": "Content of /var/app/data.db: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:25:37.533] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:25:48.641] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Continuing the audit by examining secrets.env for potential sensitive data exposure, as environment files often contain credentials. Next, check config.yaml for misconfigurations. The system's 2 warnings may relate to these files. [17:25:48.645] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Continuing the audit by examining secrets.env for potential sensitive data exposure, as environment files often contain credentials. Next, check config.yaml for misconfigurations. The system's 2 warnings may relate to these files.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/secrets.env" }, "tool_result": "[SENSITIVE] Content of /var/app/secrets.env: API_KEY=sk-xxx, DB_PASS=yyy", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/config.yaml" }, "tool_result": "Content of /var/app/config.yaml: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:25:48.646] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:25:59.483] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Continuing the audit by examining app.log for potential sensitive data exposure or unnecessary log entries. 
Log files can accumulate sensitive information or become redundant over time. After reviewing all files, I'll determine which files pose security risks or are unnecessary for cleanup. [17:25:59.487] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Continuing the audit by examining app.log for potential sensitive data exposure or unnecessary log entries. Log files can accumulate sensitive information or become redundant over time. After reviewing all files, I'll determine which files pose security risks or are unnecessary for cleanup.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/app.log" }, "tool_result": "Content of /var/app/app.log: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:25:59.487] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:26:24.348] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=True, step=Completed audit of /var/app directory. Security findings: 1) secrets.env contains exposed credentials (API_KEY, DB_PASS) posing high risk. 2) System has 2 warnings likely related to sensitive file exposure. All files (config.yaml, data.db, app.log) appear necessary for application function. No unnecessary files identified for deletion, but secrets.env requires immediate security remediation (e.g., move to secure vault). [17:26:24.348] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Completed audit of /var/app directory. Security findings: 1) secrets.env contains exposed credentials (API_KEY, DB_PASS) posing high risk. 2) System has 2 warnings likely related to sensitive file exposure. All files (config.yaml, data.db, app.log) appear necessary for application function. 
No unnecessary files identified for deletion, but secrets.env requires immediate security remediation (e.g., move to secure vault).", "result": null, "metadata": { "tool_calls": [] }, "status": null } ================================================== SafeAuditAgent-4bb4ae32 | Completed Tokens: 3334 | Time: 78.40s ================================================== Completed audit of /var/app directory. Security findings: 1) secrets.env contains exposed credentials (API_KEY, DB_PASS) posing high risk. 2) System has 2 warnings likely related to sensitive file exposure. All files (config.yaml, data.db, app.log) appear necessary for application function. No unnecessary files identified for deletion, but secrets.env requires immediate security remediation (e.g., move to secure vault).
Even though the goal explicitly asks to "clean up unnecessary files," the before_action hook blocks any calls to delete_file. This is a programmatic safety net — unlike prompt-level rules that the LLM might occasionally ignore, before_action is enforced in code and cannot be bypassed.
The key difference between before_action and build_messages:
- `build_messages()` (Think phase): persuades the LLM not to call dangerous tools. Effective but not guaranteed.
- `before_action()` (Act phase): programmatically blocks dangerous tools. Guaranteed enforcement.
action_custom_output — Post-Processing Structured Output¶
When a CognitiveWorker is configured with output_schema, the LLM produces a typed Pydantic instance instead of tool calls. In this mode, the framework calls action_custom_output() instead of action_tool_call().
This hook lets you post-process the structured output before it's recorded in the context — for example, sanitizing sensitive data, adding metadata, or validating business rules.
from pydantic import BaseModel, Field
from typing import List

class AuditReport(BaseModel):
    """Structured output schema for the audit worker."""
    findings: List[str] = Field(description="List of security findings")
    risk_level: str = Field(description="Overall risk level: low, medium, or high")
    recommendations: List[str] = Field(description="Recommended actions")

class SanitizingAgent(AmphibiousAutoma[CognitiveContext]):
    # output_schema tells the worker to produce an AuditReport instead of tool calls
    auditor = think_unit(
        CognitiveWorker.inline(
            "Analyze the security status and produce a structured audit report. "
            "List the findings, assess risk level, and provide recommendations.",
            output_schema=AuditReport,
        ),
        max_attempts=1,
    )

    async def action_custom_output(self, decision_result, ctx):
        """Redact sensitive information from structured output."""
        if isinstance(decision_result, AuditReport):
            decision_result.findings = [
                f.replace("sk-xxx", "[REDACTED]").replace("yyy", "[REDACTED]")
                for f in decision_result.findings
            ]
            print(f"[action_custom_output] Sanitized {len(decision_result.findings)} findings")
        return decision_result

    async def on_agent(self, ctx: CognitiveContext):
        await self.auditor

agent = SanitizingAgent(llm=llm, verbose=True)
result = await agent.arun(
    goal="Produce a security audit report for /var/app. "
         "Known facts: files include config.yaml, app.log, secrets.env (contains API_KEY=sk-xxx, DB_PASS=yyy), data.db. "
         "System status: 2 warnings, 0 critical.",
)
print(result)
[17:26:49.335] [Router] (_amphibious_automa.py:1573) Auto-detecting execution mode [17:26:49.336] [Router] (_amphibious_automa.py:1579) Detected AGENT mode [17:26:49.336] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:27:05.063] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=True, step=Analyzed the /var/app directory contents and system status. Key findings include exposed secrets in secrets.env (API_KEY and DB_PASS in plaintext), potential log file sensitivity in app.log, and possible misconfigurations in config.yaml or data.db. Although system status reports 0 critical issues, plaintext secrets represent a high-risk vulnerability. Two warnings may relate to file permissions or logging practices. [action_custom_output] Sanitized 3 findings [17:27:05.063] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Analyzed the /var/app directory contents and system status. Key findings include exposed secrets in secrets.env (API_KEY and DB_PASS in plaintext), potential log file sensitivity in app.log, and possible misconfigurations in config.yaml or data.db. Although system status reports 0 critical issues, plaintext secrets represent a high-risk vulnerability. 
Two warnings may relate to file permissions or logging practices.", "result": { "findings": [ "secrets.env contains plaintext API_KEY and DB_PASS credentials", "app.log may expose sensitive data if not properly secured", "config.yaml and data.db could have insecure default configurations" ], "risk_level": "high", "recommendations": [ "Move secrets to a secure vault (e.g., HashiCorp Vault, AWS Secrets Manager)", "Restrict file permissions (chmod 600) for secrets.env and data.db", "Implement log sanitization to prevent sensitive data leakage", "Audit config.yaml for insecure defaults and encrypt data.db at rest" ] }, "metadata": {}, "status": null } ================================================== SanitizingAgent-03a27e3f | Completed Tokens: 177 | Time: 15.73s ================================================== Analyzed the /var/app directory contents and system status. Key findings include exposed secrets in secrets.env (API_KEY and DB_PASS in plaintext), potential log file sensitivity in app.log, and possible misconfigurations in config.yaml or data.db. Although system status reports 0 critical issues, plaintext secrets represent a high-risk vulnerability. Two warnings may relate to file permissions or logging practices.
Notice the key difference from the previous examples:
- The worker uses `output_schema=AuditReport` — this tells the LLM to produce a typed `AuditReport` instance instead of making tool calls.
- Because the output is structured, the framework routes to `action_custom_output()` instead of `action_tool_call()`.
- Inside the hook, `decision_result` is the `AuditReport` instance — you can inspect and modify its typed fields directly.
This hook is only relevant when using output_schema. For normal tool-call workflows, use before_action (pre-execution) or after_action (post-execution) instead.
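Redaction logic like the hook above is easiest to get right as a small standalone function that you can unit-test outside the agent, then call from `action_custom_output()`. A sketch (the secret patterns here are illustrative, not an exhaustive scanner):

```python
import re

# Illustrative patterns only -- a real secret scanner needs a broader rule set.
SECRET_PATTERNS = [
    re.compile(r"sk-[A-Za-z0-9]+"),   # API keys of the form sk-...
    re.compile(r"(?<=PASS=)\S+"),     # values following a PASS= assignment
]

def redact(text: str) -> str:
    """Replace anything matching a secret pattern with [REDACTED]."""
    for pattern in SECRET_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text

findings = ["secrets.env contains API_KEY=sk-xxx and DB_PASS=yyy"]
sanitized = [redact(f) for f in findings]
```

Keeping the sanitizer pure (string in, string out) means the same function can be reused for tool results, structured fields, and final summaries alike.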
after_action — Updating Context After Execution¶
The after_action() hook runs after tool execution completes but before the result is returned. This is the ideal place to update custom context fields based on tool results — for example, tracking which documents have been analyzed or accumulating findings across steps.
Like `before_action`, it follows the worker → agent delegation pattern: the worker's `after_action()` runs first; if it is not overridden, the agent-level `after_action()` is called.
from pydantic import ConfigDict
from bridgic.amphibious._type import ActionResult

class AuditContext(CognitiveContext):
    """Custom context that tracks audit findings across steps."""
    model_config = ConfigDict(arbitrary_types_allowed=True)

    files_checked: list = Field(default_factory=list, description="Directories that have been listed")
    findings_count: int = Field(default=0, description="Total number of tool actions completed")

class ContextUpdatingAgent(AmphibiousAutoma[AuditContext]):
    auditor = think_unit(
        CognitiveWorker.inline(
            "Audit the system and report security findings."
        ),
        max_attempts=3,
    )

    async def after_action(self, step_result, ctx: AuditContext):
        """Update custom context fields based on tool results."""
        action_result = step_result.result
        if not isinstance(action_result, ActionResult):
            return
        for step in action_result.results:
            if not step.success:
                continue
            ctx.findings_count += 1
            if step.tool_name == "list_files":
                ctx.files_checked.append(step.tool_arguments.get("directory", ""))
        print(f"[after_action] Files checked: {ctx.files_checked}, "
              f"Total actions: {ctx.findings_count}")

    async def on_agent(self, ctx: AuditContext):
        await self.auditor

agent = ContextUpdatingAgent(llm=llm, verbose=True)
result = await agent.arun(
    goal="Audit the /var/app directory for security issues",
    tools=[list_files_tool, read_file_tool, check_security_status_tool],
)
print(f"\nFinal context — Files checked: {agent.context.files_checked}, "
      f"Total actions: {agent.context.findings_count}")
[17:31:23.553] [Router] (_amphibious_automa.py:1573) Auto-detecting execution mode [17:31:23.554] [Router] (_amphibious_automa.py:1579) Detected AGENT mode [17:31:23.554] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:31:31.053] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Starting the security audit of /var/app directory. First step is to list all files and subdirectories to identify potential targets for further inspection. [after_action] Files checked: ['/var/app'], Total actions: 1 [17:31:31.056] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Starting the security audit of /var/app directory. First step is to list all files and subdirectories to identify potential targets for further inspection.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "list_files", "tool_arguments": { "directory": "/var/app" }, "tool_result": "Files in /var/app: config.yaml, app.log, secrets.env, data.db", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:31:31.056] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:32:02.386] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Proceeding to inspect the contents of each file in /var/app for security vulnerabilities. Priority is given to secrets.env (potential exposed credentials), config.yaml (misconfigurations), app.log (sensitive data logging), and data.db (database security). [after_action] Files checked: ['/var/app'], Total actions: 5 [17:32:02.390] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Proceeding to inspect the contents of each file in /var/app for security vulnerabilities. 
Priority is given to secrets.env (potential exposed credentials), config.yaml (misconfigurations), app.log (sensitive data logging), and data.db (database security).", "result": { "results": [ { "tool_id": "call_0", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/secrets.env" }, "tool_result": "[SENSITIVE] Content of /var/app/secrets.env: API_KEY=sk-xxx, DB_PASS=yyy", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/config.yaml" }, "tool_result": "Content of /var/app/config.yaml: (normal file content)", "success": true, "error": null }, { "tool_id": "call_2", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/app.log" }, "tool_result": "Content of /var/app/app.log: (normal file content)", "success": true, "error": null }, { "tool_id": "call_3", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/data.db" }, "tool_result": "Content of /var/app/data.db: (normal file content)", "success": true, "error": null } ] }, "metadata": {}, "status": null } [17:32:02.390] [Observe] (_amphibious_automa.py:861) _PromptWorker: None [17:32:10.624] [Think] (_amphibious_automa.py:867) _PromptWorker: finish=False, step=Continuing the security audit by inspecting remaining files (app.log and data.db) for sensitive data exposure and database security issues. After file inspection, will run a system security status check to complete the audit. [after_action] Files checked: ['/var/app'], Total actions: 7 [17:32:10.627] [Act] (_amphibious_automa.py:873) _PromptWorker: { "content": "Continuing the security audit by inspecting remaining files (app.log and data.db) for sensitive data exposure and database security issues. 
After file inspection, will run a system security status check to complete the audit.", "result": { "results": [ { "tool_id": "call_0", "tool_name": "read_file", "tool_arguments": { "filepath": "/var/app/data.db" }, "tool_result": "Content of /var/app/data.db: (normal file content)", "success": true, "error": null }, { "tool_id": "call_1", "tool_name": "check_security_status", "tool_arguments": {}, "tool_result": "System status: 2 warnings, 0 critical, last scan: 1 hour ago", "success": true, "error": null } ] }, "metadata": {}, "status": null } ================================================== ContextUpdatingAgent-02b8dd06 | Completed Tokens: 1504 | Time: 47.08s ================================================== Final context — Files checked: ['/var/app'], Total actions: 7
Hook Summary¶
Here is the complete reference for all OTC hooks:
| Hook | Override At | Default Behavior | Use Case |
|---|---|---|---|
| `observation()` | Worker / Agent | Worker → `_DELEGATE`; Agent → None | Inject environment perception |
| `thinking()` | Worker | Must implement | Define the thinking prompt |
| `build_messages()` | Worker | Standard assembly | Customize LLM message format |
| `before_action()` | Worker / Agent | Worker → `_DELEGATE`; Agent → passthrough | Intercept/filter tool calls |
| `action_tool_call()` | Agent | Concurrent execution | Custom tool execution strategy |
| `action_custom_output()` | Agent | Passthrough | Post-process `output_schema` results |
| `after_action()` | Worker / Agent | Worker → `_DELEGATE`; Agent → no-op | Update context after execution |
Note: `action_tool_call()` and `action_custom_output()` are mutually exclusive paths — the framework routes to one or the other based on whether the worker uses `output_schema`.
What have we learnt?¶
In this tutorial, we explored the full hook system of the OTC cycle:
- Observe hooks (`observation()`) let you inject custom environment perception at both the Worker and Agent level. Workers can delegate to the agent via `_DELEGATE`.
- Think hooks (`build_messages()`) let you reshape the messages sent to the LLM — useful for injecting safety rules or custom instructions.
- Act hooks (`before_action()`, `action_tool_call()`, `action_custom_output()`, `after_action()`) let you intercept tool calls before execution, customize how tools run, post-process outputs, and update custom context fields after execution.
- The delegation pattern (`_DELEGATE`) provides a clean two-tier hook system: per-worker customization with agent-level fallback.
These hooks give you fine-grained control over agent behavior without modifying the core framework logic.