跳过内容

Processor 接口

TracingProcessor

基础: ABC

OpenAI Agents 系统中处理和监控追踪和跨度的接口。

这个抽象类定义了所有追踪处理器必须实现接口。处理器在追踪和跨度开始和结束时接收通知,允许它们收集、处理和导出追踪数据。

示例
class CustomProcessor(TracingProcessor):
    def __init__(self):
        self.active_traces = {}
        self.active_spans = {}

    def on_trace_start(self, trace):
        self.active_traces[trace.trace_id] = trace

    def on_trace_end(self, trace):
        # Process completed trace
        del self.active_traces[trace.trace_id]

    def on_span_start(self, span):
        self.active_spans[span.span_id] = span

    def on_span_end(self, span):
        # Process completed span
        del self.active_spans[span.span_id]

    def shutdown(self):
        # Clean up resources
        self.active_traces.clear()
        self.active_spans.clear()

    def force_flush(self):
        # Force processing of any queued items
        pass
注意事项
  • 所有方法都应该是线程安全的
  • 方法不应该长时间阻塞
  • 优雅地处理错误,以防止干扰代理执行
源代码位于 src/agents/tracing/processor_interface.py
class TracingProcessor(abc.ABC):
    """Interface for processing and monitoring traces and spans in the OpenAI Agents system.

    This abstract class defines the interface that all tracing processors must implement.
    Processors receive notifications when traces and spans start and end, allowing them
    to collect, process, and export tracing data.

    Example:
        ```python
        class CustomProcessor(TracingProcessor):
            def __init__(self):
                self.active_traces = {}
                self.active_spans = {}

            def on_trace_start(self, trace):
                self.active_traces[trace.trace_id] = trace

            def on_trace_end(self, trace):
                # Process completed trace
                del self.active_traces[trace.trace_id]

            def on_span_start(self, span):
                self.active_spans[span.span_id] = span

            def on_span_end(self, span):
                # Process completed span
                del self.active_spans[span.span_id]

            def shutdown(self):
                # Clean up resources
                self.active_traces.clear()
                self.active_spans.clear()

            def force_flush(self):
                # Force processing of any queued items
                pass
        ```

    Notes:
        - All methods should be thread-safe
        - Methods should not block for long periods
        - Handle errors gracefully to prevent disrupting agent execution
    """

    @abc.abstractmethod
    def on_trace_start(self, trace: "Trace") -> None:
        """Called when a new trace begins execution.

        Args:
            trace: The trace that started. Contains workflow name and metadata.

        Notes:
            - Called synchronously on trace start
            - Should return quickly to avoid blocking execution
            - Any errors should be caught and handled internally
        """
        pass

    @abc.abstractmethod
    def on_trace_end(self, trace: "Trace") -> None:
        """Called when a trace completes execution.

        Args:
            trace: The completed trace containing all spans and results.

        Notes:
            - Called synchronously when trace finishes
            - Good time to export/process the complete trace
            - Should handle cleanup of any trace-specific resources
        """
        pass

    @abc.abstractmethod
    def on_span_start(self, span: "Span[Any]") -> None:
        """Called when a new span begins execution.

        Args:
            span: The span that started. Contains operation details and context.

        Notes:
            - Called synchronously on span start
            - Should return quickly to avoid blocking execution
            - Spans are automatically nested under current trace/span
        """
        pass

    @abc.abstractmethod
    def on_span_end(self, span: "Span[Any]") -> None:
        """Called when a span completes execution.

        Args:
            span: The completed span containing execution results.

        Notes:
            - Called synchronously when span finishes
            - Should not block or raise exceptions
            - Good time to export/process the individual span
        """
        pass

    @abc.abstractmethod
    def shutdown(self) -> None:
        """Called when the application stops to clean up resources.

        Should perform any necessary cleanup like:
        - Flushing queued traces/spans
        - Closing connections
        - Releasing resources
        """
        pass

    @abc.abstractmethod
    def force_flush(self) -> None:
        """Forces immediate processing of any queued traces/spans.

        Notes:
            - Should process all queued items before returning
            - Useful before shutdown or when immediate processing is needed
            - May block while processing completes
        """
        pass

on_trace_start abstractmethod

on_trace_start(trace: Trace) -> None

当新的追踪开始执行时调用。

参数

名称 类型 描述 默认
trace Trace

开始的追踪。包含工作流名称和元数据。

required
注意事项
  • 在追踪开始时同步调用
  • 应该快速返回以避免阻塞执行
  • 任何错误都应该被捕获并在内部处理
源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def on_trace_start(self, trace: "Trace") -> None:
    """Called when a new trace begins execution.

    Args:
        trace: The trace that started. Contains workflow name and metadata.

    Notes:
        - Called synchronously on trace start
        - Should return quickly to avoid blocking execution
        - Any errors should be caught and handled internally
    """
    pass

on_trace_end abstractmethod

on_trace_end(trace: Trace) -> None

当追踪完成执行时调用。

参数

名称 类型 描述 默认
trace Trace

包含所有跨度和结果的完成的追踪。

required
注意事项
  • 在追踪完成时同步调用
  • 导出/处理完整的追踪的好时机
  • 应该处理任何特定于追踪的资源的清理
源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def on_trace_end(self, trace: "Trace") -> None:
    """Called when a trace completes execution.

    Args:
        trace: The completed trace containing all spans and results.

    Notes:
        - Called synchronously when trace finishes
        - Good time to export/process the complete trace
        - Should handle cleanup of any trace-specific resources
    """
    pass

on_span_start abstractmethod

on_span_start(span: Span[Any]) -> None

当新的跨度开始执行时调用。

参数

名称 类型 描述 默认
span Span[Any]

开始的跨度。包含操作细节和上下文。

required
注意事项
  • 在跨度开始时同步调用
  • 应该快速返回以避免阻塞执行
  • 跨度会自动嵌套在当前追踪/跨度下
源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def on_span_start(self, span: "Span[Any]") -> None:
    """Called when a new span begins execution.

    Args:
        span: The span that started. Contains operation details and context.

    Notes:
        - Called synchronously on span start
        - Should return quickly to avoid blocking execution
        - Spans are automatically nested under current trace/span
    """
    pass

on_span_end abstractmethod

on_span_end(span: Span[Any]) -> None

当跨度完成执行时调用。

参数

名称 类型 描述 默认
span Span[Any]

包含执行结果的完成的跨度。

required
注意事项
  • 在跨度完成时同步调用
  • 不应该阻塞或引发异常
  • 导出/处理单个跨度的好时机
源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def on_span_end(self, span: "Span[Any]") -> None:
    """Called when a span completes execution.

    Args:
        span: The completed span containing execution results.

    Notes:
        - Called synchronously when span finishes
        - Should not block or raise exceptions
        - Good time to export/process the individual span
    """
    pass

shutdown abstractmethod

shutdown() -> None

在应用程序停止时调用,以清理资源。

应该执行任何必要的清理,例如:- 刷新排队的追踪/跨度 - 关闭连接 - 释放资源

源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def shutdown(self) -> None:
    """Called when the application stops to clean up resources.

    Should perform any necessary cleanup like:
    - Flushing queued traces/spans
    - Closing connections
    - Releasing resources
    """
    pass

force_flush abstractmethod

force_flush() -> None

强制立即处理任何排队的追踪/跨度。

注意事项
  • 应该在返回之前处理所有排队的项目
  • 在关闭之前或需要立即处理时很有用
  • 在处理完成时可能会阻塞
源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def force_flush(self) -> None:
    """Forces immediate processing of any queued traces/spans.

    Notes:
        - Should process all queued items before returning
        - Useful before shutdown or when immediate processing is needed
        - May block while processing completes
    """
    pass

TracingExporter

基础: ABC

导出追踪和跨度。例如,可以将其记录或发送到后端。

源代码位于 src/agents/tracing/processor_interface.py
class TracingExporter(abc.ABC):
    """Exports traces and spans. For example, could log them or send them to a backend."""

    @abc.abstractmethod
    def export(self, items: list["Trace | Span[Any]"]) -> None:
        """Exports a list of traces and spans.

        Args:
            items: The items to export.
        """
        pass

export abstractmethod

export(items: list[Trace | Span[Any]]) -> None

导出追踪和跨度列表。

参数

名称 类型 描述 默认
条目 list[Trace | Span[Any]]

要导出的项目。

required
源代码位于 src/agents/tracing/processor_interface.py
@abc.abstractmethod
def export(self, items: list["Trace | Span[Any]"]) -> None:
    """Exports a list of traces and spans.

    Args:
        items: The items to export.
    """
    pass