SequentialAutoma
Introduction
SequentialAutoma is an orchestration Automa in the Bridgic framework designed for sequential execution of multiple workers. It ensures that workers execute in the order they are added, with each worker's output serving as the input for the next worker.
- Simplified Interface: Provides a simpler sequential orchestration interface compared to GraphAutoma
- Linear Workflow: Ensures an ordered, step-by-step processing flow
- Data Passing: The output of the previous worker is automatically passed to the next worker
Example
Let's learn how to use SequentialAutoma through practical examples. In the following example, we'll create a simple data processing pipeline where each step depends on the output of the previous step.
from bridgic.core.automa import worker
from bridgic.core.agentic import SequentialAutoma

# Define the data processing pipeline
class DataProcessingPipeline(SequentialAutoma):
    @worker()
    def filter(self, data: list) -> list:
        """Step 1: Filter data (receives the initial input passed to arun)"""
        print(f"Filtering data: {data}")
        return [x for x in data if x % 2 == 0]

    @worker()
    def transform(self, data: list) -> list:
        """Step 2: Transform data (receives the output of the previous step)"""
        print(f"Transforming data: {data}")
        return [x * 2 for x in data]

    @worker()
    async def save(self, data: list) -> dict:
        """Step 3: Save data (receives the output of the previous step)"""
        print(f"Saving data: {data}")
        return {"saved": True, "count": len(data), "data": data}

# Execute all workers sequentially
async def main():
    # Create the pipeline and run it with the initial input
    pipeline = DataProcessingPipeline()
    result = await pipeline.arun(data=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    print(f"\nFinal result: {result}")

await main()
Filtering data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Transforming data: [2, 4, 6, 8, 10]
Saving data: [4, 8, 12, 16, 20]
Final result: {'saved': True, 'count': 5, 'data': [4, 8, 12, 16, 20]}
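As the output shows, the keyword argument passed to arun is handed to the first worker, each subsequent worker receives the previous worker's return value, and the final result is the return value of the last worker (save). The example relies on top-level await, which works in notebook environments; in a standalone Python script you would start the event loop yourself. A minimal sketch, reusing the DataProcessingPipeline class defined above and standard-library asyncio:

import asyncio

async def main():
    pipeline = DataProcessingPipeline()
    result = await pipeline.arun(data=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    print(f"Final result: {result}")

if __name__ == "__main__":
    # asyncio.run creates an event loop and runs main() to completion
    asyncio.run(main())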