Skip to content

model

The Model module provides core abstraction entities for LLMs (Large Language Models).

This module defines core abstraction entities for interacting with models, providing foundational type abstractions for different model implementations.

BaseLlm

Bases: ABC, Serializable

Base class for Large Language Model implementations.

Source code in bridgic/core/model/_base_llm.py
class BaseLlm(ABC, Serializable):
    """
    Abstract interface that every Large Language Model implementation derives from.

    Four entry points are declared, covering the blocking and ``async``
    variants of a one-shot call (``chat`` / ``achat``) and of a streaming
    call (``stream`` / ``astream``). All four are abstract, so a concrete
    subclass has to implement each of them.
    """

    @abstractmethod
    def chat(self, messages: List[Message], **kwargs) -> Response:
        """
        Blocking call: take ``messages`` and return a ``Response``.

        Extra keyword arguments are accepted; their meaning is left to the
        implementing subclass.
        """

    @abstractmethod
    def stream(self, messages: List[Message], **kwargs) -> StreamResponse:
        """
        Blocking call: take ``messages`` and return a ``StreamResponse``.

        Extra keyword arguments are accepted; their meaning is left to the
        implementing subclass.
        """

    @abstractmethod
    async def achat(self, messages: List[Message], **kwargs) -> Response:
        """
        Coroutine counterpart of ``chat``: await it to obtain a ``Response``.

        Extra keyword arguments are accepted; their meaning is left to the
        implementing subclass.
        """

    @abstractmethod
    async def astream(self, messages: List[Message], **kwargs) -> AsyncStreamResponse:
        """
        Coroutine counterpart of ``stream``: await it to obtain an
        ``AsyncStreamResponse``.

        Extra keyword arguments are accepted; their meaning is left to the
        implementing subclass.
        """

RetryPolicyConfig

Bases: BaseModel

Retry policy for decorating model invocation methods.

Source code in bridgic/core/model/_model_retry.py
class RetryPolicyConfig(BaseModel):
    """
    Retry policy for decorating model invocation methods.

    An instance is handed to `retryable_model_call`, which reads
    `max_attempts` directly and passes the whole config to `_backoff_delay`
    to compute the pause between attempts.
    """

    # Total number of times the wrapped call is tried (first call included);
    # validated to be at least 1.
    max_attempts: int = Field(default=3, ge=1)
    # Non-negative starting delay. NOTE(review): presumably seconds, since the
    # computed delay is fed to `time.sleep` / `asyncio.sleep` — confirm in
    # `_backoff_delay`.
    base_delay: float = Field(default=0.2, ge=0.0)
    # Non-negative upper bound for a delay. NOTE(review): nothing here enforces
    # `max_delay >= base_delay`; verify how `_backoff_delay` handles that case.
    max_delay: float = Field(default=2.0, ge=0.0)
    # Growth factor for the backoff; validated to be >= 1.0.
    # NOTE(review): exact formula lives in `_backoff_delay` — confirm there.
    exponential_base: float = Field(default=2.0, ge=1.0)
    # Fraction in [0.0, 1.0] controlling jitter.
    # NOTE(review): how jitter is applied is defined in `_backoff_delay`.
    jitter_ratio: float = Field(default=0.2, ge=0.0, le=1.0)

ModelRetryLimitError

Bases: RuntimeError

Raised when recoverable errors persist until retry budget is exhausted.

Source code in bridgic/core/model/_model_error.py
class ModelRetryLimitError(RuntimeError):
    """
    Raised when recoverable errors persist until retry budget is exhausted.

    Parameters
    ----------
    message : str
        Human-readable description; becomes ``str(error)``.
    operation : str
        Name of the model operation that kept failing (keyword-only).
    original_exception : Optional[Exception]
        The last recoverable exception that was observed, if any
        (keyword-only).
    """

    def __init__(
        self,
        message: str,
        *,
        operation: str,
        original_exception: Optional[Exception] = None,
    ):
        super().__init__(message)
        self.operation = operation
        self.original_exception = original_exception

    def __reduce__(self) -> tuple:
        """
        Support ``pickle`` and ``copy`` despite the keyword-only arguments.

        The default ``BaseException.__reduce__`` rebuilds the instance as
        ``cls(*self.args)``, which fails here because ``operation`` is a
        required keyword-only argument. Binding the keyword arguments with
        ``functools.partial`` lets the positional ``args`` be replayed safely.
        """
        import functools

        rebuild = functools.partial(
            type(self),
            operation=self.operation,
            original_exception=self.original_exception,
        )
        # The instance ``__dict__`` is passed as state so that any extra
        # attributes attached to the error survive the round trip.
        return rebuild, self.args, dict(self.__dict__)

ModelUnrecoverableError

Bases: RuntimeError

Raised when an exception is classified as non-retryable.

Source code in bridgic/core/model/_model_error.py
class ModelUnrecoverableError(RuntimeError):
    """
    Raised when an exception is classified as non-retryable.

    Parameters
    ----------
    message : str
        Human-readable description; becomes ``str(error)``.
    operation : str
        Name of the model operation that failed (keyword-only).
    original_exception : Optional[Exception]
        The exception that was classified as non-retryable, if any
        (keyword-only).
    """

    def __init__(
        self,
        message: str,
        *,
        operation: str,
        original_exception: Optional[Exception] = None,
    ):
        super().__init__(message)
        self.operation = operation
        self.original_exception = original_exception

    def __reduce__(self) -> tuple:
        """
        Support ``pickle`` and ``copy`` despite the keyword-only arguments.

        The default ``BaseException.__reduce__`` rebuilds the instance as
        ``cls(*self.args)``, which fails here because ``operation`` is a
        required keyword-only argument. Binding the keyword arguments with
        ``functools.partial`` lets the positional ``args`` be replayed safely.
        """
        import functools

        rebuild = functools.partial(
            type(self),
            operation=self.operation,
            original_exception=self.original_exception,
        )
        # The instance ``__dict__`` is passed as state so that any extra
        # attributes attached to the error survive the round trip.
        return rebuild, self.args, dict(self.__dict__)

is_recoverable_exception

is_recoverable_exception(exc: Exception) -> bool

Heuristic classifier for retryable exceptions.

Source code in bridgic/core/model/_model_retry.py
def is_recoverable_exception(exc: Exception) -> bool:
    """
    Decide, heuristically, whether `exc` is worth retrying.

    `TimeoutError` and `ConnectionError` instances (including subclasses) are
    always treated as retryable. Anything else is retryable when a known
    marker occurs, case-insensitively, in the exception's class name, the
    module that defines the class, or its string form. Because the class
    and module names are searched too, an exception can match on its type
    alone, regardless of its message.
    """
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return True

    exc_type = exc.__class__
    haystack = "{} {} {}".format(
        exc_type.__name__,
        exc_type.__module__,
        str(exc),
    ).lower()

    # All markers are lowercase so they can be compared to the lowered haystack.
    for needle in (
        "429",
        "timeout",
        "timed out",
        "limit",
        "exceeded",
        "overloaded",
        "too many requests",
        "network",
        "connection",
        "harmful",
        "policy",
        "violated",
        "violation",
        "not allowed",
    ):
        if needle in haystack:
            return True
    return False

retryable_model_call

retryable_model_call(
    config: Optional[RetryPolicyConfig] = None,
    recoverable_checker: Optional[
        Callable[[Exception], bool]
    ] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator for model non-streaming methods.

Behavior:

- Retry recoverable exceptions up to max attempts.
- Raise ModelUnrecoverableError immediately for non-recoverable exceptions.
- Raise ModelRetryLimitError after retry attempts are exhausted.

Source code in bridgic/core/model/_model_retry.py
def retryable_model_call(
    config: Optional[RetryPolicyConfig] = None,
    recoverable_checker: Optional[Callable[[Exception], bool]] = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """
    Decorator for model non-streaming methods.

    Behavior:
    - Retry recoverable exceptions up to max attempts.
    - Raise `ModelUnrecoverableError` immediately for non-recoverable exceptions.
    - Raise `ModelRetryLimitError` after retry attempts are exhausted.

    Works for both plain functions and coroutine functions; the matching
    wrapper (blocking or ``async``) is chosen once, at decoration time.

    Parameters
    ----------
    config : Optional[RetryPolicyConfig]
        Retry policy. A default `RetryPolicyConfig()` is used when omitted.
    recoverable_checker : Optional[Callable[[Exception], bool]]
        Predicate deciding whether an exception may be retried. Defaults to
        `is_recoverable_exception`.

    Returns
    -------
    Callable[[Callable[P, R]], Callable[P, R]]
        A decorator that wraps the target callable with the retry loop.

    Raises
    ------
    ModelUnrecoverableError
        From the wrapped call, as soon as the checker rejects an exception.
    ModelRetryLimitError
        From the wrapped call, when every attempt failed with a recoverable
        exception.
    """
    # Local import keeps this function self-contained.
    # `asyncio.iscoroutinefunction` is deprecated since Python 3.14 in favour
    # of `inspect.iscoroutinefunction`.
    import inspect

    config = config or RetryPolicyConfig()
    checker = recoverable_checker or is_recoverable_exception

    def _unrecoverable(op: str, exc: Exception) -> ModelUnrecoverableError:
        # Build the error raised for an exception the checker refuses to retry.
        return ModelUnrecoverableError(
            f"Model operation `{op}` failed with non-recoverable error",
            operation=op,
            original_exception=exc,
        )

    def _retry_limit(op: str, last_exc: Optional[Exception]) -> ModelRetryLimitError:
        # Build the error raised once all attempts have been used up.
        return ModelRetryLimitError(
            (
                f"Model operation `{op}` exceeded retry attempts "
                f"({config.max_attempts} attempts)"
            ),
            operation=op,
            original_exception=last_exc,
        )

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        # Callables such as `functools.partial` objects have no `__name__`;
        # fall back to their repr instead of failing on every call.
        op = getattr(func, "__name__", repr(func))

        if inspect.iscoroutinefunction(func):

            @wraps(func)
            async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
                last_exc: Optional[Exception] = None
                for attempt in range(1, config.max_attempts + 1):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as exc:
                        if not checker(exc):
                            raise _unrecoverable(op, exc) from exc
                        last_exc = exc
                        # No pause after the final attempt: the loop is about
                        # to end and the limit error is raised right away.
                        if attempt < config.max_attempts:
                            await asyncio.sleep(_backoff_delay(attempt, config))
                raise _retry_limit(op, last_exc) from last_exc

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
            last_exc: Optional[Exception] = None
            for attempt in range(1, config.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if not checker(exc):
                        raise _unrecoverable(op, exc) from exc
                    last_exc = exc
                    # No pause after the final attempt (see async variant).
                    if attempt < config.max_attempts:
                        delay = _backoff_delay(attempt, config)
                        # Only block when there is a positive delay to wait.
                        if delay > 0:
                            time.sleep(delay)
            raise _retry_limit(op, last_exc) from last_exc

        return sync_wrapper

    return decorator