ConcurrentAutoma
Introduction
ConcurrentAutoma is an orchestration Automa in the Bridgic framework designed for concurrent execution of multiple workers. It allows you to execute multiple independent tasks simultaneously and aggregates all results into a list.
- Simplified Interface: Provides a simpler concurrent orchestration interface compared to GraphAutoma
- Concurrent Execution: All workers execute simultaneously, independent of each other
- Result Aggregation: Automatically aggregates all worker results into a list
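To make the "concurrent execution plus result aggregation" idea concrete, here is a minimal sketch of the same pattern using only the standard library's asyncio.gather. It is an analogy, not Bridgic's implementation: the names fetch and run_all are hypothetical, and whether ConcurrentAutoma guarantees the same result ordering is not stated here.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    """Hypothetical stand-in for a worker: wait, then return a labelled result."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_all() -> list[str]:
    # gather() starts all three coroutines together and returns their results
    # as one list, ordered by argument position rather than completion time.
    return await asyncio.gather(
        fetch("slow", 0.03),
        fetch("fast", 0.01),
        fetch("medium", 0.02),
    )


if __name__ == "__main__":
    print(asyncio.run(run_all()))
```

Even though "fast" finishes first, it appears second in the returned list, because gather preserves the order in which the awaitables were passed.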
Example
Let's learn how to use ConcurrentAutoma through a practical example: a data fetcher that concurrently fetches data from multiple APIs.
import asyncio
import time
import json

from bridgic.core.automa import worker
from bridgic.core.agentic import ConcurrentAutoma


# Define the data fetcher with concurrent workers
class DataFetcher(ConcurrentAutoma):
    @worker()
    async def fetch_from_api_1(self, query: str) -> str:
        """Fetch data from API-1"""
        await asyncio.sleep(0.2)
        return f"API-1 result: {query}"

    @worker()
    async def fetch_from_api_2(self, query: str) -> str:
        """Fetch data from API-2"""
        await asyncio.sleep(0.2)
        return f"API-2 result: {query}"

    @worker()
    def fetch_from_api_3(self, query: str) -> str:
        """Fetch data from API-3 (synchronous function)"""
        time.sleep(0.2)
        return f"API-3 result: {query}"


# Execute all workers concurrently
async def main():
    fetcher = DataFetcher()
    start_time = time.time()
    results = await fetcher.arun(query="search keyword")
    elapsed_time = time.time() - start_time
    print(f"Time Cost: {elapsed_time:.2f} seconds")
    print(f"Results: {json.dumps(results, indent=4)}")


# Top-level await works in a notebook; in a script, use asyncio.run(main())
await main()
Time Cost: 0.20 seconds
Results: [
    "API-1 result: search keyword",
    "API-2 result: search keyword",
    "API-3 result: search keyword"
]
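The reported 0.20 seconds matches the duration of a single worker rather than the 0.6-second sum of all three, which is what overlapping waits should produce. The sketch below reproduces that effect with the standard library alone. It assumes the blocking function is moved off the event loop with asyncio.to_thread; how Bridgic actually schedules a synchronous worker is not shown in this section, and the names async_task, blocking_task, run_sequential, and run_concurrent are hypothetical.

```python
import asyncio
import time


async def async_task() -> str:
    # Non-blocking wait: the event loop can run other tasks meanwhile.
    await asyncio.sleep(0.1)
    return "async"


def blocking_task() -> str:
    # Blocking wait: would stall the event loop if called on it directly.
    time.sleep(0.1)
    return "blocking"


async def run_sequential() -> float:
    """Run the three tasks one after another and return the elapsed seconds."""
    start = time.perf_counter()
    await async_task()
    await async_task()
    await asyncio.to_thread(blocking_task)
    return time.perf_counter() - start


async def run_concurrent() -> float:
    """Run the three tasks together and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(
        async_task(),
        async_task(),
        asyncio.to_thread(blocking_task),  # blocking call runs in a worker thread
    )
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequential()):.2f} s")
    print(f"concurrent: {asyncio.run(run_concurrent()):.2f} s")
```

The sequential run takes roughly the sum of the three waits, while the concurrent run takes roughly as long as the slowest one. Exact figures depend on the machine and scheduler.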