Parameter Binding¶
There are three ways to pass data among workers, including Arguments Mapping, Arguments Injection, and Inputs Propagation. Now let's understand them with a sample example.
Query expansion¶
Query expansion is a common step in RAG and can enhance the quality of RAG. To enhance the quality of query expansion, developers often first extract the entity information from the query and use it to assist the model in expanding the original query.
Now let's implement this. The user inputs the original query, and then we expand the query to obtain more queries. There are three steps to complete the query expansion:
- Receive the user's input and perform preprocessing to get the original query.
- Extract the entity information from the query to get the entity information.
- Expand and obtain multiple queries.
1. Initialize¶
Before we start, let's prepare the running environment. In this tutorial, we will use the OpenAI model integration (not OpenAI-like) that supports the StructuredOutput feature. Run the following pip command to make sure this integration is available.
pip install bridgic-llms-openai
# Get the environment variables.
import os
_api_key = os.environ.get("OPENAI_API_KEY")
_api_base = os.environ.get("OPENAI_API_BASE")
_model_name = os.environ.get("OPENAI_MODEL_NAME")
# Import the necessary packages.
from pydantic import BaseModel, Field
from typing import List, Dict, Tuple
from bridgic.core.automa import GraphAutoma, worker
from bridgic.core.automa.args import ArgsMappingRule
from bridgic.core.model.types import Message, Role
from bridgic.core.model.protocols import PydanticModel
from bridgic.llms.openai import OpenAILlm, OpenAIConfiguration
llm = OpenAILlm( # the llm instance
api_base=_api_base,
api_key=_api_key,
timeout=5,
configuration=OpenAIConfiguration(model=_model_name),
)
2. Complete Query Expansion¶
Now let's implement this query expansion. We assume that the user query we receive is in JSON format. It contains three keys:
id: A string that indicates who inputs the query.query: A string in the form ofQ: user_queryrepresenting the question input by the user.date: The time when the user entered the query.
query_obj = {
"id": "user_1",
"query": "Q: What new developments have there been in RAG in the past year?",
"date": "2025-09-30"
}
Furthermore, we define that when the model completes entity extraction and query expansion, it returns the result in a Pydantic data structure.
class EntityList(BaseModel): # The expected format of the model output in the extract_entity worker
entities: List[str] = Field(description="All entities in the input.")
class QueryList(BaseModel): # The expected format of the model output in the expand_query worker
queries: List[str] = Field(description="All queries in the input.")
Next, let's complete the three steps of query expansion to achieve our goal:
class QueryExpansion(GraphAutoma):
@worker(is_start=True)
async def pre_query(self, query_obj: Dict): # Receive the user's input and preprocess query
query = query_obj["query"].split(":")[1].strip()
return query
@worker(is_start=True)
async def pre_date(self, query_obj: Dict): # Receive the user's input and preprocess date
date = query_obj["date"]
return date
@worker(dependencies=["pre_query", "pre_date"], args_mapping_rule=ArgsMappingRule.AS_IS)
async def extract_entity(self, query: str, date: str): # Extract the entity information from the question, get entity information.
response: EntityList = await llm.astructured_output(
constraint=PydanticModel(model=EntityList),
messages=[
Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
Message.from_text(text=query, role=Role.USER,),
]
)
return query, response.entities, date
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.AS_IS)
async def expand_query(self, query_meta: Tuple[str, List[str]]): # Expand and obtain multiple queries.
query, entities, date = query_meta
task_input = f"Query: {query}\nEntities: {entities}"
response: QueryList = await llm.astructured_output(
constraint=PydanticModel(model=QueryList),
messages=[
Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
Message.from_text(text=task_input, role=Role.USER,),
]
)
return response.queries
Let's run it!
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)
['What are the latest advancements in Retrieval-Augmented Generation (RAG) technology as of 2025?', 'What new developments have emerged in RAG systems over the past 12 months?', 'How have RAG implementations evolved in the last year?', 'What innovations in RAG have been introduced between 2024 and 2025?', 'What are the key breakthroughs in RAG technology in 2025?', 'What new features or improvements have been added to RAG models in the past year?', 'How has the performance of RAG systems improved in the last 12 months?', 'What are the most recent trends and developments in RAG research and deployment?', 'What new techniques have been introduced in RAG to improve accuracy and efficiency in 2025?', 'What are the major updates in RAG frameworks and tools from 2024 to 2025?']
Great! We have successfully completed the small module for query expansion.
What have we done?¶
Reviewing the code, we find that each @worker decorator has an args_mapping_rule parameter. Let's understand what it does.
Arguments Mapping¶
The args_mapping_rule defines the way data is passed between directly dependent workers, that is, how the result of the previous worker is mapped to the parameter of the next worker. Its value can only be specified through the properties of ArgsMappingRule.
AS_IS mode (default)¶
In the AS_IS mode, a worker will receive the output of all its directly dependent workers as input parameters in the order declared by the dependencies.
In the above example, extract_entity declares dependencies: dependencies=["pre_query", "pre_date"], so the results of the two preceding workers will be mapped to the first and second parameters of extract_entity in the order specified by the dependencies declaration, the result of pre_query is mapped to query parameter and the result of pre_date is mapped to date parameter.
Note: The declaration order in dependencies only affects the order of parameter mapping, but does not influence the execution order of the dependent workers.
Additionally, if the previous worker returns a result with multiple values, such as return x, y, then all the results will be passed as a tuple result. So in the above example, the parameter query_meta of expand_query received all the result values from extract_entity.
UNPACK mode¶
Let's go back to the previous example. In the expand_query, we receive the parameters from the previous worker in the AS_IS mode and manually unpack them as a whole, like this:
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.AS_IS)
async def expand_query(self, query_meta: Tuple[str, List[str]]): # Expand and obtain multiple queries.
query, entities, date = query_meta
...
This operation requires knowing what the parameters of query_meta as a whole contain, which might seem inconvenient. Could we complete the unpacking operation and fill in the corresponding parameters when returning? At this point, the UNPACK mode comes in handy.
Let's modify the expand_query in the above example and add some print messages.
class QueryExpansion(GraphAutoma):
@worker(is_start=True)
async def pre_query(self, query_obj: Dict): # Receive the user's input and preprocess query
query = query_obj["query"].split(":")[1].strip()
return query
@worker(is_start=True)
async def pre_date(self, query_obj: Dict): # Receive the user's input and preprocess date
date = query_obj["date"]
return date
@worker(dependencies=["pre_query", "pre_date"], args_mapping_rule=ArgsMappingRule.AS_IS)
async def extract_entity(self, query: str, date: str): # Extract the entity information from the question, get entity information.
response: EntityList = await llm.astructured_output(
constraint=PydanticModel(model=EntityList),
messages=[
Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
Message.from_text(text=query, role=Role.USER,),
]
)
return query, response.entities, date
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(self, query: str, entities: List[str], date: str): # Expand and obtain multiple queries.
print(f"query: {query}, entities: {entities}, date: {date}")
task_input = f"Query: {query}\nEntities: {entities}"
response: QueryList = await llm.astructured_output(
constraint=PydanticModel(model=QueryList),
messages=[
Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
Message.from_text(text=task_input, role=Role.USER,),
]
)
return response.queries
Let's run it!
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)
query: What new developments have there been in RAG in the past year?, entities: ['RAG', 'new developments', 'past year'], date: 2025-09-30
['What are the latest advancements in Retrieval-Augmented Generation (RAG) technologies as of 2025?', 'What new developments have emerged in RAG systems over the past 12 months?', 'How has RAG evolved in the last year with regard to accuracy, efficiency, and scalability?', 'What are the key innovations in RAG that have been introduced between 2024 and 2025?', 'What new tools and frameworks have been launched for RAG implementation in the past year?', 'What recent breakthroughs in RAG have improved context handling and retrieval precision?', 'How have large language models integrated with RAG in the past year to enhance performance?', 'What are the most significant updates in RAG-based applications from 2024 to 2025?', 'What new techniques in RAG have been proposed to reduce hallucinations and improve factual consistency?', 'How have RAG solutions adapted to real-time data retrieval and dynamic content updates in the past year?']
Great! All the parameters were unpacked and accepted. It can be seen that the unpack mode makes our task flow clearer!
However, it should be noted that the UNPACK mechanism requires that the current worker can only directly depend on one worker; otherwise, the results of multiple workers will be confused when unpacking!
MERGE mode¶
At the same time, conversely, since there is an UNPACK mechanism, is there also a mechanism that can aggregate multiple results for receiving? This is particularly useful when a worker collects the results of multiple dependent workers. At this point, the MERGE mode comes in handy.
Still referring to the example above, extract_entity actually received the results from two workers. Now let's try to make extract_entity receive all these results in a single parameter for use, instead of receiving two parameters.
Let's modify the extract_entity in the above example and add some print messages.
class QueryExpansion(GraphAutoma):
@worker(is_start=True)
async def pre_query(self, query_obj: Dict): # Receive the user's input and preprocess query
query = query_obj["query"].split(":")[1].strip()
return query
@worker(is_start=True)
async def pre_date(self, query_obj: Dict): # Receive the user's input and preprocess date
date = query_obj["date"]
return date
@worker(dependencies=["pre_query", "pre_date"], args_mapping_rule=ArgsMappingRule.MERGE)
async def extract_entity(self, query_meta: Tuple[str, str]): # Extract the entity information from the question, get entity information.
print(f"query_meta: {query_meta}")
query, date = query_meta
response: EntityList = await llm.astructured_output(
constraint=PydanticModel(model=EntityList),
messages=[
Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
Message.from_text(text=query, role=Role.USER,),
]
)
return query, response.entities, date
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(self, query: str, entities: List[str], date: str): # Expand and obtain multiple queries.
task_input = f"Query: {query}\nEntities: {entities}"
response: QueryList = await llm.astructured_output(
constraint=PydanticModel(model=QueryList),
messages=[
Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
Message.from_text(text=task_input, role=Role.USER,),
]
)
return response.queries
Let's run it!
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)
query_meta: ['What new developments have there been in RAG in the past year?', '2025-09-30']
['What are the latest advancements in Retrieval-Augmented Generation (RAG) technology as of 2025?', 'What new developments have emerged in RAG systems over the past 12 months?', 'How has RAG evolved in the last year with recent innovations in AI and NLP?', 'What are the key updates and breakthroughs in RAG models from 2024 to 2025?', 'What new features or improvements have been introduced in RAG implementations in the past year?', 'What are the most significant RAG developments reported in 2025?', 'How have retrieval and generation components in RAG been improved in the last year?', 'What are the recent trends and new developments in RAG applications from 2024 to 2025?', 'What innovations in RAG have been introduced by leading AI companies in the past year?', 'What new challenges and solutions have emerged in RAG research over the last 12 months?']
Great! The results that extract_entity depends on from the workers have all been collected into a list and passed to its parameters.
Arguments Injection¶
Looking back at the example above, we actually find that the date information is passed through pre_date, extract_entity, and finally reaches expand_query. However, in reality, extract_entity doesn't use this information at all. Thus, passing date here seems redundant. And The use of date in expand_query essentially only means that the data depends on it, but whether it is executed or not, this control dependency does not directly rely on it.
Bridgic emphasizes the separation of data dependency and control dependency. This is beneficial for the future construction of complex graphs, as it allows for decoupling and avoids the need to adjust the entire graph due to changes in data dependency.
In Bridgic, we can use Arguments Injection to make it. We can indicate which worker's result to take by using the From marker when declaring parameters, and at the same time set the default value if no result is obtained. For example:
# import the From marker
from bridgic.core.automa import From
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(self, query_meta: Tuple[str, str], date: str = From("pre_date", "2025-01-01")): # Expand and obtain multiple queries.
...
date: str = From("pre_date", "2025-01-01") indicates that the value of date will be assigned based on the result of the pre_date worker. If the result from this worker has not yet been produced, the default value will be used instead.
If the pre_date worker does not exist, or if the pre_date worker has not yet produced a result, and there is no default value, an error will be reported: AutomaDataInjectionError.
Let's modify the above example and add some print messages.
class QueryExpansion(GraphAutoma):
@worker(is_start=True)
async def pre_query(self, query_obj: Dict): # Receive the user's input and preprocess query
query = query_obj["query"].split(":")[1].strip()
return query
@worker(is_start=True)
async def pre_date(self, query_obj: Dict): # Receive the user's input and preprocess date
date = query_obj["date"]
return date
@worker(dependencies=["pre_query"], args_mapping_rule=ArgsMappingRule.AS_IS)
async def extract_entity(self, query: str): # Extract the entity information from the question, get entity information.
response: EntityList = await llm.astructured_output(
constraint=PydanticModel(model=EntityList),
messages=[
Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
Message.from_text(text=query, role=Role.USER,),
]
)
return query, response.entities
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(self, query: str, entities: List[str], date: str = From("pre_date", "2025-01-01")): # Expand and obtain multiple queries.
print(f"query: {query}, entities: {entities}, date: {date}")
task_input = f"Query: {query}\nEntities: {entities}"
response: QueryList = await llm.astructured_output(
constraint=PydanticModel(model=QueryList),
messages=[
Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
Message.from_text(text=task_input, role=Role.USER,),
]
)
return response.queries
Let's run it!
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)
query: What new developments have there been in RAG in the past year?, entities: ['RAG', 'new developments', 'past year'], date: 2025-09-30
['What are the latest advancements in Retrieval-Augmented Generation (RAG) technology as of 2025?', 'What new developments have emerged in RAG systems over the past 12 months?', 'How have RAG implementations evolved in the last year in terms of performance and scalability?', 'What innovations in RAG have been introduced in 2025 that improve accuracy and context handling?', 'What are the key breakthroughs in RAG research and deployment from 2024 to 2025?', 'What new tools and frameworks have been released for RAG in the past year?', 'How have privacy and security features improved in RAG systems over the last year?', 'What are the most notable RAG developments in enterprise AI applications from 2024 to 2025?', "What recent improvements have been made to RAG's ability to handle long-context inputs?", 'How has the integration of RAG with large language models evolved in the past year?']
I have modified extract_entity, and now it only accepts query, making its functionality more pure. Also, in expand_query, I have correctly obtained the date.
Inputs Propagation¶
Looking back at the example above again, our program did not process the id field in the input at all. Eventually, we only returned a list of generalized problems, which might cause the external call to be unable to associate which "id" corresponds to the result. However, this ID neither requires preprocessing nor is it needed for entity extraction.
We can use Inputs Propagation to resolve it. This can be achieved by adding the name of the startup parameter to the worker when declaring the parameters.
@worker(dependencies=["extract_entity"], is_output=True args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(
self,
query: str,
entities: List[str],
+ query_obj: Dict, # The input of the entire Automa
date: str = From("pre_date", "2025-01-01"),
): # Expand and obtain multiple queries.
...
Let's modify the above example and add some print messages.
class QueryExpansion(GraphAutoma):
@worker(is_start=True)
async def pre_query(self, query_obj: Dict): # Receive the user's input and preprocess query
query = query_obj["query"].split(":")[1].strip()
return query
@worker(is_start=True)
async def pre_date(self, query_obj: Dict): # Receive the user's input and preprocess date
date = query_obj["date"]
return date
@worker(dependencies=["pre_query"], args_mapping_rule=ArgsMappingRule.AS_IS)
async def extract_entity(self, query: str): # Extract the entity information from the question, get entity information.
response: EntityList = await llm.astructured_output(
constraint=PydanticModel(model=EntityList),
messages=[
Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
Message.from_text(text=query, role=Role.USER,),
]
)
return query, response.entities
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(self, query: str, entities: List[str], query_obj: Dict, date: str = From("pre_date", "2025-01-01")): # Expand and obtain multiple queries.
print(f"query: {query}, entities: {entities}, date: {date}, query_obj: {query_obj}")
task_input = f"Query: {query}\nEntities: {entities}"
response: QueryList = await llm.astructured_output(
constraint=PydanticModel(model=QueryList),
messages=[
Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
Message.from_text(text=task_input, role=Role.USER,),
]
)
return {"id": query_obj["id"], "queries": response.queries}
Let's run it! When using the Inputs Propagation, the startup parameters must be passed in the form of keywords at startup.
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj=query_obj) # using keyword parameter passing
query: What new developments have there been in RAG in the past year?, entities: ['RAG', 'new developments', 'past year'], date: 2025-09-30, query_obj: {'id': 'user_1', 'query': 'Q: What new developments have there been in RAG in the past year?', 'date': '2025-09-30'}
{'id': 'user_1',
'queries': ['What are the latest advancements in Retrieval-Augmented Generation (RAG) technologies as of 2025?',
'What new developments have emerged in RAG systems over the past 12 months?',
'How have RAG implementations evolved in the last year in terms of performance and scalability?',
'What are the key innovations in RAG models reported between 2024 and 2025?',
'What new techniques have been introduced in RAG to improve accuracy and context retention in the past year?',
'What recent breakthroughs in RAG have been highlighted in 2025?',
'How have industry leaders advanced RAG technology in the last year?',
'What are the most significant updates in RAG frameworks and tools from 2024 to 2025?',
'What new challenges and solutions have been proposed in RAG research over the past year?',
'What developments in RAG have improved real-time retrieval and generation performance in 2025?']} Among all the ways of parameter passing mentioned above, the priority order is: arguments mapping positional parameters > arguments injection > propagation > arguments mapping keyword parameters.