Skip to content

mcp

The MCP module provides integration with the Model Context Protocol (MCP) for Bridgic.

This module enables Bridgic to connect to and interact with MCP servers, allowing:

  • Connection Management: Establish and manage connections to MCP servers via different transport protocols (stdio, streamable HTTP, etc.)
  • Tool Integration: Use MCP tools as callable tools within Bridgic agentic systems
  • Prompt Integration: Access and use prompts from MCP servers in Bridgic workflows
  • Worker Integration: Execute MCP tools through Bridgic's worker system
Core Components
  • McpServerConnection: Abstract base class for MCP server connections
  • McpServerConnectionStdio: Connection implementation using stdio transport
  • McpServerConnectionStreamableHttp: Connection implementation using streamable HTTP
  • McpServerConnectionManager: Manager for multiple MCP connections with shared event loop
  • McpToolSpec: Tool specification for MCP tools
  • McpToolSetBuilder: Builder for creating exclusive McpToolSpec instances for ReCentAutoma
  • McpToolWorker: Worker implementation for executing MCP tools
  • McpPromptTemplate: Prompt template implementation for MCP prompts

Examples:

Create a connection to an MCP server using stdio transport and list available tools:

>>> from bridgic.protocols.mcp import McpServerConnectionStdio
>>> from bridgic.core.automa import GraphAutoma
>>>
>>> # Create and connect to an MCP server
>>> connection = McpServerConnectionStdio(
...     name="my-mcp-server",
...     command="npx",
...     args=["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"],
... )
>>> connection.connect()
>>>
>>> # List available tools
>>> tools = connection.list_tools()
>>>
>>> # Use tools in an automa
>>> automa = GraphAutoma()
>>> for tool_spec in tools:
...     automa.add_worker(tool_spec.create_worker())

McpServerConnection

Bases: ABC

The abstract base class for Connection to an MCP server.

This class is responsible for establishing a connection to an MCP server and providing a session to interact with the server. The connection can be established using different transport protocols, which depends on the specific implementation.

Methods:

Name Description
connect

Establish a connection to an MCP server.

get_mcp_client

Get a MCP client to interact with the server.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
class McpServerConnection(ABC):
    """
    The abstract base class for Connection to an MCP server.

    This class is responsible for establishing a connection to an MCP server and providing a 
    session to interact with the server. The connection can be established using different 
    transport protocols, which depends on the specific implementation.

    Methods
    -------
    connect
        Establish a connection to an MCP server.
    get_mcp_client
        Get a MCP client to interact with the server.
    """

    name: str
    """The name of the connected MCP server."""

    request_timeout: int
    """The timeout in seconds for the requests to the MCP server. Default is 30 seconds."""

    encoding: str
    """The encoding to use for the connection."""

    client_kwargs: Dict[str, Any]
    """The keyword arguments to pass to the MCP client."""

    _manager: McpServerConnectionManager
    _session: ClientSession
    _exit_stack: AsyncExitStack

    _lifecycle_task: asyncio.Task
    _connection_ready_event: asyncio.Event
    _connection_stop_event: asyncio.Event
    _connection_error: Exception

    is_connected: bool
    """Whether the connection is established."""

    def __init__(
        self,
        name: str,
        *,
        request_timeout: Optional[int] = None,
        **kwargs: Any,
    ):
        self.name = name
        self.request_timeout = request_timeout or 30
        self.client_kwargs = kwargs

        self._manager = None
        self._session = None
        self._exit_stack = None

        self._lifecycle_task = None
        self._connection_stop_event = asyncio.Event()
        self._connection_ready_event = asyncio.Event()

        self.is_connected = False
        self._connection_error = None

    def _get_manager(self) -> McpServerConnectionManager:
        if self._manager is None:
            manager = McpServerConnectionManager.get_instance()
            manager.register_connection(self)
            assert manager is self._manager
        return self._manager

    def connect(self):
        """
        Establish a connection to the MCP server. Call this method once before using the connection.

        If the connection is not registered in a specific manager explicitly, it will be registered
        in the default manager (manager_name="default-mcp-manager"). If the connection needs to be
        registered in a specific manager, the `connect` method should be called after the registration.

        Notes
        -----
        The event loop responsible for managing the session is determined at the time when `connect()` is called.
        Therefore, it is required to register the connection to the desired manager *before* calling `connect()`.
        Otherwise, the connection will be registered to the default manager. All registrations could not be changed later.

        Examples
        --------
        Create a connection to a streamable HTTP MCP server and register it to a manager:
        >>> connection = McpServerConnectionStreamableHttp(
        ...     name="streamable-http-server-connection",
        ...     url="http://localhost:8000",
        ...     request_timeout=5,
        ... )
        >>> manager = McpServerConnectionManager.get_instance("my-manager")
        >>> manager.register_connection(connection)
        >>> connection.connect()
        """
        if self.is_connected:
            return

        # Create events and lifecycle task in the manager's event loop.
        async def setup_and_wait():
            # Create events in the manager's event loop.
            self._connection_stop_event.clear()
            self._connection_ready_event.clear()
            self._connection_error = None

            # Create the lifecycle task.
            self._lifecycle_task = asyncio.create_task(self._lifecycle_task_coro())

            # Wait for connection to be ready
            await self._connection_ready_event.wait()

            # Check if there was an error
            if self._connection_error:
                raise self._connection_error

        # Setup the connection and make sure it could be closed within the same lifecycle task.
        self._get_manager().run_sync(
            coro=setup_and_wait(),
            timeout=self.request_timeout + 1,
        )

    def close(self):
        """
        Close the connection to the MCP server.
        """
        if not self.is_connected:
            return

        # Signal the lifecycle task to stop and wait for it to complete.
        async def stop_and_wait():
            if self._connection_stop_event is not None:
                self._connection_stop_event.set()

            if self._lifecycle_task is not None:
                try:
                    await self._lifecycle_task
                except Exception as e:
                    warnings.warn(f"Exception occurred while waiting for the lifecycle task to complete: {e}")
                finally:
                    self.is_connected = False
                    self._lifecycle_task = None
                    self._manager.unregister_connection(self)

        self._get_manager().run_sync(
            coro=stop_and_wait(),
            timeout=self.request_timeout + 1,
        )

    @abstractmethod
    def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
        """
        Get an MCP client.

        Returns
        -------
        _AsyncGeneratorContextManager[Any, None]
            An async context manager for the MCP client transport.
        """
        ...

    ###########################################################################
    # Protected methods that should be called within the dedicated event loop.
    ###########################################################################

    async def _lifecycle_task_coro(self):
        """
        The lifecycle task coroutine that manages the connection lifecycle.

        This coroutine runs in the manager's dedicated event loop and is responsible for:
        1. Establishing the connection to the MCP server
        2. Waiting for the stop event
        3. Closing the connection when the stop event is set

        This ensures that the connection is created and closed within the same Task,
        which is required by anyio's cancel scope mechanism.
        """
        try:
            # Establish connection
            self._exit_stack = AsyncExitStack()
            try:
                transport = await self._exit_stack.enter_async_context(self.get_mcp_client())
                session = await self._exit_stack.enter_async_context(
                    ClientSession(
                        read_stream=transport[0],
                        write_stream=transport[1],
                        read_timeout_seconds=timedelta(seconds=self.request_timeout),
                        # TODO : Callback will be added in the future to support more advanced features.
                        # message_handler=...,
                        # logging_callback=...,
                        # sampling_callback=...,
                    )
                )
                await session.initialize()
            except Exception as ex:
                session = None
                self._connection_error = McpServerConnectionError(
                    f"Failed to create session to MCP server: name={self.name}"
                ).with_traceback(ex.__traceback__)

            # Hold the connected session for later use.
            self._session = session
            self.is_connected = True if session is not None else False

            # Signal that the connection is ready.
            self._connection_ready_event.set()

            # Wait for the stop signal.
            await self._connection_stop_event.wait()
        finally:
            # At the final moment, the exit stack must be closed.
            if self._exit_stack is not None:
                await self._exit_stack.aclose()
                self._exit_stack = None

            self._session = None
            self.is_connected = False

    async def _list_prompts_unsafe(self) -> ListPromptsResult:
        """
        Asynchronously list the prompts from the MCP server.

        Since the session used to communicate with the MCP server is bound to a specific event 
        loop, this method should be called within the designated event loop for the connection.
        """
        if not self.is_connected or self._session is None:
            raise McpServerConnectionError(
                f"Connection to MCP server is not established: name={self.name}"
            )
        return await self._session.list_prompts()

    async def _get_prompt_unsafe(
        self,
        prompt_name: str,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> GetPromptResult:
        """
        Asynchronously get a prompt from the MCP server.

        Since the session used to communicate with the MCP server is bound to a specific event 
        loop, this method should be called within the designated event loop for the connection.
        """
        if not self.is_connected or self._session is None:
            raise McpServerConnectionError(
                f"Connection to MCP server is not established: name={self.name}"
            )
        return await self._session.get_prompt(name=prompt_name, arguments=arguments or {})

    async def _list_tools_unsafe(self) -> ListToolsResult:
        """
        Asynchronously list the tools from the MCP server.

        Since the session used to communicate with the MCP server is bound to a specific event 
        loop, this method should be called within the designated event loop for the connection.
        """
        if not self.is_connected or self._session is None:
            raise McpServerConnectionError(
                f"Connection to MCP server is not established: name={self.name}"
            )
        try:
            result = await self._session.list_tools()
        except McpError as e:
            is_timeout = False
            for content in ["timed out", "timeout"]:
                if content in str.lower(e.error.message):
                    result = ListToolsResult(tools=[])
                    is_timeout = True
                    warnings.warn(f"Timeout occurred while listing tools from MCP server: name={self.name}")
                    break
            if not is_timeout:
                raise e
        return result

    async def _call_tool_unsafe(
        self,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> CallToolResult:
        """
        Asynchronously call a tool on the MCP server.

        Since the session used to communicate with the MCP server is bound to a specific event 
        loop, this method should be called within the designated event loop for the connection.
        """
        if not self.is_connected or self._session is None:
            raise McpServerConnectionError(
                f"Connection to MCP server is not established: name={self.name}"
            )
        return await self._session.call_tool(name=tool_name, arguments=arguments or {})

    ###########################################################################
    # Public methods that are safely wrapped and could be called anywhere.
    ###########################################################################

    def list_prompts(self) -> List["McpPromptTemplate"]:
        """
        List the prompts from the MCP server.

        Returns
        -------
        List[McpPromptTemplate]
            The list of prompt template instances from the server.
        """
        from bridgic.protocols.mcp._mcp_template import McpPromptTemplate

        result = self._get_manager().run_sync(
            coro=self._list_prompts_unsafe(),
            timeout=self.request_timeout + 1,
        )

        return [
            McpPromptTemplate(
                prompt_name=prompt.name,
                prompt_info=prompt,
                server_connection=self
            )
            for prompt in result.prompts
        ]

    async def alist_prompts(self) -> List["McpPromptTemplate"]:
        """
        Asynchronously list the prompts from the MCP server.

        Returns
        -------
        List[McpPromptTemplate]
            The list of prompt template instances from the server.
        """
        from bridgic.protocols.mcp._mcp_template import McpPromptTemplate

        result = await self._get_manager().run_async(
            coro=self._list_prompts_unsafe(),
            timeout=self.request_timeout + 1,
        )

        return [
            McpPromptTemplate(
                prompt_name=prompt.name,
                prompt_info=prompt,
                server_connection=self
            )
            for prompt in result.prompts
        ]

    def get_prompt(
        self,
        prompt_name: str,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> GetPromptResult:
        """
        Synchronously get a prompt from the MCP server.

        Parameters
        ----------
        prompt_name : str
            The name of the prompt to retrieve.
        arguments : Optional[Dict[str, Any]]
            Arguments to pass to the prompt.

        Returns
        -------
        GetPromptResult
            The prompt result from the server.

        Raises
        ------
        RuntimeError
            If the connection is not established.
        """
        return self._get_manager().run_sync(
            coro=self._get_prompt_unsafe(prompt_name=prompt_name, arguments=arguments),
            timeout=self.request_timeout + 1,
        )

    async def aget_prompt(
        self,
        prompt_name: str,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> GetPromptResult:
        """
        Asynchronously get a prompt from the MCP server.

        Parameters
        ----------
        prompt_name : str
            The name of the prompt to retrieve.
        arguments : Optional[Dict[str, Any]]
            Arguments to pass to the prompt.

        Returns
        -------
        GetPromptResult
            The prompt result from the server.

        Raises
        ------
        RuntimeError
            If the connection is not established.
        """
        return await self._get_manager().run_async(
            coro=self._get_prompt_unsafe(prompt_name=prompt_name, arguments=arguments),
            timeout=self.request_timeout + 1,
        )

    def list_tools(self) -> List["McpToolSpec"]:
        """
        List the tools from the MCP server.

        This method synchronously retrieves the list of tools available from the connected
        MCP server and wraps each tool in an `McpToolSpec` instance for use within the
        bridgic framework.

        Returns
        -------
        List[McpToolSpec]
            The list of tool specification instances from the server.

        Raises
        ------
        RuntimeError
            If the connection is not established and cannot be established.
        """
        from bridgic.protocols.mcp._mcp_tool_spec import McpToolSpec

        result = self._get_manager().run_sync(
            coro=self._list_tools_unsafe(),
            timeout=self.request_timeout + 1,
        )

        return [
            McpToolSpec(
                tool_name=tool.name,
                tool_info=tool,
                server_connection=self
            )
            for tool in result.tools
        ]

    async def alist_tools(self) -> List["McpToolSpec"]:
        """
        Asynchronously list the tools from the MCP server.

        This method asynchronously retrieves the list of tools available from the connected
        MCP server and wraps each tool in an `McpToolSpec` instance for use within the
        bridgic framework.

        Returns
        -------
        List[McpToolSpec]
            The list of tool specification instances from the server.

        Raises
        ------
        RuntimeError
            If the connection is not established and cannot be established.
        """
        from bridgic.protocols.mcp._mcp_tool_spec import McpToolSpec

        result = await self._get_manager().run_async(
            coro=self._list_tools_unsafe(),
            timeout=self.request_timeout + 1,
        )

        return [
            McpToolSpec(
                tool_name=tool.name,
                tool_info=tool,
                server_connection=self
            )
            for tool in result.tools
        ]

    def call_tool(
        self,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> CallToolResult:
        """
        Synchronously call a tool on the MCP server.

        This method synchronously invokes a tool on the connected MCP server with the
        specified arguments and returns the result.

        Parameters
        ----------
        tool_name : str
            The name of the tool to call.
        arguments : Optional[Dict[str, Any]]
            The arguments to pass to the tool. If None, an empty dictionary will be used.

        Returns
        -------
        CallToolResult
            The result of the tool call from the server, containing content and optionally
            structured content.

        Raises
        ------
        RuntimeError
            If the connection is not established and cannot be established.
        """
        return self._get_manager().run_sync(
            coro=self._call_tool_unsafe(tool_name=tool_name, arguments=arguments),
            timeout=self.request_timeout + 1,
        )

    async def acall_tool(
        self,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> CallToolResult:
        """
        Asynchronously call a tool on the MCP server.

        This method asynchronously invokes a tool on the connected MCP server with the
        specified arguments and returns the result.

        Parameters
        ----------
        tool_name : str
            The name of the tool to call.
        arguments : Optional[Dict[str, Any]]
            The arguments to pass to the tool. If None, an empty dictionary will be used.

        Returns
        -------
        CallToolResult
            The result of the tool call from the server, containing content and optionally
            structured content.

        Raises
        ------
        RuntimeError
            If the connection is not established and cannot be established.
        """
        return await self._get_manager().run_async(
            coro=self._call_tool_unsafe(tool_name=tool_name, arguments=arguments),
            timeout=self.request_timeout + 1,
        )

name instance-attribute

name: str = name

The name of the connected MCP server.

request_timeout instance-attribute

request_timeout: int = request_timeout or 30

The timeout in seconds for the requests to the MCP server. Default is 30 seconds.

encoding instance-attribute

encoding: str

The encoding to use for the connection.

client_kwargs instance-attribute

client_kwargs: Dict[str, Any] = kwargs

The keyword arguments to pass to the MCP client.

is_connected instance-attribute

is_connected: bool = False

Whether the connection is established.

connect

connect()

Establish a connection to the MCP server. Call this method once before using the connection.

If the connection is not registered in a specific manager explicitly, it will be registered in the default manager (manager_name="default-mcp-manager"). If the connection needs to be registered in a specific manager, the connect method should be called after the registration.

Notes

The event loop responsible for managing the session is determined at the time when connect() is called. Therefore, it is required to register the connection to the desired manager before calling connect(). Otherwise, the connection will be registered to the default manager. All registrations could not be changed later.

Examples:

Create a connection to a streamable HTTP MCP server and register it to a manager:

1
2
3
4
5
6
7
8
>>> connection = McpServerConnectionStreamableHttp(
...     name="streamable-http-server-connection",
...     url="http://localhost:8000",
...     request_timeout=5,
... )
>>> manager = McpServerConnectionManager.get_instance("my-manager")
>>> manager.register_connection(connection)
>>> connection.connect()
Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def connect(self):
    """
    Establish a connection to the MCP server. Call this method once before using the connection.

    If the connection is not registered in a specific manager explicitly, it will be registered
    in the default manager (manager_name="default-mcp-manager"). If the connection needs to be
    registered in a specific manager, the `connect` method should be called after the registration.

    Notes
    -----
    The event loop responsible for managing the session is determined at the time when `connect()` is called.
    Therefore, it is required to register the connection to the desired manager *before* calling `connect()`.
    Otherwise, the connection will be registered to the default manager. All registrations could not be changed later.

    Examples
    --------
    Create a connection to a streamable HTTP MCP server and register it to a manager:
    >>> connection = McpServerConnectionStreamableHttp(
    ...     name="streamable-http-server-connection",
    ...     url="http://localhost:8000",
    ...     request_timeout=5,
    ... )
    >>> manager = McpServerConnectionManager.get_instance("my-manager")
    >>> manager.register_connection(connection)
    >>> connection.connect()
    """
    if self.is_connected:
        return

    # Create events and lifecycle task in the manager's event loop.
    async def setup_and_wait():
        # Create events in the manager's event loop.
        self._connection_stop_event.clear()
        self._connection_ready_event.clear()
        self._connection_error = None

        # Create the lifecycle task.
        self._lifecycle_task = asyncio.create_task(self._lifecycle_task_coro())

        # Wait for connection to be ready
        await self._connection_ready_event.wait()

        # Check if there was an error
        if self._connection_error:
            raise self._connection_error

    # Setup the connection and make sure it could be closed within the same lifecycle task.
    self._get_manager().run_sync(
        coro=setup_and_wait(),
        timeout=self.request_timeout + 1,
    )

close

close()

Close the connection to the MCP server.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def close(self):
    """
    Close the connection to the MCP server.
    """
    if not self.is_connected:
        return

    # Signal the lifecycle task to stop and wait for it to complete.
    async def stop_and_wait():
        if self._connection_stop_event is not None:
            self._connection_stop_event.set()

        if self._lifecycle_task is not None:
            try:
                await self._lifecycle_task
            except Exception as e:
                warnings.warn(f"Exception occurred while waiting for the lifecycle task to complete: {e}")
            finally:
                self.is_connected = False
                self._lifecycle_task = None
                self._manager.unregister_connection(self)

    self._get_manager().run_sync(
        coro=stop_and_wait(),
        timeout=self.request_timeout + 1,
    )

get_mcp_client

abstractmethod
get_mcp_client() -> (
    _AsyncGeneratorContextManager[Any, None]
)

Get an MCP client.

Returns:

Type Description
_AsyncGeneratorContextManager[Any, None]

An async context manager for the MCP client transport.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
@abstractmethod
def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
    """
    Get an MCP client.

    Returns
    -------
    _AsyncGeneratorContextManager[Any, None]
        An async context manager for the MCP client transport.
    """
    ...

list_prompts

list_prompts() -> List[McpPromptTemplate]

List the prompts from the MCP server.

Returns:

Type Description
List[McpPromptTemplate]

The list of prompt template instances from the server.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def list_prompts(self) -> List["McpPromptTemplate"]:
    """
    List the prompts from the MCP server.

    Returns
    -------
    List[McpPromptTemplate]
        The list of prompt template instances from the server.
    """
    from bridgic.protocols.mcp._mcp_template import McpPromptTemplate

    result = self._get_manager().run_sync(
        coro=self._list_prompts_unsafe(),
        timeout=self.request_timeout + 1,
    )

    return [
        McpPromptTemplate(
            prompt_name=prompt.name,
            prompt_info=prompt,
            server_connection=self
        )
        for prompt in result.prompts
    ]

alist_prompts

async
alist_prompts() -> List[McpPromptTemplate]

Asynchronously list the prompts from the MCP server.

Returns:

Type Description
List[McpPromptTemplate]

The list of prompt template instances from the server.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
async def alist_prompts(self) -> List["McpPromptTemplate"]:
    """
    Asynchronously list the prompts from the MCP server.

    Returns
    -------
    List[McpPromptTemplate]
        The list of prompt template instances from the server.
    """
    from bridgic.protocols.mcp._mcp_template import McpPromptTemplate

    result = await self._get_manager().run_async(
        coro=self._list_prompts_unsafe(),
        timeout=self.request_timeout + 1,
    )

    return [
        McpPromptTemplate(
            prompt_name=prompt.name,
            prompt_info=prompt,
            server_connection=self
        )
        for prompt in result.prompts
    ]

get_prompt

get_prompt(
    prompt_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> GetPromptResult

Synchronously get a prompt from the MCP server.

Parameters:

Name Type Description Default
prompt_name str

The name of the prompt to retrieve.

required
arguments Optional[Dict[str, Any]]

Arguments to pass to the prompt.

None

Returns:

Type Description
GetPromptResult

The prompt result from the server.

Raises:

Type Description
RuntimeError

If the connection is not established.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def get_prompt(
    self,
    prompt_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> GetPromptResult:
    """
    Synchronously get a prompt from the MCP server.

    Parameters
    ----------
    prompt_name : str
        The name of the prompt to retrieve.
    arguments : Optional[Dict[str, Any]]
        Arguments to pass to the prompt.

    Returns
    -------
    GetPromptResult
        The prompt result from the server.

    Raises
    ------
    RuntimeError
        If the connection is not established.
    """
    return self._get_manager().run_sync(
        coro=self._get_prompt_unsafe(prompt_name=prompt_name, arguments=arguments),
        timeout=self.request_timeout + 1,
    )

aget_prompt

async
aget_prompt(
    prompt_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> GetPromptResult

Asynchronously get a prompt from the MCP server.

Parameters:

Name Type Description Default
prompt_name str

The name of the prompt to retrieve.

required
arguments Optional[Dict[str, Any]]

Arguments to pass to the prompt.

None

Returns:

Type Description
GetPromptResult

The prompt result from the server.

Raises:

Type Description
RuntimeError

If the connection is not established.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
async def aget_prompt(
    self,
    prompt_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> GetPromptResult:
    """
    Asynchronously get a prompt from the MCP server.

    Parameters
    ----------
    prompt_name : str
        The name of the prompt to retrieve.
    arguments : Optional[Dict[str, Any]]
        Arguments to pass to the prompt.

    Returns
    -------
    GetPromptResult
        The prompt result from the server.

    Raises
    ------
    RuntimeError
        If the connection is not established.
    """
    return await self._get_manager().run_async(
        coro=self._get_prompt_unsafe(prompt_name=prompt_name, arguments=arguments),
        timeout=self.request_timeout + 1,
    )

list_tools

list_tools() -> List[McpToolSpec]

List the tools from the MCP server.

This method synchronously retrieves the list of tools available from the connected MCP server and wraps each tool in an McpToolSpec instance for use within the bridgic framework.

Returns:

Type Description
List[McpToolSpec]

The list of tool specification instances from the server.

Raises:

Type Description
RuntimeError

If the connection is not established and cannot be established.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def list_tools(self) -> List["McpToolSpec"]:
    """
    List the tools from the MCP server.

    This method synchronously retrieves the list of tools available from the connected
    MCP server and wraps each tool in an `McpToolSpec` instance for use within the
    bridgic framework.

    Returns
    -------
    List[McpToolSpec]
        The list of tool specification instances from the server.

    Raises
    ------
    RuntimeError
        If the connection is not established and cannot be established.
    """
    from bridgic.protocols.mcp._mcp_tool_spec import McpToolSpec

    result = self._get_manager().run_sync(
        coro=self._list_tools_unsafe(),
        timeout=self.request_timeout + 1,
    )

    return [
        McpToolSpec(
            tool_name=tool.name,
            tool_info=tool,
            server_connection=self
        )
        for tool in result.tools
    ]

alist_tools

async
alist_tools() -> List[McpToolSpec]

Asynchronously list the tools from the MCP server.

This method asynchronously retrieves the list of tools available from the connected MCP server and wraps each tool in an McpToolSpec instance for use within the bridgic framework.

Returns:

Type Description
List[McpToolSpec]

The list of tool specification instances from the server.

Raises:

Type Description
RuntimeError

If the connection is not established and cannot be established.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
async def alist_tools(self) -> List["McpToolSpec"]:
    """
    Asynchronously list the tools from the MCP server.

    This method asynchronously retrieves the list of tools available from the connected
    MCP server and wraps each tool in an `McpToolSpec` instance for use within the
    bridgic framework.

    Returns
    -------
    List[McpToolSpec]
        The list of tool specification instances from the server.

    Raises
    ------
    RuntimeError
        If the connection is not established and cannot be established.
    """
    from bridgic.protocols.mcp._mcp_tool_spec import McpToolSpec

    result = await self._get_manager().run_async(
        coro=self._list_tools_unsafe(),
        timeout=self.request_timeout + 1,
    )

    return [
        McpToolSpec(
            tool_name=tool.name,
            tool_info=tool,
            server_connection=self
        )
        for tool in result.tools
    ]

call_tool

call_tool(
    tool_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> CallToolResult

Synchronously call a tool on the MCP server.

This method synchronously invokes a tool on the connected MCP server with the specified arguments and returns the result.

Parameters:

Name Type Description Default
tool_name str

The name of the tool to call.

required
arguments Optional[Dict[str, Any]]

The arguments to pass to the tool. If None, an empty dictionary will be used.

None

Returns:

Type Description
CallToolResult

The result of the tool call from the server, containing content and optionally structured content.

Raises:

Type Description
RuntimeError

If the connection is not established and cannot be established.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def call_tool(
    self,
    tool_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> CallToolResult:
    """
    Synchronously call a tool on the MCP server.

    This method synchronously invokes a tool on the connected MCP server with the
    specified arguments and returns the result.

    Parameters
    ----------
    tool_name : str
        The name of the tool to call.
    arguments : Optional[Dict[str, Any]]
        The arguments to pass to the tool. If None, an empty dictionary will be used.

    Returns
    -------
    CallToolResult
        The result of the tool call from the server, containing content and optionally
        structured content.

    Raises
    ------
    RuntimeError
        If the connection is not established and cannot be established.
    """
    return self._get_manager().run_sync(
        coro=self._call_tool_unsafe(tool_name=tool_name, arguments=arguments),
        timeout=self.request_timeout + 1,
    )

acall_tool

async
acall_tool(
    tool_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> CallToolResult

Asynchronously call a tool on the MCP server.

This method asynchronously invokes a tool on the connected MCP server with the specified arguments and returns the result.

Parameters:

Name Type Description Default
tool_name str

The name of the tool to call.

required
arguments Optional[Dict[str, Any]]

The arguments to pass to the tool. If None, an empty dictionary will be used.

None

Returns:

Type Description
CallToolResult

The result of the tool call from the server, containing content and optionally structured content.

Raises:

Type Description
RuntimeError

If the connection is not established and cannot be established.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
async def acall_tool(
    self,
    tool_name: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> CallToolResult:
    """
    Asynchronously call a tool on the MCP server.

    This method asynchronously invokes a tool on the connected MCP server with the
    specified arguments and returns the result.

    Parameters
    ----------
    tool_name : str
        The name of the tool to call.
    arguments : Optional[Dict[str, Any]]
        The arguments to pass to the tool. If None, an empty dictionary will be used.

    Returns
    -------
    CallToolResult
        The result of the tool call from the server, containing content and optionally
        structured content.

    Raises
    ------
    RuntimeError
        If the connection is not established and cannot be established.
    """
    return await self._get_manager().run_async(
        coro=self._call_tool_unsafe(tool_name=tool_name, arguments=arguments),
        timeout=self.request_timeout + 1,
    )

McpServerConnectionStdio

Bases: McpServerConnection

The connection to an MCP server using stdio.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
class McpServerConnectionStdio(McpServerConnection):
    """
    The connection to an MCP server using stdio.
    """

    command: str
    """The command to use for the connection."""

    encoding: str
    """The encoding to use for the connection."""

    args: List[str]
    """The arguments to use for the connection."""

    env: Dict[str, str]
    """The environment variables to use for the connection."""

    def __init__(
        self,
        name: str,
        command: str,
        *,
        args: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        encoding: Optional[str] = None,
        request_timeout: Optional[int] = None,
        **kwargs: Any,
    ):
        super().__init__(
            name,
            request_timeout=request_timeout,
            **kwargs,
        )
        self.command = command
        self.encoding = encoding or "utf-8"
        self.args = args
        self.env = env

    def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
        """
        Get an MCP client transport for stdio.

        Returns
        -------
        _AsyncGeneratorContextManager[Any, None]
            An async context manager for the stdio client transport.
        """
        start_args = {
            "command": self.command,
            "args": self.args,
            "env": self.env,
        }
        if self.encoding:
            start_args["encoding"] = self.encoding
        if self.client_kwargs:
            start_args.update(self.client_kwargs)

        return stdio_client(server=StdioServerParameters(**start_args))

command instance-attribute

command: str = command

The command to use for the connection.

encoding instance-attribute

encoding: str = encoding or 'utf-8'

The encoding to use for the connection.

args instance-attribute

args: List[str] = args

The arguments to use for the connection.

env instance-attribute

env: Dict[str, str] = env

The environment variables to use for the connection.

get_mcp_client

get_mcp_client() -> (
    _AsyncGeneratorContextManager[Any, None]
)

Get an MCP client transport for stdio.

Returns:

Type Description
_AsyncGeneratorContextManager[Any, None]

An async context manager for the stdio client transport.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
    """
    Get an MCP client transport for stdio.

    Returns
    -------
    _AsyncGeneratorContextManager[Any, None]
        An async context manager for the stdio client transport.
    """
    start_args = {
        "command": self.command,
        "args": self.args,
        "env": self.env,
    }
    if self.encoding:
        start_args["encoding"] = self.encoding
    if self.client_kwargs:
        start_args.update(self.client_kwargs)

    return stdio_client(server=StdioServerParameters(**start_args))

McpServerConnectionStreamableHttp

Bases: McpServerConnection

The connection to an MCP server using streamable http.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
class McpServerConnectionStreamableHttp(McpServerConnection):
    """
    The connection to an MCP server using streamable http.
    """

    url: str
    """The URL of the MCP server."""

    http_client: httpx.AsyncClient
    """The HTTP client to use for the connection."""

    terminate_on_close: bool
    """Whether to terminate the session when the connection is closed."""

    def __init__(
        self,
        name: str,
        url: str,
        *,
        http_client: Optional[httpx.AsyncClient] = None,
        terminate_on_close: Optional[bool] = None,
        request_timeout: Optional[int] = None,
    ):
        """
        Initialize a streamable HTTP connection to an MCP server.

        Parameters
        ----------
        name : str
            The name of the connection.
        url : str
            The URL of the MCP server.
        http_client : Optional[httpx.AsyncClient]
            Optional pre-configured httpx.AsyncClient. If None, a default
            client with recommended MCP timeouts will be created. To configure headers,
            authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here.
        terminate_on_close : Optional[bool]
            If True, send a DELETE request to terminate the session when the connection
            is closed. Defaults to True.
        request_timeout : Optional[int]
            The timeout in seconds for MCP requests. Default is 30 seconds.
        """
        super().__init__(
            name,
            request_timeout=request_timeout,
        )
        self.url = url
        self.http_client = http_client
        self.terminate_on_close = terminate_on_close or True

    def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
        """
        Get an MCP client transport for streamable http.

        Returns
        -------
        _AsyncGeneratorContextManager[Any, None]
            An async context manager for the streamable HTTP client transport.
        """
        return streamable_http_client(
            url=self.url,
            http_client=self.http_client,
            terminate_on_close=self.terminate_on_close,
        )

url instance-attribute

url: str = url

The URL of the MCP server.

http_client instance-attribute

http_client: AsyncClient = http_client

The HTTP client to use for the connection.

terminate_on_close instance-attribute

terminate_on_close: bool = terminate_on_close or True

Whether to terminate the session when the connection is closed.

get_mcp_client

get_mcp_client() -> (
    _AsyncGeneratorContextManager[Any, None]
)

Get an MCP client transport for streamable http.

Returns:

Type Description
_AsyncGeneratorContextManager[Any, None]

An async context manager for the streamable HTTP client transport.

Source code in bridgic/protocols/mcp/_mcp_server_connection.py
def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
    """
    Get an MCP client transport for streamable http.

    Returns
    -------
    _AsyncGeneratorContextManager[Any, None]
        An async context manager for the streamable HTTP client transport.
    """
    return streamable_http_client(
        url=self.url,
        http_client=self.http_client,
        terminate_on_close=self.terminate_on_close,
    )

McpServerConnectionManager

Manages multiple MCP server connections, sharing a single thread and event loop.

This manager ensures that all MCP operations run in a dedicated thread with its own event loop, avoiding issues with cross-thread event loop usage.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
class McpServerConnectionManager:
    """
    Manages multiple MCP server connections, sharing a single thread and event loop.

    This manager ensures that all MCP operations run in a dedicated thread with its own
    event loop, avoiding issues with cross-thread event loop usage.
    """

    _instances_lock: ClassVar[threading.Lock] = threading.Lock()
    _instances: ClassVar[Dict[str, "McpServerConnectionManager"]] = {}
    _connection_to_manager: ClassVar[Dict[str, str]] = {}

    _name: str

    _connections_lock: threading.Lock
    _connections: weakref.WeakValueDictionary[str, "McpServerConnection"]

    _thread: threading.Thread
    _loop: asyncio.AbstractEventLoop
    _shutdown: bool

    def __init__(self, name: str):
        self._name = name
        self._connections_lock = threading.Lock()
        self._connections = weakref.WeakValueDictionary()
        self._thread = None
        self._loop = None
        self._shutdown = False

        cls = type(self)

        # Automatically register this instance in the class-level registry.
        with cls._instances_lock:
            if name in cls._instances:
                raise ValueError(f"Manager with name '{name}' already exists")
            cls._instances[name] = self

    @classmethod
    def get_instance(cls, manager_name: str = "default-mcp-manager") -> "McpServerConnectionManager":
        """
        Get a manager instance by name, creating it if it doesn't exist.

        Parameters
        ----------
        manager_name : str
            The name of the manager instance to retrieve. Defaults to "default-mcp-manager".

        Returns
        -------
        McpServerConnectionManager
            The manager instance with the specified name.
        """
        if manager_name not in cls._instances:
            cls._instances[manager_name] = cls(name=manager_name)
        return cls._instances[manager_name]

    @classmethod
    def get_connection(cls, connection_name: str) -> Optional["McpServerConnection"]:
        """
        Get a connection by its name across all manager instances.

        It first finds the manager that owns the connection using the class-level 
        connection-to-manager mapping, then retrieves the connection from that manager.

        Parameters
        ----------
        connection_name : str
            The name of the connection to retrieve.

        Returns
        -------
        Optional[McpServerConnection]
            The connection with the specified name, or None if not found, the manager
            doesn't exist, or the connection has been garbage collected.

        Raises
        ------
        KeyError
            If the connection is not found.
        """
        manager_name = cls._connection_to_manager.get(connection_name)
        manager = cls.get_instance(manager_name) if manager_name is not None else None

        if manager is None:
            raise KeyError(
                f"McpServerConnectionManager-[{manager_name}] is not found: "
                f"connection_name={connection_name}"
            )

        return manager.get_connection_by_name(connection_name)

    def register_connection(self, connection: "McpServerConnection"):
        """
        Register a connection into the manager.

        Parameters
        ----------
        connection : McpServerConnection
            The connection to register.

        Raises
        ------
        McpServerConnectionError
            If a connection with the same name is already registered.
        """
        cls = type(self)
        if connection.name in cls._connection_to_manager:
            raise McpServerConnectionError(
                f"A connection with the name '{connection.name}' is already registered."
            )

        with cls._instances_lock:
            cls._connection_to_manager[connection.name] = self._name

        with self._connections_lock:
            self._connections[connection.name] = connection

        connection._manager = self


    def unregister_connection(self, connection: "McpServerConnection"):
        """
        Unregister a connection from the manager.

        Parameters
        ----------
        connection : McpServerConnection
            The connection to unregister.
        """
        with self._connections_lock:
            self._connections.pop(connection.name, None)

        cls = type(self)
        with cls._instances_lock:
            cls._connection_to_manager.pop(connection.name, None)

        connection._manager = None

    def get_connection_by_name(self, name: str) -> Optional["McpServerConnection"]:
        """
        Get a connection by its name from this manager instance.

        This method looks up a registered connection by its name within this manager.
        If the connection has been garbage collected, `weakref.WeakValueDictionary` will
        automatically remove it and None will be returned.

        Parameters
        ----------
        name : str
            The name of the connection to retrieve.

        Returns
        -------
        Optional[McpServerConnection]
            The connection with the specified name, or None if not found or has been
            garbage collected.

        Raises
        ------
        KeyError
            If the connection is not found.
        """
        if name not in self._connections:
            raise KeyError(
                f"Connection '{name}' is not found in McpServerConnectionManager-[{self._name}]"
            )

        return self._connections.get(name)

    def run_sync(
        self,
        coro: Coroutine[Any, Any, Any],
        timeout: Optional[float] = None,
    ) -> Any:
        """
        Submit a coroutine to the manager's event loop and wait for the result synchronously.

        This method blocks until the coroutine completes, suitable for use in synchronous contexts.

        Parameters
        ----------
        coro : Coroutine
            The coroutine to run.
        timeout : Optional[float]
            Timeout in seconds. If None, no timeout.

        Returns
        -------
        Any
            The result of the coroutine execution.

        Raises
        ------
        RuntimeError
            If the event loop is not running.
        TimeoutError
            If the coroutine execution times out.
        """
        self._ensure_loop_running()

        if self._loop is None or not self._loop.is_running():
            raise RuntimeError(f"Event loop is not running in McpServerConnectionManager-[{self._name}]")

        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout=timeout)

    async def run_async(
        self,
        coro: Coroutine[Any, Any, Any],
        timeout: Optional[float] = None,
    ) -> Any:
        """
        Submit a coroutine to the manager's event loop and await its result in a non-blocking way.

        This method submits the coroutine to the manager's dedicated event loop, and then 
        waits for its completion in a non-blocking way.

        Parameters
        ----------
        coro : Coroutine
            The coroutine to run.
        timeout : Optional[float]
            Timeout in seconds. If None, no timeout.

        Returns
        -------
        Any
            The result of the coroutine execution.

        Raises
        ------
        RuntimeError
            If the event loop is not running.
        TimeoutError
            If the coroutine execution times out.
        """
        self._ensure_loop_running()

        if self._loop is None or not self._loop.is_running():
            raise RuntimeError(f"Event loop is not running in McpServerConnectionManager-[{self._name}]")

        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        asyncio_future = asyncio.wrap_future(future)

        if timeout is not None:
            return await asyncio.wait_for(asyncio_future, timeout=timeout)
        else:
            return await asyncio_future

    def shutdown(self):
        """
        Shutdown the manager and stop the event loop.

        This method also removes the manager from the class-level registry and cleans up
        all connection-to-manager mappings for connections registered with this manager.
        """
        # Mark the manager is going to shutdown.
        self._shutdown = True

        # Stop the event loop as soon as possible.
        if self._loop is not None and self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)

        # Join the thread to wait for it to finish.
        if self._thread is not None:
            self._thread.join(timeout=5)

        self._loop = None
        self._thread = None

        cls = type(self)

        # Remove the registered item.
        with cls._instances_lock:
            # Remove the manager from the class-level registry.
            cls._instances.pop(self._name, None)

        # Clean up connection-to-manager mappings.
        conn_names_to_remove = [
            conn_name for conn_name, mgr_name in cls._connection_to_manager.items() if mgr_name == self._name
        ]
        with cls._instances_lock:
            for conn_name in conn_names_to_remove:
                cls._connection_to_manager.pop(conn_name, None)

    def _ensure_loop_running(self):
        """
        Ensure the manager's event loop is running in a dedicated thread.
        """
        def run_until_shutdown():
            asyncio.set_event_loop(self._loop)
            try:
                self._loop.run_forever()
            finally:
                self._loop.close()

        with self._connections_lock:
            if self._loop is not None and self._loop.is_running():
                return

            if self._shutdown:
                raise RuntimeError(f"McpServerConnectionManager-[{self._name}] has been shut down")

            self._loop = asyncio.new_event_loop()
            self._thread = threading.Thread(
                target=run_until_shutdown,
                daemon=True,
            )
            self._thread.start()

get_instance

classmethod
get_instance(
    manager_name: str = "default-mcp-manager",
) -> McpServerConnectionManager

Get a manager instance by name, creating it if it doesn't exist.

Parameters:

Name Type Description Default
manager_name str

The name of the manager instance to retrieve. Defaults to "default-mcp-manager".

'default-mcp-manager'

Returns:

Type Description
McpServerConnectionManager

The manager instance with the specified name.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
@classmethod
def get_instance(cls, manager_name: str = "default-mcp-manager") -> "McpServerConnectionManager":
    """
    Get a manager instance by name, creating it if it doesn't exist.

    Parameters
    ----------
    manager_name : str
        The name of the manager instance to retrieve. Defaults to "default-mcp-manager".

    Returns
    -------
    McpServerConnectionManager
        The manager instance with the specified name.
    """
    if manager_name not in cls._instances:
        cls._instances[manager_name] = cls(name=manager_name)
    return cls._instances[manager_name]

get_connection

classmethod
get_connection(
    connection_name: str,
) -> Optional[McpServerConnection]

Get a connection by its name across all manager instances.

It first finds the manager that owns the connection using the class-level connection-to-manager mapping, then retrieves the connection from that manager.

Parameters:

Name Type Description Default
connection_name str

The name of the connection to retrieve.

required

Returns:

Type Description
Optional[McpServerConnection]

The connection with the specified name, or None if not found, the manager doesn't exist, or the connection has been garbage collected.

Raises:

Type Description
KeyError

If the connection is not found.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
@classmethod
def get_connection(cls, connection_name: str) -> Optional["McpServerConnection"]:
    """
    Get a connection by its name across all manager instances.

    It first finds the manager that owns the connection using the class-level 
    connection-to-manager mapping, then retrieves the connection from that manager.

    Parameters
    ----------
    connection_name : str
        The name of the connection to retrieve.

    Returns
    -------
    Optional[McpServerConnection]
        The connection with the specified name, or None if not found, the manager
        doesn't exist, or the connection has been garbage collected.

    Raises
    ------
    KeyError
        If the connection is not found.
    """
    manager_name = cls._connection_to_manager.get(connection_name)
    manager = cls.get_instance(manager_name) if manager_name is not None else None

    if manager is None:
        raise KeyError(
            f"McpServerConnectionManager-[{manager_name}] is not found: "
            f"connection_name={connection_name}"
        )

    return manager.get_connection_by_name(connection_name)

register_connection

register_connection(connection: McpServerConnection)

Register a connection into the manager.

Parameters:

Name Type Description Default
connection McpServerConnection

The connection to register.

required

Raises:

Type Description
McpServerConnectionError

If a connection with the same name is already registered.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
def register_connection(self, connection: "McpServerConnection"):
    """
    Register a connection into the manager.

    Parameters
    ----------
    connection : McpServerConnection
        The connection to register.

    Raises
    ------
    McpServerConnectionError
        If a connection with the same name is already registered.
    """
    cls = type(self)
    if connection.name in cls._connection_to_manager:
        raise McpServerConnectionError(
            f"A connection with the name '{connection.name}' is already registered."
        )

    with cls._instances_lock:
        cls._connection_to_manager[connection.name] = self._name

    with self._connections_lock:
        self._connections[connection.name] = connection

    connection._manager = self

unregister_connection

unregister_connection(connection: McpServerConnection)

Unregister a connection from the manager.

Parameters:

Name Type Description Default
connection McpServerConnection

The connection to unregister.

required
Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
def unregister_connection(self, connection: "McpServerConnection"):
    """
    Unregister a connection from the manager.

    Parameters
    ----------
    connection : McpServerConnection
        The connection to unregister.
    """
    with self._connections_lock:
        self._connections.pop(connection.name, None)

    cls = type(self)
    with cls._instances_lock:
        cls._connection_to_manager.pop(connection.name, None)

    connection._manager = None

get_connection_by_name

get_connection_by_name(
    name: str,
) -> Optional[McpServerConnection]

Get a connection by its name from this manager instance.

This method looks up a registered connection by its name within this manager. If the connection has been garbage collected, weakref.WeakValueDictionary will automatically remove it and None will be returned.

Parameters:

Name Type Description Default
name str

The name of the connection to retrieve.

required

Returns:

Type Description
Optional[McpServerConnection]

The connection with the specified name, or None if not found or has been garbage collected.

Raises:

Type Description
KeyError

If the connection is not found.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
def get_connection_by_name(self, name: str) -> Optional["McpServerConnection"]:
    """
    Get a connection by its name from this manager instance.

    This method looks up a registered connection by its name within this manager.
    If the connection has been garbage collected, `weakref.WeakValueDictionary` will
    automatically remove it and None will be returned.

    Parameters
    ----------
    name : str
        The name of the connection to retrieve.

    Returns
    -------
    Optional[McpServerConnection]
        The connection with the specified name, or None if not found or has been
        garbage collected.

    Raises
    ------
    KeyError
        If the connection is not found.
    """
    if name not in self._connections:
        raise KeyError(
            f"Connection '{name}' is not found in McpServerConnectionManager-[{self._name}]"
        )

    return self._connections.get(name)

run_sync

run_sync(
    coro: Coroutine[Any, Any, Any],
    timeout: Optional[float] = None,
) -> Any

Submit a coroutine to the manager's event loop and wait for the result synchronously.

This method blocks until the coroutine completes, suitable for use in synchronous contexts.

Parameters:

Name Type Description Default
coro Coroutine

The coroutine to run.

required
timeout Optional[float]

Timeout in seconds. If None, no timeout.

None

Returns:

Type Description
Any

The result of the coroutine execution.

Raises:

Type Description
RuntimeError

If the event loop is not running.

TimeoutError

If the coroutine execution times out.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
def run_sync(
    self,
    coro: Coroutine[Any, Any, Any],
    timeout: Optional[float] = None,
) -> Any:
    """
    Submit a coroutine to the manager's event loop and wait for the result synchronously.

    This method blocks until the coroutine completes, suitable for use in synchronous contexts.

    Parameters
    ----------
    coro : Coroutine
        The coroutine to run.
    timeout : Optional[float]
        Timeout in seconds. If None, no timeout.

    Returns
    -------
    Any
        The result of the coroutine execution.

    Raises
    ------
    RuntimeError
        If the event loop is not running.
    TimeoutError
        If the coroutine execution times out.
    """
    self._ensure_loop_running()

    if self._loop is None or not self._loop.is_running():
        raise RuntimeError(f"Event loop is not running in McpServerConnectionManager-[{self._name}]")

    future = asyncio.run_coroutine_threadsafe(coro, self._loop)
    return future.result(timeout=timeout)

run_async

async
run_async(
    coro: Coroutine[Any, Any, Any],
    timeout: Optional[float] = None,
) -> Any

Submit a coroutine to the manager's event loop and await its result in a non-blocking way.

This method submits the coroutine to the manager's dedicated event loop, and then waits for its completion in a non-blocking way.

Parameters:

Name Type Description Default
coro Coroutine

The coroutine to run.

required
timeout Optional[float]

Timeout in seconds. If None, no timeout.

None

Returns:

Type Description
Any

The result of the coroutine execution.

Raises:

Type Description
RuntimeError

If the event loop is not running.

TimeoutError

If the coroutine execution times out.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
async def run_async(
    self,
    coro: Coroutine[Any, Any, Any],
    timeout: Optional[float] = None,
) -> Any:
    """
    Submit a coroutine to the manager's event loop and await its result in a non-blocking way.

    This method submits the coroutine to the manager's dedicated event loop, and then 
    waits for its completion in a non-blocking way.

    Parameters
    ----------
    coro : Coroutine
        The coroutine to run.
    timeout : Optional[float]
        Timeout in seconds. If None, no timeout.

    Returns
    -------
    Any
        The result of the coroutine execution.

    Raises
    ------
    RuntimeError
        If the event loop is not running.
    TimeoutError
        If the coroutine execution times out.
    """
    self._ensure_loop_running()

    if self._loop is None or not self._loop.is_running():
        raise RuntimeError(f"Event loop is not running in McpServerConnectionManager-[{self._name}]")

    future = asyncio.run_coroutine_threadsafe(coro, self._loop)
    asyncio_future = asyncio.wrap_future(future)

    if timeout is not None:
        return await asyncio.wait_for(asyncio_future, timeout=timeout)
    else:
        return await asyncio_future

shutdown

shutdown()

Shutdown the manager and stop the event loop.

This method also removes the manager from the class-level registry and cleans up all connection-to-manager mappings for connections registered with this manager.

Source code in bridgic/protocols/mcp/_mcp_server_connection_manager.py
def shutdown(self):
    """
    Shutdown the manager and stop the event loop.

    This method also removes the manager from the class-level registry and cleans up
    all connection-to-manager mappings for connections registered with this manager.
    """
    # Mark the manager is going to shutdown.
    self._shutdown = True

    # Stop the event loop as soon as possible.
    if self._loop is not None and self._loop.is_running():
        self._loop.call_soon_threadsafe(self._loop.stop)

    # Join the thread to wait for it to finish.
    if self._thread is not None:
        self._thread.join(timeout=5)

    self._loop = None
    self._thread = None

    cls = type(self)

    # Remove the registered item.
    with cls._instances_lock:
        # Remove the manager from the class-level registry.
        cls._instances.pop(self._name, None)

    # Clean up connection-to-manager mappings.
    conn_names_to_remove = [
        conn_name for conn_name, mgr_name in cls._connection_to_manager.items() if mgr_name == self._name
    ]
    with cls._instances_lock:
        for conn_name in conn_names_to_remove:
            cls._connection_to_manager.pop(conn_name, None)

McpToolSpec

Bases: ToolSpec

A tool specification that represents an MCP tool from a connected MCP server.

This class provides a bridge between MCP tools and the Bridgic framework, allowing MCP tools to be used seamlessly within Bridgic agentic systems.

Source code in bridgic/protocols/mcp/_mcp_tool_spec.py
class McpToolSpec(ToolSpec):
    """
    A tool specification that represents an MCP tool from a connected MCP server.

    This class provides a bridge between MCP tools and the Bridgic framework,
    allowing MCP tools to be used seamlessly within Bridgic agentic systems.
    """

    tool_info: McpTool
    """The raw MCP tool definition from the server."""

    _server_connection: McpServerConnection
    """The connection to the MCP server that provides this tool."""

    _server_connection_name: Optional[str]
    """The name of the server connection, used for lookup after deserialization."""

    def __init__(
        self,
        tool_name: str,
        tool_info: McpTool,
        server_connection: Union[str, McpServerConnection],
    ):
        super().__init__(
            tool_name=tool_name,
            tool_description=tool_info.description or "",
            tool_parameters=tool_info.inputSchema or {},
        )

        # Store the corresponding McpTool object.
        self.tool_info = tool_info

        # Try to associate with the real connection object.
        if isinstance(server_connection, str):
            self._server_connection = McpServerConnectionManager.get_connection(server_connection)
            self._server_connection_name = server_connection
        elif isinstance(server_connection, McpServerConnection):
            self._server_connection = server_connection
            self._server_connection_name = server_connection.name
        else:
            raise TypeError(f"Invalid type for server connection: {type(server_connection)}")

    @classmethod
    def from_raw(
        cls,
        tool_name: str,
        server_connection: Union[str, McpServerConnection],
    ) -> "McpToolSpec":
        """
        Create a McpToolSpec from a specified server connection and tool name.
        """
        # Try to associate with the real connection object.
        connection = None
        if isinstance(server_connection, str):
            connection = McpServerConnectionManager.get_connection(server_connection)
        elif isinstance(server_connection, McpServerConnection):
            connection = server_connection
        else:
            raise TypeError(f"Invalid type for server connection: {type(server_connection)}")

        # Use the connection to get the McpToolSpec object corresponding to tool_name
        all_tools = connection.list_tools()
        tool_spec = next((tool for tool in all_tools if tool.tool_name == tool_name), None)
        if tool_spec is None:
            raise ValueError(f"Cannot find tool '{tool_name}' in the provided server connection.")
        return tool_spec

    @property
    def server_connection(self) -> McpServerConnection:
        """
        Get the server connection, loading it from the server connection manager if necessary.

        This property implements lazy loading of the server connection. If the connection 
        is not available (e.g., after deserialization), it will be retrieved from the 
        server connection manager by its name.

        Returns
        -------
        McpServerConnection
            The server connection instance.

        Raises
        ------
        McpServerConnectionError
            If the connection cannot be found in the manager.
        """
        if self._server_connection is None:
            if self._server_connection_name is None:
                raise McpServerConnectionError(
                    f"Cannot load server connection for McpToolSpec '{self._tool_name}': "
                    f"connection name is not available."
                )

            try:
                connection = McpServerConnectionManager.get_connection(self._server_connection_name)
            except KeyError as e:
                raise McpServerConnectionError(
                    f"Failed to load the server connection for McpToolSpec \"{self._tool_name}\", because the "
                    f"connection named \"{self._server_connection_name}\" was not found in any connection manager. "
                    f"You must create a McpServerConnection with name \"{self._server_connection_name}\" and "
                    f"ensure it is properly registered in a connection manager before using this tool spec."
                ) from e

            self._server_connection = connection

        return self._server_connection

    @override
    def to_tool(self) -> Tool:
        """
        Transform this McpToolSpec to a `Tool` object used by LLM.

        Returns
        -------
        Tool
            A `Tool` object that can be used by LLM for tool selection.
        """
        return Tool(
            name=self._tool_name,
            description=self._tool_description,
            parameters=self._tool_parameters
        )

    @override
    def create_worker(self) -> Worker:
        """
        Create a Worker from the information included in this McpToolSpec.

        Returns
        -------
        Worker
            A new `McpToolWorker` object that can be added to an Automa to execute the tool.
        """
        from bridgic.protocols.mcp._mcp_tool_worker import McpToolWorker
        return McpToolWorker(
            tool_name=self._tool_name,
            server_connection=self.server_connection,
        )

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = super().dump_to_dict()
        state_dict["server_connection_name"] = self._server_connection_name
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        connection_name = state_dict.get("server_connection_name", None)

        if connection_name is None:
            raise McpServerConnectionError(
                f"Cannot load server connection for McpToolSpec \"{self._tool_name}\", because "
                f"its connection name is not available."
            )

        # Try to reload the server connection from the global manager.
        self._server_connection = McpServerConnectionManager.get_connection(connection_name)
        self._server_connection_name = connection_name

        # Load tool_info from the real connection.
        all_tools = self.server_connection.list_tools()
        reload_tool_spec = next((tool for tool in all_tools if tool.tool_name == self._tool_name), None)
        if reload_tool_spec is None:
            raise RuntimeError(
                f"Cannot restore McpToolSpec because the tool is not found on server: "
                f"server_connection_name=\"{connection_name}\", tool_name=\"{self._tool_name}\"."
            )
        self.tool_info = reload_tool_spec.tool_info

tool_info instance-attribute

tool_info: Tool = tool_info

The raw MCP tool definition from the server.

server_connection property

server_connection: McpServerConnection

Get the server connection, loading it from the server connection manager if necessary.

This property implements lazy loading of the server connection. If the connection is not available (e.g., after deserialization), it will be retrieved from the server connection manager by its name.

Returns:

Type Description
McpServerConnection

The server connection instance.

Raises:

Type Description
McpServerConnectionError

If the connection cannot be found in the manager.

from_raw

classmethod
from_raw(
    tool_name: str,
    server_connection: Union[str, McpServerConnection],
) -> McpToolSpec

Create a McpToolSpec from a specified server connection and tool name.

Source code in bridgic/protocols/mcp/_mcp_tool_spec.py
@classmethod
def from_raw(
    cls,
    tool_name: str,
    server_connection: Union[str, McpServerConnection],
) -> "McpToolSpec":
    """
    Create a McpToolSpec from a specified server connection and tool name.
    """
    # Try to associate with the real connection object.
    connection = None
    if isinstance(server_connection, str):
        connection = McpServerConnectionManager.get_connection(server_connection)
    elif isinstance(server_connection, McpServerConnection):
        connection = server_connection
    else:
        raise TypeError(f"Invalid type for server connection: {type(server_connection)}")

    # Use the connection to get the McpToolSpec object corresponding to tool_name
    all_tools = connection.list_tools()
    tool_spec = next((tool for tool in all_tools if tool.tool_name == tool_name), None)
    if tool_spec is None:
        raise ValueError(f"Cannot find tool '{tool_name}' in the provided server connection.")
    return tool_spec

to_tool

to_tool() -> Tool

Transform this McpToolSpec to a Tool object used by LLM.

Returns:

Type Description
Tool

A Tool object that can be used by LLM for tool selection.

Source code in bridgic/protocols/mcp/_mcp_tool_spec.py
@override
def to_tool(self) -> Tool:
    """
    Transform this McpToolSpec to a `Tool` object used by LLM.

    Returns
    -------
    Tool
        A `Tool` object that can be used by LLM for tool selection.
    """
    return Tool(
        name=self._tool_name,
        description=self._tool_description,
        parameters=self._tool_parameters
    )

create_worker

create_worker() -> Worker

Create a Worker from the information included in this McpToolSpec.

Returns:

Type Description
Worker

A new McpToolWorker object that can be added to an Automa to execute the tool.

Source code in bridgic/protocols/mcp/_mcp_tool_spec.py
@override
def create_worker(self) -> Worker:
    """
    Create a Worker from the information included in this McpToolSpec.

    Returns
    -------
    Worker
        A new `McpToolWorker` object that can be added to an Automa to execute the tool.
    """
    from bridgic.protocols.mcp._mcp_tool_worker import McpToolWorker
    return McpToolWorker(
        tool_name=self._tool_name,
        server_connection=self.server_connection,
    )

McpToolSetBuilder

Bases: ToolSetBuilder

A builder for creating exclusive McpToolSpec instances from a newly created MCP server connection.

This builder creates a new MCP server connection from scratch, ensuring that the connection is exclusive and not shared with other instances. This is important for stateful connections that should not be shared to different owners.

Examples:

Create a builder for stdio connection and build a tool set:

1
2
3
4
5
6
>>> builder = McpToolSetBuilder.stdio(
...     command="npx",
...     args=["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"],
...     tool_names=["read_file", "write_file"]  # Optional: filter specific tools
... )
>>> tool_set = builder.build()["tool_specs"]

Create a builder for streamable HTTP connection and build a tool set:

1
2
3
4
5
>>> builder = McpToolSetBuilder.streamable_http(
...     url="http://localhost:8000",
...     tool_names=["get_weather"]  # Optional: filter specific tools
... )
>>> tool_set = builder.build()["tool_specs"]
Source code in bridgic/protocols/mcp/_mcp_tool_set_builder.py
class McpToolSetBuilder(ToolSetBuilder):
    """
    A builder for creating exclusive `McpToolSpec` instances from a newly created MCP server connection.

    This builder creates a new MCP server connection from scratch, ensuring that the connection is 
    exclusive and not shared with other instances. This is important for stateful connections that 
    should not be shared to different owners.

    Examples
    --------
    Create a builder for stdio connection and build a tool set:
    >>> builder = McpToolSetBuilder.stdio(
    ...     command="npx",
    ...     args=["-y", "@modelcontextprotocol/server-filesystem", "/path/to/dir"],
    ...     tool_names=["read_file", "write_file"]  # Optional: filter specific tools
    ... )
    >>> tool_set = builder.build()["tool_specs"]

    Create a builder for streamable HTTP connection and build a tool set:
    >>> builder = McpToolSetBuilder.streamable_http(
    ...     url="http://localhost:8000",
    ...     tool_names=["get_weather"]  # Optional: filter specific tools
    ... )
    >>> tool_set = builder.build()["tool_specs"]
    """

    _connection_type: Literal[McpServerConnectionType.STDIO, McpServerConnectionType.STREAMABLE_HTTP]
    """The type of connection: 'stdio' or 'streamable_http'."""

    _connection_config: Dict[str, Any]
    """Configuration parameters for creating the connection."""

    _tool_names: Optional[List[str]]
    """Optional list of tool names to include. If None, all tools will be included."""

    def __init__(
        self,
        connection_type: str,
        connection_config: Dict[str, Any],
        tool_names: Optional[List[str]] = None,
    ):
        """
        Initialize the McpToolSetBuilder.

        Parameters
        ----------
        connection_type : str
            The type of connection: 'stdio' or 'streamable_http'.
        connection_config : Dict[str, Any]
            Configuration parameters for creating the connection.
            - For stdio connections, this should include 'command', and optionally 'args', 'env', 'encoding', 'request_timeout'.
            - For streamable_http connections, this should include 'url', and optionally 'http_client_config', 'terminate_on_close', 'request_timeout'.
        tool_names : Optional[List[str]]
            Optional list of tool names to include.
            If None, all available tools from the connection will be included.
        """
        if connection_type not in (McpServerConnectionType.STDIO, McpServerConnectionType.STREAMABLE_HTTP):
            raise ValueError(
                f"Invalid connection_type: {connection_type}. "
                f"Expected 'stdio' or 'streamable_http'."
            )

        self._connection_type = connection_type
        self._connection_config = connection_config
        self._tool_names = tool_names

    @classmethod
    def stdio(
        cls,
        command: str,
        *,
        args: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        encoding: Optional[str] = None,
        request_timeout: Optional[int] = None,
        tool_names: Optional[List[str]] = None,
    ) -> "McpToolSetBuilder":
        """
        Create a builder for a stdio-based MCP server connection.

        Parameters
        ----------
        command : str
            The command to use for the connection (e.g., "npx", "python").
        args : Optional[List[str]]
            The arguments to pass to the command.
        env : Optional[Dict[str, str]]
            Environment variables to set for the process.
        encoding : Optional[str]
            The encoding to use for the connection. Defaults to "utf-8".
        request_timeout : Optional[int]
            The timeout in seconds for MCP requests. Default is 30 seconds.
        tool_names : Optional[List[str]]
            Optional list of tool names to include. If None, all available tools will be included.

        Returns
        -------
        McpToolSetBuilder
            A builder instance configured for stdio connection.
        """
        connection_config = {
            "command": command,
        }
        if args is not None:
            connection_config["args"] = args
        if env is not None:
            connection_config["env"] = env
        if encoding is not None:
            connection_config["encoding"] = encoding
        if request_timeout is not None:
            connection_config["request_timeout"] = request_timeout

        return cls(
            connection_type="stdio",
            connection_config=connection_config,
            tool_names=tool_names,
        )

    @classmethod
    def streamable_http(
        cls,
        url: str,
        *,
        http_client_config: Optional[HttpClientConfig] = None,
        terminate_on_close: Optional[bool] = None,
        request_timeout: Optional[int] = None,
        tool_names: Optional[List[str]] = None,
    ) -> "McpToolSetBuilder":
        """
        Create a builder for a streamable HTTP-based MCP server connection.

        Parameters
        ----------
        url : str
            The URL of the MCP server.
        http_client_config : Optional[HttpClientConfig]
            Optional configuration for creating the HTTP client.
            If None, a default client will be created with MCP defaults.
        terminate_on_close : Optional[bool]
            If True, send a DELETE request to terminate the session when the connection
            is closed. Defaults to True.
        request_timeout : Optional[int]
            The timeout in seconds for MCP requests. Default is 30 seconds.
        tool_names : Optional[List[str]]
            Optional list of tool names to include. If None, all available tools will be included.

        Returns
        -------
        McpToolSetBuilder
            A builder instance configured for streamable HTTP connection.
        """
        connection_config = {
            "url": url,
        }
        if http_client_config is not None:
            connection_config["http_client_config"] = http_client_config
        if terminate_on_close is not None:
            connection_config["terminate_on_close"] = terminate_on_close
        if request_timeout is not None:
            connection_config["request_timeout"] = request_timeout

        return cls(
            connection_type="streamable_http",
            connection_config=connection_config,
            tool_names=tool_names,
        )

    @override
    def build(self) -> ToolSetResponse:
        """
        Build and return McpToolSpec instances from a newly created server connection.

        This method creates a new connection on each call, ensuring that each call gets its own 
        exclusive connection instance. The builder acts as a factory for creating connections.

        Returns
        -------
        ToolSetResponse
            A response containing the list of McpToolSpec instances with `_from_builder=True`,
            along with optional extras.

        Raises
        ------
        McpServerConnectionError
            If the connection cannot be created or accessed.
        RuntimeError
            If the connection fails to establish.
        """
        # Create a new connection for this build call.
        connection = self._create_connection()


        # Get all available tools from the connection.
        all_tool_specs = connection.list_tools()

        # Filter by tool_names if specified.
        if self._tool_names is not None:
            tool_specs = [
                tool_spec for tool_spec in all_tool_specs
                if tool_spec.tool_name in self._tool_names
            ]

            # Check if all requested tools were found.
            found_tool_names = {tool_spec.tool_name for tool_spec in tool_specs}
            missing_tool_names = set(self._tool_names) - found_tool_names
            if missing_tool_names:
                raise ValueError(
                    f"The following tools were not found in the MCP server connection "
                    f"'{connection.name}': {sorted(missing_tool_names)}"
                )
        else:
            tool_specs = all_tool_specs

        # Mark all tool specs as from builder.
        for tool_spec in tool_specs:
            tool_spec._from_builder = True

        return ToolSetResponse(
            tool_specs=tool_specs,
            extras={
                "mcp_server_connection_name": connection.name,
            }
        )

    def _create_connection(self) -> McpServerConnection:
        """
        Create and connect a new MCP server connection.

        Each call to this method creates a new connection with a unique name, allowing the 
        builder to act as a factory for creating multiple connection instances.

        Returns
        -------
        McpServerConnection
            The created and connected connection instance.
        """
        # Generate a unique connection name for each connection instance
        connection_name = f"mcp-builder-{uuid.uuid4().hex[:8]}"

        # Create connection based on type
        if self._connection_type == McpServerConnectionType.STDIO:
            connection = McpServerConnectionStdio(
                name=connection_name,
                **self._connection_config,
            )
        elif self._connection_type == McpServerConnectionType.STREAMABLE_HTTP:
            # Copy connection config to avoid modifying the original config.
            connection_config = self._connection_config.copy()

            # Pop http_client_config because the real initialization of the connection doesn't need it.
            http_client_config: HttpClientConfig = connection_config.pop("http_client_config", None)

            if not http_client_config:
                http_client = None
            else:
                if "timeout" not in http_client_config or http_client_config["timeout"] is None:
                    http_client_config["timeout"] = {
                        "default": MCP_DEFAULT_TIMEOUT,
                        "read": MCP_DEFAULT_SSE_READ_TIMEOUT,
                    }
                http_client = create_http_client_from_config(http_client_config.copy(), is_async=True)

            connection = McpServerConnectionStreamableHttp(
                name=connection_name,
                http_client=http_client,
                **connection_config,
            )
        else:
            raise ValueError(f"Unknown connection type: {self._connection_type}")

        # Register and connect
        manager = McpServerConnectionManager.get_instance()
        manager.register_connection(connection)
        connection.connect()

        return connection

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = {
            "connection_type": self._connection_type,
            "connection_config": self._connection_config,
        }
        if self._tool_names is not None:
            state_dict["tool_names"] = self._tool_names
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        self._connection_type = state_dict["connection_type"]
        self._connection_config = state_dict["connection_config"]
        self._tool_names = state_dict.get("tool_names")

stdio

classmethod
stdio(
    command: str,
    *,
    args: Optional[List[str]] = None,
    env: Optional[Dict[str, str]] = None,
    encoding: Optional[str] = None,
    request_timeout: Optional[int] = None,
    tool_names: Optional[List[str]] = None
) -> McpToolSetBuilder

Create a builder for a stdio-based MCP server connection.

Parameters:

Name Type Description Default
command str

The command to use for the connection (e.g., "npx", "python").

required
args Optional[List[str]]

The arguments to pass to the command.

None
env Optional[Dict[str, str]]

Environment variables to set for the process.

None
encoding Optional[str]

The encoding to use for the connection. Defaults to "utf-8".

None
request_timeout Optional[int]

The timeout in seconds for MCP requests. Default is 30 seconds.

None
tool_names Optional[List[str]]

Optional list of tool names to include. If None, all available tools will be included.

None

Returns:

Type Description
McpToolSetBuilder

A builder instance configured for stdio connection.

Source code in bridgic/protocols/mcp/_mcp_tool_set_builder.py
@classmethod
def stdio(
    cls,
    command: str,
    *,
    args: Optional[List[str]] = None,
    env: Optional[Dict[str, str]] = None,
    encoding: Optional[str] = None,
    request_timeout: Optional[int] = None,
    tool_names: Optional[List[str]] = None,
) -> "McpToolSetBuilder":
    """
    Create a builder for a stdio-based MCP server connection.

    Parameters
    ----------
    command : str
        The command to use for the connection (e.g., "npx", "python").
    args : Optional[List[str]]
        The arguments to pass to the command.
    env : Optional[Dict[str, str]]
        Environment variables to set for the process.
    encoding : Optional[str]
        The encoding to use for the connection. Defaults to "utf-8".
    request_timeout : Optional[int]
        The timeout in seconds for MCP requests. Default is 30 seconds.
    tool_names : Optional[List[str]]
        Optional list of tool names to include. If None, all available tools will be included.

    Returns
    -------
    McpToolSetBuilder
        A builder instance configured for stdio connection.
    """
    connection_config = {
        "command": command,
    }
    if args is not None:
        connection_config["args"] = args
    if env is not None:
        connection_config["env"] = env
    if encoding is not None:
        connection_config["encoding"] = encoding
    if request_timeout is not None:
        connection_config["request_timeout"] = request_timeout

    return cls(
        connection_type="stdio",
        connection_config=connection_config,
        tool_names=tool_names,
    )

streamable_http

classmethod
streamable_http(
    url: str,
    *,
    http_client_config: Optional[HttpClientConfig] = None,
    terminate_on_close: Optional[bool] = None,
    request_timeout: Optional[int] = None,
    tool_names: Optional[List[str]] = None
) -> McpToolSetBuilder

Create a builder for a streamable HTTP-based MCP server connection.

Parameters:

Name Type Description Default
url str

The URL of the MCP server.

required
http_client_config Optional[HttpClientConfig]

Optional configuration for creating the HTTP client. If None, a default client will be created with MCP defaults.

None
terminate_on_close Optional[bool]

If True, send a DELETE request to terminate the session when the connection is closed. Defaults to True.

None
request_timeout Optional[int]

The timeout in seconds for MCP requests. Default is 30 seconds.

None
tool_names Optional[List[str]]

Optional list of tool names to include. If None, all available tools will be included.

None

Returns:

Type Description
McpToolSetBuilder

A builder instance configured for streamable HTTP connection.

Source code in bridgic/protocols/mcp/_mcp_tool_set_builder.py
@classmethod
def streamable_http(
    cls,
    url: str,
    *,
    http_client_config: Optional[HttpClientConfig] = None,
    terminate_on_close: Optional[bool] = None,
    request_timeout: Optional[int] = None,
    tool_names: Optional[List[str]] = None,
) -> "McpToolSetBuilder":
    """
    Create a builder for a streamable HTTP-based MCP server connection.

    Parameters
    ----------
    url : str
        The URL of the MCP server.
    http_client_config : Optional[HttpClientConfig]
        Optional configuration for creating the HTTP client.
        If None, a default client will be created with MCP defaults.
    terminate_on_close : Optional[bool]
        If True, send a DELETE request to terminate the session when the connection
        is closed. Defaults to True.
    request_timeout : Optional[int]
        The timeout in seconds for MCP requests. Default is 30 seconds.
    tool_names : Optional[List[str]]
        Optional list of tool names to include. If None, all available tools will be included.

    Returns
    -------
    McpToolSetBuilder
        A builder instance configured for streamable HTTP connection.
    """
    connection_config = {
        "url": url,
    }
    if http_client_config is not None:
        connection_config["http_client_config"] = http_client_config
    if terminate_on_close is not None:
        connection_config["terminate_on_close"] = terminate_on_close
    if request_timeout is not None:
        connection_config["request_timeout"] = request_timeout

    return cls(
        connection_type="streamable_http",
        connection_config=connection_config,
        tool_names=tool_names,
    )

build

build() -> ToolSetResponse

Build and return McpToolSpec instances from a newly created server connection.

This method creates a new connection on each call, ensuring that each call gets its own exclusive connection instance. The builder acts as a factory for creating connections.

Returns:

Type Description
ToolSetResponse

A response containing the list of McpToolSpec instances with _from_builder=True, along with optional extras.

Raises:

Type Description
McpServerConnectionError

If the connection cannot be created or accessed.

RuntimeError

If the connection fails to establish.

Source code in bridgic/protocols/mcp/_mcp_tool_set_builder.py
@override
def build(self) -> ToolSetResponse:
    """
    Build and return McpToolSpec instances from a newly created server connection.

    This method creates a new connection on each call, ensuring that each call gets its own 
    exclusive connection instance. The builder acts as a factory for creating connections.

    Returns
    -------
    ToolSetResponse
        A response containing the list of McpToolSpec instances with `_from_builder=True`,
        along with optional extras.

    Raises
    ------
    McpServerConnectionError
        If the connection cannot be created or accessed.
    RuntimeError
        If the connection fails to establish.
    """
    # Create a new connection for this build call.
    connection = self._create_connection()


    # Get all available tools from the connection.
    all_tool_specs = connection.list_tools()

    # Filter by tool_names if specified.
    if self._tool_names is not None:
        tool_specs = [
            tool_spec for tool_spec in all_tool_specs
            if tool_spec.tool_name in self._tool_names
        ]

        # Check if all requested tools were found.
        found_tool_names = {tool_spec.tool_name for tool_spec in tool_specs}
        missing_tool_names = set(self._tool_names) - found_tool_names
        if missing_tool_names:
            raise ValueError(
                f"The following tools were not found in the MCP server connection "
                f"'{connection.name}': {sorted(missing_tool_names)}"
            )
    else:
        tool_specs = all_tool_specs

    # Mark all tool specs as from builder.
    for tool_spec in tool_specs:
        tool_spec._from_builder = True

    return ToolSetResponse(
        tool_specs=tool_specs,
        extras={
            "mcp_server_connection_name": connection.name,
        }
    )

McpToolWorker

Bases: Worker

A worker that executes an MCP tool on a connected MCP server.

This worker receives tool arguments as keyword arguments and calls the corresponding tool on the MCP server, returning the result.

Source code in bridgic/protocols/mcp/_mcp_tool_worker.py
class McpToolWorker(Worker):
    """
    A worker that executes an MCP tool on a connected MCP server.

    This worker receives tool arguments as keyword arguments and calls the
    corresponding tool on the MCP server, returning the result.
    """

    _tool_name: str
    """The name of the MCP tool to call."""

    _server_connection: Optional[McpServerConnection]
    """The connection to the MCP server that provides this tool."""

    _server_connection_name: Optional[str]
    """The name of the server connection, used for lookup after deserialization."""

    def __init__(
        self,
        tool_name: str,
        server_connection: Union[str, McpServerConnection],
    ):
        super().__init__()
        self._tool_name = tool_name

        # Try to associate with the real connection object.
        if isinstance(server_connection, str):
            self._server_connection = McpServerConnectionManager.get_connection(server_connection)
            self._server_connection_name = server_connection
        elif isinstance(server_connection, McpServerConnection):
            self._server_connection = server_connection
            self._server_connection_name = server_connection.name
        else:
            raise TypeError(f"Invalid type for server connection: {type(server_connection)}")

    @property
    def tool_name(self) -> str:
        """Get the name of the tool."""
        return self._tool_name

    @property
    def server_connection(self) -> McpServerConnection:
        """
        Get the server connection, loading it from the server connection manager if necessary.

        This property implements lazy loading of the server connection. If the connection 
        is not available (e.g., after deserialization), it will be retrieved from the 
        server connection manager by its name.

        Returns
        -------
        McpServerConnection
            The server connection instance.

        Raises
        ------
        McpServerConnectionError
            If the connection cannot be found in the manager.
        """
        if self._server_connection is None:
            if self._server_connection_name is None:
                raise McpServerConnectionError(
                    f"Cannot load server connection for McpToolWorker '{self._tool_name}': "
                    f"connection name is not available."
                )

            try:
                connection = McpServerConnectionManager.get_connection(self._server_connection_name)
            except KeyError as e:
                raise McpServerConnectionError(
                    f"Failed to load the server connection for McpToolWorker \"{self._tool_name}\", because the "
                    f"connection named \"{self._server_connection_name}\" was not found in any connection manager. "
                    f"You must create a McpServerConnection with name \"{self._server_connection_name}\" and "
                    f"ensure it is properly registered in a connection manager before using this worker."
                ) from e

            self._server_connection = connection

        return self._server_connection

    async def arun(self, **kwargs: Dict[str, Any]) -> CallToolResult:
        """
        Asynchronously execute the MCP tool with the provided arguments.

        Parameters
        ----------
        **kwargs : Dict[str, Any]
            The arguments to pass to the tool. These are typically provided
            from the tool call arguments generated by the LLM.

        Returns
        -------
        CallToolResult
            The result of the tool call from the MCP server, containing content
            and optionally structured content.

        Raises
        ------
        RuntimeError
            If the connection is not established and cannot be established.
        """
        try:
            result = await self.server_connection.acall_tool(
                tool_name=self._tool_name,
                arguments=kwargs if kwargs else None,
            )
        except Exception as e:
            result = CallToolResult(
                content=[TextContent(text=f"[MCP Tool Error] {type(e).__name__}: {str(e)}")],
                structuredContent=None,
                isError=True,
            )
        return result

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = super().dump_to_dict()
        state_dict["tool_name"] = self._tool_name
        state_dict["server_connection_name"] = self._server_connection_name
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self._tool_name = state_dict.get("tool_name")
        connection_name = state_dict.get("server_connection_name", None)

        if connection_name is None:
            raise McpServerConnectionError(
                f"Cannot load server connection for McpToolWorker \"{self._tool_name}\", because "
                f"its connection name is not available."
            )

        # Try to reload the server connection from the global manager.
        try:
            self._server_connection = McpServerConnectionManager.get_connection(connection_name)
            self._server_connection_name = connection_name
        except KeyError as e:
            raise McpServerConnectionError(
                f"Failed to load the server connection for McpToolWorker \"{self._tool_name}\", because the "
                f"connection named \"{connection_name}\" was not found in any connection manager. "
                f"You must create a McpServerConnection with name \"{connection_name}\" and "
                f"ensure it is properly registered in a connection manager before using this worker."
            ) from e

tool_name property

tool_name: str

Get the name of the tool.

server_connection property

server_connection: McpServerConnection

Get the server connection, loading it from the server connection manager if necessary.

This property implements lazy loading of the server connection. If the connection is not available (e.g., after deserialization), it will be retrieved from the server connection manager by its name.

Returns:

Type Description
McpServerConnection

The server connection instance.

Raises:

Type Description
McpServerConnectionError

If the connection cannot be found in the manager.

arun

async
arun(**kwargs: Dict[str, Any]) -> CallToolResult

Asynchronously execute the MCP tool with the provided arguments.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

The arguments to pass to the tool. These are typically provided from the tool call arguments generated by the LLM.

{}

Returns:

Type Description
CallToolResult

The result of the tool call from the MCP server, containing content and optionally structured content.

Raises:

Type Description
RuntimeError

If the connection is not established and cannot be established.

Source code in bridgic/protocols/mcp/_mcp_tool_worker.py
async def arun(self, **kwargs: Dict[str, Any]) -> CallToolResult:
    """
    Asynchronously execute the MCP tool with the provided arguments.

    Parameters
    ----------
    **kwargs : Dict[str, Any]
        The arguments to pass to the tool. These are typically provided
        from the tool call arguments generated by the LLM.

    Returns
    -------
    CallToolResult
        The result of the tool call from the MCP server, containing content
        and optionally structured content.

    Raises
    ------
    RuntimeError
        If the connection is not established and cannot be established.
    """
    try:
        result = await self.server_connection.acall_tool(
            tool_name=self._tool_name,
            arguments=kwargs if kwargs else None,
        )
    except Exception as e:
        result = CallToolResult(
            content=[TextContent(text=f"[MCP Tool Error] {type(e).__name__}: {str(e)}")],
            structuredContent=None,
            isError=True,
        )
    return result

McpPromptTemplate

Bases: BasePromptTemplate

This template implementation is used to generate a prompt from a connected MCP server.

Source code in bridgic/protocols/mcp/_mcp_template.py
class McpPromptTemplate(BasePromptTemplate):
    """
    This template implementation is used to generate a prompt from a connected MCP server.
    """

    prompt_name: str
    """The name of the prompt template."""

    prompt_info: Prompt
    """The raw information of the prompt."""

    _server_connection: McpServerConnection
    """The connection to the MCP server."""

    def __init__(
        self,
        prompt_name: str,
        prompt_info: Prompt,
        server_connection: Union[str, McpServerConnection],
    ):
        super().__init__()
        self.prompt_name = prompt_name
        self.prompt_info = prompt_info

        # Try to associate with the real connection object.
        if isinstance(server_connection, str):
            self._server_connection = McpServerConnectionManager.get_connection(server_connection)
        elif isinstance(server_connection, McpServerConnection):
            self._server_connection = server_connection
        else:
            raise TypeError(f"Invalid type for server connection: {type(server_connection)}")

    def format_messages(self, **kwargs) -> List[Message]:
        """
        Format the prompt template from a connected MCP server into messages.

        Parameters
        ----------
        **kwargs : Any
            The keyword arguments to pass to the prompt template.

        Returns
        -------
        List[Message]
            The list of messages.
        """
        if not self._server_connection or not self._server_connection.is_connected:
            raise RuntimeError("MCP session is not connected, unable to render prompt.")

        mcp_result = self._server_connection.get_prompt(
            prompt_name=self.prompt_name,
            arguments=kwargs,
        )

        mcp_messages: List[PromptMessage] = mcp_result.messages

        messages: List[Message] = []
        if mcp_messages:
            for msg in mcp_messages:
                messages.append(Message.from_text(text=msg.content.text, role=msg.role))

        return messages

prompt_name instance-attribute

prompt_name: str = prompt_name

The name of the prompt template.

prompt_info instance-attribute

prompt_info: Prompt = prompt_info

The raw information of the prompt.

format_messages

format_messages(**kwargs) -> List[Message]

Format the prompt template from a connected MCP server into messages.

Parameters:

Name Type Description Default
**kwargs Any

The keyword arguments to pass to the prompt template.

{}

Returns:

Type Description
List[Message]

The list of messages.

Source code in bridgic/protocols/mcp/_mcp_template.py
def format_messages(self, **kwargs) -> List[Message]:
    """
    Format the prompt template from a connected MCP server into messages.

    Parameters
    ----------
    **kwargs : Any
        The keyword arguments to pass to the prompt template.

    Returns
    -------
    List[Message]
        The list of messages.
    """
    if not self._server_connection or not self._server_connection.is_connected:
        raise RuntimeError("MCP session is not connected, unable to render prompt.")

    mcp_result = self._server_connection.get_prompt(
        prompt_name=self.prompt_name,
        arguments=kwargs,
    )

    mcp_messages: List[PromptMessage] = mcp_result.messages

    messages: List[Message] = []
    if mcp_messages:
        for msg in mcp_messages:
            messages.append(Message.from_text(text=msg.content.text, role=msg.role))

    return messages

McpServerConnectionError

Bases: Exception

Raised when the connection to an MCP server fails.

Source code in bridgic/protocols/mcp/_error.py
5
6
7
8
9
class McpServerConnectionError(Exception):
    """
    Raised when the connection to an MCP server fails.
    """
    pass