opik

OpikTraceCallback

Bases: WorkerCallback

Opik tracing callback handler for Bridgic.

This callback handler integrates Opik tracing with the Bridgic framework, providing step-level tracing for worker execution and automa orchestration. It tracks worker execution, creates spans for each worker, and manages the trace lifecycle for top-level automa instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| project_name | Optional[str] | The name of the project. If None, uses the `Default Project` project name. | None |
| workspace | Optional[str] | The name of the workspace. If None, uses the `default` workspace name. | None |
| host | Optional[str] | The host URL for the Opik server. If None, defaults to https://www.comet.com/opik/api. | None |
| api_key | Optional[str] | The API key for Opik. This parameter is ignored for local installations. | None |
| use_local | bool | Whether to use a local Opik server. | False |
Notes

Since tracing requires execution within an automa to establish the corresponding record root, only global configurations (via GlobalSetting) and automa-level configurations (via RunningOptions) take effect. In other words, registering the callback via @worker or add_worker has no effect.

Examples:

To report tracing data to a self-hosted Opik service, initialize the callback like this:

```python
OpikTraceCallback(project_name="my-project", use_local=True)
```

To report tracing data to the Opik Cloud service, initialize the callback like this:

```python
OpikTraceCallback(project_name="my-project", api_key="my-api-key")
```
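
Because the callback only takes effect through global or automa-level configuration (see the Notes above), a typical setup registers it through `GlobalSetting`. The sketch below mirrors the `start_opik_trace` helper shown at the end of this page; the import paths for `OpikTraceCallback` and `WorkerCallbackBuilder` are assumptions, not confirmed by this page.

```python
# Sketch: register the callback globally so every top-level automa run is traced.
# Mirrors the `start_opik_trace` helper listed at the bottom of this page.
from bridgic.core.config import GlobalSetting
from bridgic.core.worker import WorkerCallbackBuilder   # import path assumed
from bridgic.traces.opik import OpikTraceCallback        # import path assumed

builder = WorkerCallbackBuilder(
    OpikTraceCallback,
    init_kwargs={"project_name": "my-project", "use_local": True},
)
GlobalSetting.add(callback_builder=builder)
```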

Source code in bridgic/traces/opik/_opik_trace_callback.py
class OpikTraceCallback(WorkerCallback):
    """
    Opik tracing callback handler for Bridgic.

    This callback handler integrates Opik tracing with the Bridgic framework,
    providing step-level tracing for worker execution and automa orchestration.
    It tracks worker execution, creates spans for each worker, and manages
    trace lifecycle for top-level automa instances.

    Parameters
    ----------
    project_name : Optional[str], default=None
        The name of the project. If None, uses the `Default Project` project name.
    workspace : Optional[str], default=None
        The name of the workspace. If None, uses the `default` workspace name.
    host : Optional[str], default=None
        The host URL for the Opik server. If None, defaults to `https://www.comet.com/opik/api`.
    api_key : Optional[str], default=None
        The API key for Opik. This parameter is ignored for local installations.
    use_local : bool, default=False
        Whether to use a local Opik server.

    Notes
    -----
    Since tracing requires execution within an automa to establish the corresponding record root,
    only global configurations (via `GlobalSetting`) and automa-level configurations (via `RunningOptions`) take effect.
    In other words, registering the callback via `@worker` or `add_worker` has no effect.

    Examples
    --------
    To report tracing data to a self-hosted Opik service, initialize the callback like this:
    ```python
    OpikTraceCallback(project_name="my-project", use_local=True)
    ```

    To report tracing data to the Opik Cloud service, initialize the callback like this:
    ```python
    OpikTraceCallback(project_name="my-project", api_key="my-api-key")
    ```
    """

    _project_name: Optional[str]
    _workspace: Optional[str]
    _is_ready: bool
    _api_key: Optional[str]
    _host: Optional[str]
    _use_local: bool
    _opik_client: opik_client.Opik

    def __init__(
        self,
        project_name: Optional[str] = None,
        workspace: Optional[str] = None,
        host: Optional[str] = None,
        api_key: Optional[str] = None,
        use_local: bool = False,
    ):
        super().__init__()
        self._project_name = project_name
        self._workspace = workspace
        self._api_key = api_key
        self._host = host
        self._use_local = use_local
        self._is_ready = False
        self._setup_opik()

    def _setup_opik(self) -> None:
        if self._use_local:
            opik.configure(use_local=True)
        self._opik_client = opik_client.Opik(_use_batching=True, project_name=self._project_name, workspace=self._workspace, api_key=self._api_key, host=self._host)
        missing_configuration, _ = self._opik_client._config.get_misconfiguration_detection_results()
        if missing_configuration:
            self._is_ready = False # for serialization compatibility
            return
        self._check_opik_auth()

    def _check_opik_auth(self) -> None:
        try:
            self._opik_client.auth_check()
        except Exception as e:
            self._is_ready = False # for serialization compatibility
            warnings.warn(f"Opik auth check failed, OpikTracer will be disabled: {e}")
        else:
            self._is_ready = True

    def _get_worker_instance(self, key: str, parent: Optional[Automa]) -> Worker:
        """
        Get worker instance from parent automa.

        Returns
        -------
        Worker
            The worker instance.
        """
        if parent is None:
            raise ValueError("Parent automa is required to get worker instance")
        return parent._get_worker_instance(key)

    def _create_trace_data(self, trace_name: Optional[str] = None) -> trace.TraceData:
        return trace.TraceData(
            name=trace_name, 
            metadata={"created_from": "bridgic"}, 
            project_name=self._project_name
        )

    def _get_or_create_trace_data(self, trace_name: Optional[str] = None) -> trace.TraceData:
        """Initialize or reuse existing trace."""
        existing_trace = opik_context_storage.get_trace_data()
        if existing_trace:
            return existing_trace

        # Create new trace and set in context
        trace_data = self._create_trace_data(trace_name)
        opik_context_storage.set_trace_data(trace_data)

        if self._opik_client.config.log_start_trace_span:
            self._opik_client.trace(**trace_data.as_start_parameters)
        return trace_data

    def _complete_trace(self, output: Optional[Dict[str, Any]], error_info: Optional[ErrorInfoDict]) -> None:
        """Finalize and log trace we own."""
        trace_data = opik_context_storage.get_trace_data()
        if trace_data is None:
            return

        trace_data.init_end_time()

        # Compute execution duration from trace start_time
        if trace_data.start_time:
            end_time = trace_data.end_time.timestamp() if trace_data.end_time else time.time()
            start_time = trace_data.start_time.timestamp()
            trace_data.metadata = merge_optional_dicts(
                trace_data.metadata,
                {"execution_duration": end_time - start_time, "end_time": end_time}
            )

        if output:
            trace_data.update(output=output)

        if error_info:
            trace_data.update(error_info=error_info)

        self._opik_client.trace(**trace_data.as_parameters)
        opik_context_storage.pop_trace_data(ensure_id=trace_data.id)
        self._flush()

    def _start_span(
        self,
        step_name: str,
        inputs: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Start a span for a worker execution step and push it to context."""
        trace_data = opik_context_storage.get_trace_data()

        parent_span = opik_context_storage.top_span_data()

        project_name = helpers.resolve_child_span_project_name(
            parent_project_name=trace_data.project_name,
            child_project_name=self._project_name,
            show_warning=True,
        )

        span_data = span.SpanData(
            trace_id=trace_data.id,
            name=step_name,
            parent_span_id=parent_span.id if parent_span else None,
            input=inputs,
            metadata=metadata,
            project_name=project_name,
        )
        # Store start_time in metadata for later duration calculation
        if span_data.start_time and metadata is not None:
            metadata["start_time"] = span_data.start_time.timestamp()
            span_data.update(metadata=metadata)
        # Add span to context stack
        opik_context_storage.add_span_data(span_data)

        if self._opik_client.config.log_start_trace_span:
            self._opik_client.span(**span_data.as_start_parameters)

    def _finish_span(self, span_data: span.SpanData, worker_metadata: Optional[Dict[str, Any]] = None) -> None:
        """Finish a worker span with metadata and output, then pop from context."""
        if worker_metadata:
            output = worker_metadata.get("output")
            # Merge all metadata except 'output' into span metadata
            current_metadata = span_data.metadata or {}
            current_metadata.update({k: v for k, v in worker_metadata.items() if k != "output"})
            span_data.update(metadata=current_metadata)

            if output is not None:
                span_data.update(output=output)

        span_data.init_end_time()
        self._opik_client.span(**span_data.as_parameters)

        # Pop span from context stack
        opik_context_storage.pop_span_data(ensure_id=span_data.id)

    def _start_top_level_trace(self, key: str, arguments: Optional[Dict[str, Any]]) -> None:
        """Start trace initialization for top-level automa."""
        trace_data = self._get_or_create_trace_data(trace_name=key or "top_level_automa")

        serialized_args = serialize_data(arguments)
        metadata_updates = {"key": key, "nesting_level": 0}
        if trace_data.start_time:
            metadata_updates["start_time"] = trace_data.start_time.timestamp()

        trace_data.metadata = merge_optional_dicts(trace_data.metadata, metadata_updates)

        if serialized_args:
            trace_data.input = serialized_args

    def _start_worker_span(self, key: str, worker: Worker, parent: Automa, arguments: Optional[Dict[str, Any]]) -> None:
        """Start a span for worker execution."""
        step_name = get_worker_tracing_step_name(key, worker)
        worker_tracing_dict = build_worker_tracing_dict(worker, parent)
        self._start_span(
            step_name=step_name,
            inputs=serialize_data(arguments),
            metadata=worker_tracing_dict,
        )

    async def on_worker_start(
        self,
        key: str,
        is_top_level: bool = False,
        parent: Optional[Automa] = None,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Hook invoked before worker execution.

        For top-level automa, initializes a new trace. For workers, creates
        a new span. Handles nested automa as workers by checking if the
        decorated worker is an automa instance.

        Parameters
        ----------
        key : str
            Worker identifier.
        is_top_level : bool, default=False
            Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self).
        parent : Optional[Automa], default=None
            Parent automa instance containing this worker. For top-level automa, parent is the automa itself.
        arguments : Optional[Dict[str, Any]], default=None
            Execution arguments with keys "args" and "kwargs".
        """
        if not self._is_ready:
            return
        if is_top_level:
            self._start_top_level_trace(key, arguments)
            return

        try:
            worker = self._get_worker_instance(key, parent)
        except (KeyError, ValueError) as e:
            warnings.warn(f"Failed to get worker instance for key '{key}': {e}")
            return

        self._start_worker_span(key, worker, parent, arguments)

    def _finish_current_span(self, output: Dict[str, Any], error: Optional[Exception] = None) -> None:
        """Finish the current span and pop it from context."""
        current_span = opik_context_storage.top_span_data()
        if not current_span:
            warnings.warn("No span found in context when finishing worker span")
            return

        # Calculate execution timing
        end_time = time.time()
        start_time = current_span.start_time.timestamp() if current_span.start_time else end_time

        # Build worker metadata with timing and output
        worker_metadata = {
            "end_time": end_time,
            "execution_duration": end_time - start_time,
            "output": serialize_data(output),
        }

        # Handle error if present
        if error:
            error_info = error_info_collector.collect(error)
            if error_info:
                current_span.update(error_info=error_info)

        # Finish the span (this will merge metadata and pop from context)
        self._finish_span(current_span, worker_metadata=worker_metadata)

    def _build_output_payload(self, result: Any = None, error: Optional[Exception] = None) -> Dict[str, Any]:
        """Build a standardized output dictionary for results or errors."""
        if error:
            return {"error_type": type(error).__name__, "error_message": str(error)}
        return {
            "result_type": type(result).__name__ if result is not None else None,
            "result": serialize_data(result),
        }

    def _complete_worker_execution(self, output: Dict[str, Any], is_top_level: bool, error: Optional[Exception] = None) -> None:
        """Complete worker or trace execution."""
        if is_top_level:
            trace_data = opik_context_storage.get_trace_data()
            if trace_data:
                execution_status = "failed" if error else "completed"
                trace_data.metadata = merge_optional_dicts(
                    trace_data.metadata, {"execution_status": execution_status}
                )

            error_info = error_info_collector.collect(error) if error else None
            self._complete_trace(output, error_info)
        else:
            self._finish_current_span(output=output, error=error)

    async def on_worker_end(
        self,
        key: str,
        is_top_level: bool = False,
        parent: Optional[Automa] = None,
        arguments: Optional[Dict[str, Any]] = None,
        result: Any = None,
    ) -> None:
        """
        Hook invoked after worker execution.

        For top-level automa, ends the trace. For workers, ends the span
        with execution results.

        Parameters
        ----------
        key : str
            Worker identifier.
        is_top_level : bool, default=False
            Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self).
        parent : Optional[Automa], default=None
            Parent automa instance containing this worker. For top-level automa, parent is the automa itself.
        arguments : Optional[Dict[str, Any]], default=None
            Execution arguments with keys "args" and "kwargs".
        result : Any, default=None
            Worker execution result.
        """
        if not self._is_ready:
            return
        output = self._build_output_payload(result=result)
        self._complete_worker_execution(output, is_top_level)

    async def on_worker_error(
        self,
        key: str,
        is_top_level: bool = False,
        parent: Optional[Automa] = None,
        arguments: Optional[Dict[str, Any]] = None,
        error: Exception = None,
    ) -> bool:
        """
        Hook invoked when worker execution raises an exception.

        For top-level automa, ends the trace with error information.
        For workers, ends the span with error information.

        Parameters
        ----------
        key : str
            Worker identifier.
        is_top_level : bool, default=False
            Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self).
        parent : Optional[Automa], default=None
            Parent automa instance containing this worker. For top-level automa, parent is the automa itself.
        arguments : Optional[Dict[str, Any]], default=None
            Execution arguments with keys "args" and "kwargs".
        error : Exception, default=None
            The exception raised during worker execution.

        Returns
        -------
        bool
            Always returns False, indicating the exception should not be suppressed.
        """
        if not self._is_ready:
            return False
        if not is_top_level and parent:
            try:
                self._get_worker_instance(key, parent)
            except (KeyError, ValueError) as e:
                warnings.warn(f"Failed to get worker instance for key '{key}': {e}")
                return False

        output = self._build_output_payload(error=error)
        self._complete_worker_execution(output, is_top_level, error=error)
        return False

    def _flush(self) -> None:
        self._opik_client.flush()

    @override
    def dump_to_dict(self) -> Dict[str, Any]:
        state_dict = super().dump_to_dict()
        state_dict["project_name"] = self._project_name
        state_dict["workspace"] = self._workspace
        state_dict["api_key"] = self._api_key
        state_dict["host"] = self._host
        state_dict["use_local"] = self._use_local
        return state_dict

    @override
    def load_from_dict(self, state_dict: Dict[str, Any]) -> None:
        super().load_from_dict(state_dict)
        self._project_name = state_dict["project_name"]
        self._workspace = state_dict["workspace"]
        self._api_key = state_dict["api_key"]
        self._host = state_dict["host"]
        self._use_local = state_dict["use_local"]
        self._setup_opik() # if opik is not ready, it will be set to False

on_worker_start

async
on_worker_start(
    key: str,
    is_top_level: bool = False,
    parent: Optional[Automa] = None,
    arguments: Optional[Dict[str, Any]] = None,
) -> None

Hook invoked before worker execution.

For top-level automa, initializes a new trace. For workers, creates a new span. Handles nested automa as workers by checking if the decorated worker is an automa instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Worker identifier. | required |
| is_top_level | bool | Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self). | False |
| parent | Optional[Automa] | Parent automa instance containing this worker. For top-level automa, parent is the automa itself. | None |
| arguments | Optional[Dict[str, Any]] | Execution arguments with keys "args" and "kwargs". | None |
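
The hook is driven by the Bridgic runtime itself; the sketch below only illustrates the documented call shape for a top-level automa (note the `"args"`/`"kwargs"` keys inside `arguments`). The import path and the placeholder argument values are assumptions.

```python
# Illustrative only: Bridgic invokes this hook internally; this just shows the
# call shape documented above for a top-level automa.
from bridgic.traces.opik import OpikTraceCallback  # import path assumed

async def start_top_level(callback: OpikTraceCallback, automa) -> None:
    await callback.on_worker_start(
        key="my_automa",
        is_top_level=True,
        parent=automa,  # for the top-level automa, parent is the automa itself
        arguments={"args": (), "kwargs": {"query": "hello"}},  # placeholder inputs
    )
```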
Source code in bridgic/traces/opik/_opik_trace_callback.py
async def on_worker_start(
    self,
    key: str,
    is_top_level: bool = False,
    parent: Optional[Automa] = None,
    arguments: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Hook invoked before worker execution.

    For top-level automa, initializes a new trace. For workers, creates
    a new span. Handles nested automa as workers by checking if the
    decorated worker is an automa instance.

    Parameters
    ----------
    key : str
        Worker identifier.
    is_top_level : bool, default=False
        Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self).
    parent : Optional[Automa], default=None
        Parent automa instance containing this worker. For top-level automa, parent is the automa itself.
    arguments : Optional[Dict[str, Any]], default=None
        Execution arguments with keys "args" and "kwargs".
    """
    if not self._is_ready:
        return
    if is_top_level:
        self._start_top_level_trace(key, arguments)
        return

    try:
        worker = self._get_worker_instance(key, parent)
    except (KeyError, ValueError) as e:
        warnings.warn(f"Failed to get worker instance for key '{key}': {e}")
        return

    self._start_worker_span(key, worker, parent, arguments)

on_worker_end

async
on_worker_end(
    key: str,
    is_top_level: bool = False,
    parent: Optional[Automa] = None,
    arguments: Optional[Dict[str, Any]] = None,
    result: Any = None,
) -> None

Hook invoked after worker execution.

For top-level automa, ends the trace. For workers, ends the span with execution results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Worker identifier. | required |
| is_top_level | bool | Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self). | False |
| parent | Optional[Automa] | Parent automa instance containing this worker. For top-level automa, parent is the automa itself. | None |
| arguments | Optional[Dict[str, Any]] | Execution arguments with keys "args" and "kwargs". | None |
| result | Any | Worker execution result. | None |
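
As a continuation of the illustrative driver sketched under on_worker_start, the end hook receives the same identification arguments plus the result; for a top-level automa this finalizes the trace and flushes it to Opik. Placeholder names are assumptions.

```python
# Illustrative only: the success-path counterpart to on_worker_start above.
async def end_top_level(callback, automa, result) -> None:
    await callback.on_worker_end(
        key="my_automa",
        is_top_level=True,
        parent=automa,
        arguments={"args": (), "kwargs": {"query": "hello"}},
        result=result,  # serialized and recorded as the trace output
    )
```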
Source code in bridgic/traces/opik/_opik_trace_callback.py
async def on_worker_end(
    self,
    key: str,
    is_top_level: bool = False,
    parent: Optional[Automa] = None,
    arguments: Optional[Dict[str, Any]] = None,
    result: Any = None,
) -> None:
    """
    Hook invoked after worker execution.

    For top-level automa, ends the trace. For workers, ends the span
    with execution results.

    Parameters
    ----------
    key : str
        Worker identifier.
    is_top_level : bool, default=False
        Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self).
    parent : Optional[Automa], default=None
        Parent automa instance containing this worker. For top-level automa, parent is the automa itself.
    arguments : Optional[Dict[str, Any]], default=None
        Execution arguments with keys "args" and "kwargs".
    result : Any, default=None
        Worker execution result.
    """
    if not self._is_ready:
        return
    output = self._build_output_payload(result=result)
    self._complete_worker_execution(output, is_top_level)

on_worker_error

async
on_worker_error(
    key: str,
    is_top_level: bool = False,
    parent: Optional[Automa] = None,
    arguments: Optional[Dict[str, Any]] = None,
    error: Exception = None,
) -> bool

Hook invoked when worker execution raises an exception.

For top-level automa, ends the trace with error information. For workers, ends the span with error information.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Worker identifier. | required |
| is_top_level | bool | Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self). | False |
| parent | Optional[Automa] | Parent automa instance containing this worker. For top-level automa, parent is the automa itself. | None |
| arguments | Optional[Dict[str, Any]] | Execution arguments with keys "args" and "kwargs". | None |
| error | Exception | The exception raised during worker execution. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | Always returns False, indicating the exception should not be suppressed. |
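On the failure path the hook records the error on the trace or span and always returns False, so the original exception keeps propagating. An illustrative call, following the same hedged driver pattern as the sketches above (placeholder names are assumptions):

```python
# Illustrative only: the error-path counterpart. The hook returns False, meaning
# the exception is recorded in the trace/span but not suppressed.
async def fail_top_level(callback, automa, exc: Exception) -> None:
    suppressed = await callback.on_worker_error(
        key="my_automa",
        is_top_level=True,
        parent=automa,
        arguments={"args": (), "kwargs": {}},
        error=exc,
    )
    assert suppressed is False  # the framework re-raises the exception
```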

Source code in bridgic/traces/opik/_opik_trace_callback.py
async def on_worker_error(
    self,
    key: str,
    is_top_level: bool = False,
    parent: Optional[Automa] = None,
    arguments: Optional[Dict[str, Any]] = None,
    error: Exception = None,
) -> bool:
    """
    Hook invoked when worker execution raises an exception.

    For top-level automa, ends the trace with error information.
    For workers, ends the span with error information.

    Parameters
    ----------
    key : str
        Worker identifier.
    is_top_level : bool, default=False
        Whether the worker is the top-level automa. When True, parent will be the automa itself (parent is self).
    parent : Optional[Automa], default=None
        Parent automa instance containing this worker. For top-level automa, parent is the automa itself.
    arguments : Optional[Dict[str, Any]], default=None
        Execution arguments with keys "args" and "kwargs".
    error : Exception, default=None
        The exception raised during worker execution.

    Returns
    -------
    bool
        Always returns False, indicating the exception should not be suppressed.
    """
    if not self._is_ready:
        return False
    if not is_top_level and parent:
        try:
            self._get_worker_instance(key, parent)
        except (KeyError, ValueError) as e:
            warnings.warn(f"Failed to get worker instance for key '{key}': {e}")
            return False

    output = self._build_output_payload(error=error)
    self._complete_worker_execution(output, is_top_level, error=error)
    return False

start_opik_trace

start_opik_trace(
    project_name: Optional[str] = None,
    workspace: Optional[str] = None,
    host: Optional[str] = None,
    api_key: Optional[str] = None,
    use_local: bool = False,
) -> None

Start an Opik trace for a given project and service.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| project_name | Optional[str] | The name of the project. If None, uses the `Default Project` project name. | None |
| workspace | Optional[str] | The name of the workspace. If None, uses the `default` workspace name. | None |
| host | Optional[str] | The host URL for the Opik server. If None, defaults to https://www.comet.com/opik/api. | None |
| api_key | Optional[str] | The API key for Opik. This parameter is ignored for local installations. | None |
| use_local | bool | Whether to use a local Opik server. | False |

Returns:

| Type | Description |
| --- | --- |
| None | |
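
In practice, calling this helper once at startup is the simplest way to enable Opik tracing for all subsequent automa runs, since it registers the callback through `GlobalSetting` for you. A minimal sketch; the public import path is assumed from the source file location:

```python
# Minimal sketch: enable Opik tracing globally before running any automa.
from bridgic.traces.opik import start_opik_trace  # import path assumed

# Self-hosted Opik instance:
start_opik_trace(project_name="my-project", use_local=True)

# Opik Cloud instead:
# start_opik_trace(project_name="my-project", api_key="my-api-key", workspace="my-workspace")
```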
Source code in bridgic/traces/opik/_utils.py
def start_opik_trace(
    project_name: Optional[str] = None,
    workspace: Optional[str] = None,
    host: Optional[str] = None,
    api_key: Optional[str] = None,
    use_local: bool = False,
) -> None:
    """Start a Opik trace for a given project and service.

    Parameters
    ----------
    project_name : Optional[str], default=None
        The name of the project. If None, uses the `Default Project` project name.
    workspace : Optional[str], default=None
        The name of the workspace. If None, uses the `default` workspace name.
    host : Optional[str], default=None
        The host URL for the Opik server. If None, defaults to `https://www.comet.com/opik/api`.
    api_key : Optional[str], default=None
        The API key for Opik. This parameter is ignored for local installations.
    use_local : bool, default=False
        Whether to use a local Opik server.

    Returns
    -------
    None
    """
    from bridgic.core.config import GlobalSetting
    builder = WorkerCallbackBuilder(
        OpikTraceCallback, 
        init_kwargs={"project_name": project_name, "workspace": workspace, "host": host, "api_key": api_key, "use_local": use_local}
    )
    GlobalSetting.add(callback_builder=builder)