Human-in-the-loop
When building workflows or agents with Bridgic, developers can seamlessly integrate human-in-the-loop interactions into the execution flow. At any point, the system can pause its automated process to request human input — such as approval, verification, or additional instructions — and wait for a response. Once the human feedback is provided, the workflow or agent resumes execution from the point of interruption, adapting its behavior based on the new input. Bridgic ensures that the entire process, including paused and resumed states, can be reliably serialized and deserialized for persistence and recovery.
Interaction Scenarios
Let's go through a few simple examples to understand this process. Before that, let's set up the running environment.
Run the following pip command to make sure the 'openai' integration is installed.
pip install bridgic-llms-openai
import os
# Get the API base, API key and model name.
_api_key = os.environ.get("OPENAI_API_KEY")
_api_base = os.environ.get("OPENAI_API_BASE")
_model_name = os.environ.get("OPENAI_MODEL_NAME")
from pydantic import BaseModel, Field
from bridgic.core.automa import GraphAutoma, worker, Snapshot
from bridgic.core.automa.args import From
from bridgic.core.automa.interaction import Event, Feedback, FeedbackSender, InteractionFeedback, InteractionException
from bridgic.core.model.types import Message, Role
from bridgic.core.model.protocols import PydanticModel
from bridgic.llms.openai import OpenAILlm
Programming assistant
During the development of a programming assistant, it can be designed to automatically execute and verify the code it generates. However, since program execution consumes system resources, the user must decide whether to grant permission for the assistant to run the code.
Let's implement this with Bridgic. The source code can be downloaded from GitHub. The steps are as follows:
- Generate code based on the user's requirements.
- Ask the user for permission to execute the generated code.
- Output the result.
# Set the LLM
llm = OpenAILlm(api_base=_api_base, api_key=_api_key, timeout=10)

class CodeBlock(BaseModel):
    code: str = Field(description="The code to be executed.")

class CodeAssistant(GraphAutoma):
    @worker(is_start=True)
    async def generate_code(self, user_requirement: str):
        response = await llm.astructured_output(
            model=_model_name,
            messages=[
                Message.from_text(text="You are a programming assistant. Please generate code according to the user's requirements.", role=Role.SYSTEM),
                Message.from_text(text=user_requirement, role=Role.USER),
            ],
            constraint=PydanticModel(model=CodeBlock)
        )
        return response.code

    @worker(dependencies=["generate_code"])
    async def ask_to_run_code(self, code: str):
        event = Event(event_type="can_run_code", data=code)
        feedback = await self.request_feedback_async(event)
        return feedback.data

    @worker(dependencies=["ask_to_run_code"])
    async def output_result(self, feedback: str, code: str = From("generate_code")):
        # Remove a surrounding Markdown fence if the model added one.
        # str.strip() treats its argument as a character set, so use removeprefix()/removesuffix() (Python 3.9+).
        code = code.strip().removeprefix("```python").removeprefix("```").removesuffix("```").strip()
        if feedback == "yes":
            print("- - - - - - Result - - - - - -")
            exec(code)
            print("- - - - - - End - - - - - -")
        else:
            print(f"This code was rejected for execution. In response to the requirements, I have generated the following code:\n```python\n{code}\n```")
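One detail is worth keeping in mind when cleaning up generated code: str.strip() treats its argument as a set of characters rather than a literal prefix or suffix, so it can remove leading or trailing letters from code that has no fence at all. The following is a minimal sketch of a fence-stripping helper. The name strip_code_fence is hypothetical and not part of Bridgic, and the sketch assumes Python 3.9+ for removeprefix() and removesuffix().

```python
def strip_code_fence(text: str) -> str:
    """Remove one surrounding Markdown fence, if present (illustrative helper, not a Bridgic API)."""
    text = text.strip()
    for opener in ("```python", "```"):
        if text.startswith(opener):
            text = text.removeprefix(opener)
            break
    return text.removesuffix("```").strip()


unfenced = "print('hi')"
fenced = "```python\nprint('hi')\n```"

# Character-set stripping removes the leading "p" because "p" is in the set.
naive = unfenced.strip("```python")
print(repr(naive))                       # "rint('hi')"

# Literal prefix/suffix removal leaves unfenced code alone and unwraps fenced code.
print(repr(strip_code_fence(unfenced)))  # "print('hi')"
print(repr(strip_code_fence(fenced)))    # "print('hi')"
```

Whether the model returns fenced or unfenced code depends on the prompt and the structured-output constraint, so handling both cases is the safer choice.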
In the ask_to_run_code() method of CodeAssistant, we use request_feedback_async() to send an Event to the human user and wait for feedback. To handle this Event, the corresponding logic needs to be registered with the automa, like this:
# Handle can_run_code event
def can_run_code_handler(event: Event, feedback_sender: FeedbackSender):
    print(f"Can I run this code now to verify if it's correct?")
    print(f"```python\n{event.data}\n```")
    res = input("Please input your answer (yes/no): ")
    if res in ["yes", "no"]:
        feedback_sender.send(Feedback(data=res))
    else:
        # Any unrecognized answer is treated as a refusal.
        print("Invalid input. Please input yes or no.")
        feedback_sender.send(Feedback(data="no"))

# register can_run_code event handler to `CodeAssistant` automa
code_assistant = CodeAssistant()
code_assistant.register_event_handler("can_run_code", can_run_code_handler)
Now let's use it!
await code_assistant.arun(user_requirement="Please write a function to print 'Hello, World!' and run it.")
Can I run this code now to verify if it's correct?
```python
def greet():
    print('Hello, World!')
greet()
```
- - - - - - Result - - - - - -
Hello, World!
- - - - - - End - - - - - -
In the example above, Bridgic wraps the message sent to the human user in an Event and the message received from the user in a Feedback.
Event contains three fields:
- event_type: A string. The event type is used to identify the registered event handler.
- timestamp: A Python datetime object. The timestamp of the event. The default is datetime.now().
- data: The data attached to the event.
Feedback contains one field:
- data: The data attached to the feedback.
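To make the shape of these two objects concrete, here is a plain-dataclass analogue of the fields listed above. SketchEvent and SketchFeedback are hypothetical stand-ins written for illustration only. They are not Bridgic's classes and carry none of their behavior.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class SketchEvent:
    """Illustrative stand-in mirroring the three documented Event fields."""
    event_type: str                                            # selects the registered handler
    data: Any = None                                           # payload attached to the event
    timestamp: datetime = field(default_factory=datetime.now)  # defaults to "now"


@dataclass
class SketchFeedback:
    """Illustrative stand-in mirroring the single documented Feedback field."""
    data: Any


event = SketchEvent(event_type="can_run_code", data="print('hi')")
feedback = SketchFeedback(data="yes")
print(event.event_type, type(event.timestamp).__name__, feedback.data)
```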
request_feedback_async() sends an event to the user and requests feedback. The call blocks the caller until the feedback is received. However, thanks to Python’s asynchronous event loop, other automas running on the same main thread are not blocked.
The registered event handler must be of type EventHandlerType. Here, it is a function that takes an Event and a FeedbackSender as arguments.
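The claim that only the caller is blocked can be illustrated with plain asyncio, independent of Bridgic. In the sketch below, all names are hypothetical: one coroutine awaits a future that stands in for the pending feedback, while an unrelated coroutine keeps running on the same event loop. asyncio.run() is used so the sketch runs as a script; in a notebook with a running loop you would await demo() instead.

```python
import asyncio
from typing import List


async def worker_waiting_for_feedback(feedback: asyncio.Future, log: List[str]) -> None:
    log.append("worker: waiting for feedback")
    answer = await feedback  # suspends this coroutine only
    log.append(f"worker: got {answer!r}")


async def unrelated_task(done: asyncio.Event, log: List[str]) -> None:
    for i in range(3):
        log.append(f"other: tick {i}")
        await asyncio.sleep(0)  # yield control back to the event loop
    done.set()


async def simulated_human(feedback: asyncio.Future, done: asyncio.Event) -> None:
    await done.wait()  # answer only after the unrelated task has finished
    feedback.set_result("yes")


async def demo() -> List[str]:
    log: List[str] = []
    feedback = asyncio.get_running_loop().create_future()
    done = asyncio.Event()
    await asyncio.gather(
        worker_waiting_for_feedback(feedback, log),
        unrelated_task(done, log),
        simulated_human(feedback, done),
    )
    return log


log = asyncio.run(demo())
print("\n".join(log))
```

The log shows the unrelated task completing all of its ticks between the worker's "waiting" and "got" entries, which is the behavior described above for automas sharing one thread.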
Counting notifier
Sometimes, it may be necessary to post an event without expecting any feedback, for example, message notifications or progress updates. In this case, call the post_event() method and register an event handler of type EventHandlerType to process the event. Here, the event handler is a function that takes only an Event as an argument.
For example, the following counting notifier counts from 1 up to the number given by the user_input argument. The user can also specify which number (notify_int) should trigger a notification.
class MessageNotifier(GraphAutoma):
    @worker(is_start=True)
    async def notify(self, user_input: int, notify_int: int):
        print(f"Loop from 1 to {user_input}")
        for i in range(1, user_input + 1):
            if i == notify_int:
                event = Event(event_type="message_notification", data=f"Loop {i} times")
                self.post_event(event)

def message_notification_handler(event: Event):
    print(f'!! Now count to {event.data}. !!')

message_notifier = MessageNotifier()
message_notifier.register_event_handler("message_notification", message_notification_handler)
await message_notifier.arun(user_input=10, notify_int=5)
Loop from 1 to 10
!! Now count to Loop 5 times. !!
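The fire-and-forget pattern used by the counting notifier can be pictured as a lookup from event type to a one-argument handler whose return value is ignored. The following toy dispatcher is only a sketch of that idea under that assumption. TinyNotifier and its methods are hypothetical names and say nothing about how Bridgic implements post_event() internally.

```python
from typing import Any, Callable, Dict, List


class TinyNotifier:
    """Toy stand-in: maps event types to one-argument handlers (not a Bridgic class)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], None]] = {}

    def register(self, event_type: str, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type] = handler

    def post(self, event_type: str, data: Any) -> None:
        handler = self._handlers.get(event_type)
        if handler is not None:
            handler(data)  # nothing is returned to the poster, so the poster never waits


received: List[str] = []
notifier = TinyNotifier()
notifier.register("message_notification", received.append)

for i in range(1, 11):
    if i == 5:
        notifier.post("message_notification", f"Loop {i} times")

print(received)  # ['Loop 5 times']
```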
Reimbursement Workflow
In certain scenarios, it may be necessary to wait for feedback for a long time after triggering an event. However, keeping the system in a waiting state would result in unnecessary resource consumption.
For this situation, Bridgic provides the interact_with_human mechanism for interruption and recovery. It allows the workflow or agent to pause and persist its current execution state when such an event occurs, wait for feedback, and then resume execution.
Let's implement a reimbursement workflow that is automatically triggered by the enterprise's OA system and requires approval before the reimbursement can be completed. The source code can be downloaded from GitHub.
import os
import tempfile
from pydantic import BaseModel
from datetime import datetime
from bridgic.core.automa import GraphAutoma, worker, Snapshot
from bridgic.core.automa.args import From
from bridgic.core.automa.interaction import Event, InteractionFeedback, InteractionException
class ReimbursementRecord(BaseModel):
    request_id: int
    employee_id: int
    employee_name: str
    reimbursement_month: str
    reimbursement_amount: float
    description: str
    created_at: datetime
    updated_at: datetime

class AuditResult(BaseModel):
    request_id: int
    passed: bool
    audit_reason: str

class ReimbursementWorkflow(GraphAutoma):
    @worker(is_start=True)
    async def load_record(self, request_id: int):
        """
        The reimbursement workflow can be triggered by the OA system, for instance when an employee submits a new reimbursement request. Each request is uniquely identified by a `request_id`, which is then used to retrieve the corresponding reimbursement record from the database.
        """
        # Load the data from database.
        return await self.load_record_from_database(request_id)

    @worker(dependencies=["load_record"])
    async def audit_by_rules(self, record: ReimbursementRecord):
        """
        This method simulates the logic for automatically determining whether a reimbursement request complies with business rules.
        Typical reasons for a reimbursement request failing the audit include:
        - Unusually large individual amounts
        - Excessive total amounts within a month
        - Expenses that do not meet reimbursement policies
        - Missing or invalid supporting documents
        - Duplicate submissions
        - Other non-compliant cases
        """
        if record.reimbursement_amount > 2500:
            return AuditResult(
                request_id=record.request_id,
                passed=False,
                # f-string so that the actual amount appears in the reason.
                audit_reason=f"The reimbursement amount {record.reimbursement_amount} exceeds the limit of 2500."
            )
        # TODO: Add more audit rules here.
        ...
        return AuditResult(
            request_id=record.request_id,
            passed=True,
            audit_reason="The reimbursement request passed the audit."
        )

    @worker(dependencies=["audit_by_rules"])
    async def execute_payment(self, result: AuditResult, record: ReimbursementRecord = From("load_record")):
        if not result.passed:
            print(f"The reimbursement request {record.request_id} failed the audit. Reason: {result.audit_reason}")
            return False
        # The reimbursement request {record.request_id} has passed the audit rules. Requesting approval from the manager...
        # human-in-the-loop: request approval from the manager.
        event = Event(
            event_type="request_approval",
            data={
                "reimbursement_record": record,
                "audit_result": result
            }
        )
        feedback: InteractionFeedback = self.interact_with_human(event)
        if feedback.data == "yes":
            await self.launch_payment_transaction(record.request_id)
            print(f"The reimbursement request {record.request_id} has been approved. Payment transaction launched.")
            return True
        print(f"!!!The reimbursement request {record.request_id} has been rejected. Payment transaction not launched.\nRejection info:\n {feedback.data}")
        return False

    async def load_record_from_database(self, request_id: int):
        # Simulate a database query...
        return ReimbursementRecord(
            request_id=request_id,
            employee_id=888888,
            employee_name="John Doe",
            reimbursement_month="2025-10",
            reimbursement_amount=1024.00,
            description="Hotel expenses for a business trip",
            created_at=datetime(2025, 10, 11, 10, 0, 0),
            updated_at=datetime(2025, 10, 11, 10, 0, 0)
        )

    async def launch_payment_transaction(self, request_id: int):
        # Simulate a payment execution...
        ...
This workflow, ReimbursementWorkflow, consists of three steps:
- load_record: Loads the reimbursement record identified by a request_id from the database.
- audit_by_rules: Automatically audits the reimbursement request against predefined rules.
- execute_payment: Requests approval from the manager (a human user) after the reimbursement request has passed the audit rules.
In the third step (execute_payment), calling interact_with_human() posts an event and pauses the workflow execution.
async def save_snapshot_to_database(snapshot: Snapshot):
    # Simulate a database storage using temporary files.
    temp_dir = tempfile.TemporaryDirectory()
    bytes_file = os.path.join(temp_dir.name, "reimbursement_workflow.bytes")
    version_file = os.path.join(temp_dir.name, "reimbursement_workflow.version")
    with open(bytes_file, "wb") as f:
        f.write(snapshot.serialized_bytes)
    with open(version_file, "w") as f:
        f.write(snapshot.serialization_version)
    return {
        "bytes_file": bytes_file,
        "version_file": version_file,
        # Keep a reference to the TemporaryDirectory so it is not cleaned up before the snapshot is loaded.
        "temp_dir": temp_dir,
    }

reimbursement_workflow = ReimbursementWorkflow()
try:
    await reimbursement_workflow.arun(request_id=123456)
except InteractionException as e:
    # The `ReimbursementWorkflow` instance has been paused and serialized to a snapshot.
    interaction_id = e.interactions[0].interaction_id
    record = e.interactions[0].event.data["reimbursement_record"]
    # Save the snapshot to the database.
    db_context = await save_snapshot_to_database(e.snapshot)
    print("The `ReimbursementWorkflow` instance has been paused and serialized to a snapshot.")
    print("The snapshot has been persisted to database.")
The `ReimbursementWorkflow` instance has been paused and serialized to a snapshot.
The snapshot has been persisted to database.
When the arun method of the automa instance is called, an InteractionException will be raised as a result of invoking interact_with_human().
An InteractionException contains two fields:
- interactions: A list of Interactions, each Interaction containing an interaction_id and an event.
- snapshot: A Snapshot instance, representing the Automa's current state serialized in bytes.
Then the workflow, ReimbursementWorkflow, pauses, and the snapshot corresponding to the interaction is persisted in the database for later recovery.
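The example simulates the database with a temporary directory, which disappears once it is cleaned up. As a rough sketch of what a durable variant might look like, the helpers below write the two snapshot fields to files under a chosen directory, keyed by an identifier such as the interaction_id. The function names, the file layout, and the assumption that the serialization version is a string are all illustrative and not part of Bridgic.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def save_snapshot_files(root: Path, key: str, serialized_bytes: bytes, serialization_version: str) -> None:
    """Write the snapshot payload and its version next to each other (hypothetical layout)."""
    root.mkdir(parents=True, exist_ok=True)
    (root / f"{key}.bytes").write_bytes(serialized_bytes)
    (root / f"{key}.version").write_text(serialization_version, encoding="utf-8")


def load_snapshot_files(root: Path, key: str) -> Tuple[bytes, str]:
    """Read back the payload and version stored under the given key."""
    payload = (root / f"{key}.bytes").read_bytes()
    version = (root / f"{key}.version").read_text(encoding="utf-8")
    return payload, version


# A temporary directory is used here only so the sketch cleans up after itself;
# a real deployment would point `root` at durable storage.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "snapshots"
    save_snapshot_files(root, "interaction-001", b"\x00\x01state", "1")
    restored = load_snapshot_files(root, "interaction-001")

print(restored)
```

The two returned values correspond to the serialized_bytes and serialization_version arguments that the Snapshot constructor receives when the workflow is reloaded.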
async def load_snapshot_from_database(db_context):
    # Simulate a database query using temporary files.
    bytes_file = db_context["bytes_file"]
    version_file = db_context["version_file"]
    temp_dir = db_context["temp_dir"]
    with open(bytes_file, "rb") as f:
        serialized_bytes = f.read()
    with open(version_file, "r") as f:
        serialization_version = f.read()
    snapshot = Snapshot(
        serialized_bytes=serialized_bytes,
        serialization_version=serialization_version
    )
    return snapshot

print("Waiting for the manager's approval (It may take long time) ...")
human_feedback = input(
    "\n"
    "---------- Message to User ------------\n"
    "A reimbursement request has been submitted and audited by the system.\n"
    "Please check the details and give your approval or rejection.\n"
    "Reimbursement Request Details:\n"
    f"\n{record.model_dump_json(indent=4)}\n"
    "If you approve the request, please input 'yes'.\n"
    "Otherwise, please input 'no' or the reason for rejection.\n"
    "Your input: "
)

# Load the snapshot from the database.
snapshot = await load_snapshot_from_database(db_context)
# Deserialize the `ReimbursementWorkflow` instance from the snapshot.
reimbursement_workflow = ReimbursementWorkflow.load_from_snapshot(snapshot)
print("-------------------------------------\n")
print("The `ReimbursementWorkflow` instance has been deserialized and loaded from the snapshot. It will resume to run immediately...")

feedback = InteractionFeedback(
    interaction_id=interaction_id,
    data=human_feedback
)
await reimbursement_workflow.arun(interaction_feedback=feedback)
Waiting for the manager's approval (It may take long time) ...
-------------------------------------
The `ReimbursementWorkflow` instance has been deserialized and loaded from the snapshot. It will resume to run immediately...
The reimbursement request 123456 has been approved. Payment transaction launched.
After an extended period, the user may complete the approval interaction for the reimbursement workflow. The system then retrieves the serialized snapshot from the database and deserializes it into an instance of the ReimbursementWorkflow class. Subsequently, the user’s decision — either approval or rejection — is wrapped into an InteractionFeedback object, and the arun method of the automa is invoked again to resume the workflow execution from the previously paused state.
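Because the workflow compares the feedback to the exact string "yes", free-form input such as " Yes " would count as a rejection. One possible way to make the decision explicit before wrapping it in an InteractionFeedback is a small normalization step. The helper below is a hypothetical sketch, and its accepted spellings ("y", "n", empty input) are assumptions rather than Bridgic behavior.

```python
from typing import Tuple


def parse_approval(raw: str) -> Tuple[bool, str]:
    """Return (approved, rejection_reason) for a manager's free-form reply (illustrative helper)."""
    text = raw.strip()
    lowered = text.lower()
    if lowered in {"yes", "y"}:
        return True, ""
    if lowered in {"no", "n", ""}:
        return False, "no reason given"
    # Anything else is treated as a rejection with the reply as the reason.
    return False, text


print(parse_approval(" Yes "))            # (True, '')
print(parse_approval("no"))               # (False, 'no reason given')
print(parse_approval("Missing invoice"))  # (False, 'Missing invoice')
```

With such a helper, the caller could send "yes" for an approval and the rejection reason otherwise, which matches the two branches in execute_payment.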
When feedback is required but its arrival time is uncertain, this mechanism saves the current state and resumes from it once the feedback arrives. The system can release resources that would otherwise be held for a long time, and it can still be woken up when the feedback is ready.
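The pause, persist, and resume cycle can be reduced to a few lines of plain Python. The toy below is a sketch of the general pattern only: NeedsHuman and ToyApproval are hypothetical names, JSON stands in for whatever serialization format is actually used, and nothing here reflects Bridgic's internals.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


class NeedsHuman(Exception):
    """Raised to pause the toy workflow; carries the serialized state."""

    def __init__(self, question: str, snapshot: bytes) -> None:
        super().__init__(question)
        self.question = question
        self.snapshot = snapshot


@dataclass
class ToyApproval:
    request_id: int
    audited: bool = False

    def run(self, feedback: Optional[str] = None) -> str:
        if not self.audited:
            self.audited = True  # work done before the pause becomes part of the saved state
        if feedback is None:
            # Pause: hand the serialized state to the caller instead of waiting.
            raise NeedsHuman(f"Approve request {self.request_id}?", self.to_bytes())
        return "paid" if feedback == "yes" else "rejected"

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "ToyApproval":
        return cls(**json.loads(raw.decode("utf-8")))


# First run: the workflow pauses and the caller persists the bytes somewhere durable.
try:
    ToyApproval(request_id=123456).run()
except NeedsHuman as pause:
    stored = pause.snapshot

# Much later, possibly in another process: rebuild the object and resume with the answer.
resumed = ToyApproval.from_bytes(stored)
outcome = resumed.run(feedback="yes")
print(resumed.audited, outcome)  # True paid
```

Between the two halves no object needs to stay in memory, which is what allows the waiting period to be arbitrarily long.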
What have we done
Bridgic provides flexible support for any form of human-in-the-loop interaction:
- request_feedback_async: Used when the event must return feedback before the program can proceed. The program remains blocked until feedback is received.
- post_event: Used when you just want to notify or trigger an event without expecting any feedback. The main program never blocks.
- interact_with_human: Used when feedback is required but may arrive much later. The program is suspended and persisted, and resumes immediately when feedback becomes available.