跳过内容

结果

RunResultBase dataclass

基础: ABC

源代码位于 src/agents/result.py
@dataclass
class RunResultBase(abc.ABC):
    input: str | list[TResponseInputItem]
    """The original input items i.e. the items before run() was called. This may be a mutated
    version of the input, if there are handoff input filters that mutate the input.
    """

    new_items: list[RunItem]
    """The new items generated during the agent run. These include things like new messages, tool
    calls and their outputs, etc.
    """

    raw_responses: list[ModelResponse]
    """The raw LLM responses generated by the model during the agent run."""

    final_output: Any
    """The output of the last agent."""

    input_guardrail_results: list[InputGuardrailResult]
    """Guardrail results for the input messages."""

    output_guardrail_results: list[OutputGuardrailResult]
    """Guardrail results for the final output of the agent."""

    tool_input_guardrail_results: list[ToolInputGuardrailResult]
    """Tool input guardrail results from all tools executed during the run."""

    tool_output_guardrail_results: list[ToolOutputGuardrailResult]
    """Tool output guardrail results from all tools executed during the run."""

    context_wrapper: RunContextWrapper[Any]
    """The context wrapper for the agent run."""

    @property
    @abc.abstractmethod
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run."""

    def release_agents(self, *, release_new_items: bool = True) -> None:
        """
        Release strong references to agents held by this result. After calling this method,
        accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
        collected. Callers can use this when they are done inspecting the result and want to
        eagerly drop any associated agent graph.
        """
        if release_new_items:
            for item in self.new_items:
                release = getattr(item, "release_agent", None)
                if callable(release):
                    release()
        self._release_last_agent_reference()

    def __del__(self) -> None:
        try:
            # Fall back to releasing agents automatically in case the caller never invoked
            # `release_agents()` explicitly so GC of the RunResult drops the last strong reference.
            # We pass `release_new_items=False` so RunItems that the user intentionally keeps
            # continue exposing their originating agent until that agent itself is collected.
            self.release_agents(release_new_items=False)
        except Exception:
            # Avoid raising from __del__.
            pass

    @abc.abstractmethod
    def _release_last_agent_reference(self) -> None:
        """Release stored agent reference specific to the concrete result type."""

    def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
        """A convenience method to cast the final output to a specific type. By default, the cast
        is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
        TypeError if the final output is not of the given type.

        Args:
            cls: The type to cast the final output to.
            raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
                the given type.

        Returns:
            The final output casted to the given type.
        """
        if raise_if_incorrect_type and not isinstance(self.final_output, cls):
            raise TypeError(f"Final output is not of type {cls.__name__}")

        return cast(T, self.final_output)

    def to_input_list(self) -> list[TResponseInputItem]:
        """Creates a new input list, merging the original input with all the new items generated."""
        original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
        new_items = [item.to_input_item() for item in self.new_items]

        return original_items + new_items

    @property
    def last_response_id(self) -> str | None:
        """Convenience method to get the response ID of the last model response."""
        if not self.raw_responses:
            return None

        return self.raw_responses[-1].response_id

input instance-attribute

input: str | list[TResponseInputItem]

原始输入项,即在调用 run() 之前的内容。如果存在修改输入的输入处理过滤器,这可能是输入的一个变异版本。

new_items instance-attribute

new_items: list[RunItem]

在代理运行期间生成的新项。这些包括新消息、工具调用及其输出等。

raw_responses instance-attribute

raw_responses: list[ModelResponse]

代理运行期间模型生成的原始 LLM 响应。

final_output instance-attribute

final_output: Any

最后一个代理的输出。

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

输入消息的护栏结果。

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

代理最终输出的护栏结果。

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

在运行期间执行的所有工具的工具输入护栏结果。

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

在运行期间执行的所有工具的工具输出护栏结果。

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

代理运行的上下文包装器。

last_agent abstractmethod property

last_agent: Agent[Any]

运行的最后一个代理。

last_response_id property

last_response_id: str | None

获取最后一个模型响应的响应 ID 的便捷方法。

release_agents

release_agents(*, release_new_items: bool = True) -> None

释放此结果中持有的代理的强引用。调用此方法后,访问 item.agentlast_agent 可能会返回 None,如果代理已被垃圾回收。调用者可以在完成检查结果后使用此方法,以主动释放任何关联的代理图。

源代码位于 src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

一种便捷的方法,用于将最终输出转换为特定类型。默认情况下,转换仅用于类型检查器。如果将 raise_if_incorrect_type 设置为 True,我们将在最终输出不是给定类型时引发 TypeError。

参数

名称 类型 描述 默认
cls type[T]

要将最终输出转换为的类型。

required
raise_if_incorrect_type bool

如果为 True,我们将在最终输出不是给定类型时引发 TypeError。

False

返回值

类型 描述
T

转换为给定类型的最终输出。

源代码位于 src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

创建一个新的输入列表,合并原始输入和生成的所有新项。

源代码位于 src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items = [item.to_input_item() for item in self.new_items]

    return original_items + new_items

RunResult dataclass

基类:RunResultBase

源代码位于 src/agents/result.py
@dataclass
class RunResult(RunResultBase):
    _last_agent: Agent[Any]
    _last_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    def __post_init__(self) -> None:
        self._last_agent_ref = weakref.ref(self._last_agent)

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run."""
        agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent"))
        if agent is not None:
            return agent
        if self._last_agent_ref:
            agent = self._last_agent_ref()
            if agent is not None:
                return agent
        raise AgentsException("Last agent reference is no longer available.")

    def _release_last_agent_reference(self) -> None:
        agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent"))
        if agent is None:
            return
        self._last_agent_ref = weakref.ref(agent)
        # Preserve dataclass field so repr/asdict continue to succeed.
        self.__dict__["_last_agent"] = None

    def __str__(self) -> str:
        return pretty_print_result(self)

last_agent property

last_agent: Agent[Any]

运行的最后一个代理。

input instance-attribute

input: str | list[TResponseInputItem]

原始输入项,即在调用 run() 之前的内容。如果存在修改输入的输入处理过滤器,这可能是输入的一个变异版本。

new_items instance-attribute

new_items: list[RunItem]

在代理运行期间生成的新项。这些包括新消息、工具调用及其输出等。

raw_responses instance-attribute

raw_responses: list[ModelResponse]

代理运行期间模型生成的原始 LLM 响应。

final_output instance-attribute

final_output: Any

最后一个代理的输出。

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

输入消息的护栏结果。

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

代理最终输出的护栏结果。

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

在运行期间执行的所有工具的工具输入护栏结果。

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

在运行期间执行的所有工具的工具输出护栏结果。

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

代理运行的上下文包装器。

last_response_id property

last_response_id: str | None

获取最后一个模型响应的响应 ID 的便捷方法。

release_agents

release_agents(*, release_new_items: bool = True) -> None

释放此结果中持有的代理的强引用。调用此方法后,访问 item.agentlast_agent 可能会返回 None,如果代理已被垃圾回收。调用者可以在完成检查结果后使用此方法,以主动释放任何关联的代理图。

源代码位于 src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

一种便捷的方法,用于将最终输出转换为特定类型。默认情况下,转换仅用于类型检查器。如果将 raise_if_incorrect_type 设置为 True,我们将在最终输出不是给定类型时引发 TypeError。

参数

名称 类型 描述 默认
cls type[T]

要将最终输出转换为的类型。

required
raise_if_incorrect_type bool

如果为 True,我们将在最终输出不是给定类型时引发 TypeError。

False

返回值

类型 描述
T

转换为给定类型的最终输出。

源代码位于 src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

创建一个新的输入列表,合并原始输入和生成的所有新项。

源代码位于 src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items = [item.to_input_item() for item in self.new_items]

    return original_items + new_items

RunResultStreaming dataclass

基类:RunResultBase

流式模式下代理运行的结果。您可以使用 stream_events 方法接收在生成时产生的语义事件。

流式方法将引发:- 如果代理超过 max_turns 限制,则引发 MaxTurnsExceeded 异常。- 如果触发了护栏,则引发 GuardrailTripwireTriggered 异常。

源代码位于 src/agents/result.py
@dataclass
class RunResultStreaming(RunResultBase):
    """The result of an agent run in streaming mode. You can use the `stream_events` method to
    receive semantic events as they are generated.

    The streaming method will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """

    current_agent: Agent[Any]
    """The current agent that is running."""

    current_turn: int
    """The current turn number."""

    max_turns: int
    """The maximum number of turns the agent can run for."""

    final_output: Any
    """The final output of the agent. This is None until the agent has finished running."""

    _current_agent_output_schema: AgentOutputSchemaBase | None = field(repr=False)

    trace: Trace | None = field(repr=False)

    is_complete: bool = False
    """Whether the agent has finished running."""

    _current_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    # Queues that the background run_loop writes to
    _event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field(
        default_factory=asyncio.Queue, repr=False
    )
    _input_guardrail_queue: asyncio.Queue[InputGuardrailResult] = field(
        default_factory=asyncio.Queue, repr=False
    )

    # Store the asyncio tasks that we're waiting on
    _run_impl_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _input_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _output_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _stored_exception: Exception | None = field(default=None, repr=False)

    # Soft cancel state
    _cancel_mode: Literal["none", "immediate", "after_turn"] = field(default="none", repr=False)

    def __post_init__(self) -> None:
        self._current_agent_ref = weakref.ref(self.current_agent)

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run. Updates as the agent run progresses, so the true last agent
        is only available after the agent run is complete.
        """
        agent = cast("Agent[Any] | None", self.__dict__.get("current_agent"))
        if agent is not None:
            return agent
        if self._current_agent_ref:
            agent = self._current_agent_ref()
            if agent is not None:
                return agent
        raise AgentsException("Last agent reference is no longer available.")

    def _release_last_agent_reference(self) -> None:
        agent = cast("Agent[Any] | None", self.__dict__.get("current_agent"))
        if agent is None:
            return
        self._current_agent_ref = weakref.ref(agent)
        # Preserve dataclass field so repr/asdict continue to succeed.
        self.__dict__["current_agent"] = None

    def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
        """Cancel the streaming run.

        Args:
            mode: Cancellation strategy:
                - "immediate": Stop immediately, cancel all tasks, clear queues (default)
                - "after_turn": Complete current turn gracefully before stopping
                    * Allows LLM response to finish
                    * Executes pending tool calls
                    * Saves session state properly
                    * Tracks usage accurately
                    * Stops before next turn begins

        Example:
            ```python
            result = Runner.run_streamed(agent, "Task", session=session)

            async for event in result.stream_events():
                if user_interrupted():
                    result.cancel(mode="after_turn")  # Graceful
                    # result.cancel()  # Immediate (default)
            ```

        Note: After calling cancel(), you should continue consuming stream_events()
        to allow the cancellation to complete properly.
        """
        # Store the cancel mode for the background task to check
        self._cancel_mode = mode

        if mode == "immediate":
            # Existing behavior - immediate shutdown
            self._cleanup_tasks()  # Cancel all running tasks
            self.is_complete = True  # Mark the run as complete to stop event streaming

            # Optionally, clear the event queue to prevent processing stale events
            while not self._event_queue.empty():
                self._event_queue.get_nowait()
            while not self._input_guardrail_queue.empty():
                self._input_guardrail_queue.get_nowait()

        elif mode == "after_turn":
            # Soft cancel - just set the flag
            # The streaming loop will check this and stop gracefully
            # Don't call _cleanup_tasks() or clear queues yet
            pass

    async def stream_events(self) -> AsyncIterator[StreamEvent]:
        """Stream deltas for new items as they are generated. We're using the types from the
        OpenAI Responses API, so these are semantic events: each event has a `type` field that
        describes the type of the event, along with the data for that event.

        This will raise:
        - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
        - A GuardrailTripwireTriggered exception if a guardrail is tripped.
        """
        cancelled = False
        try:
            while True:
                self._check_errors()
                if self._stored_exception:
                    logger.debug("Breaking due to stored exception")
                    self.is_complete = True
                    break

                if self.is_complete and self._event_queue.empty():
                    break

                try:
                    item = await self._event_queue.get()
                except asyncio.CancelledError:
                    cancelled = True
                    self.cancel()
                    raise

                if isinstance(item, QueueCompleteSentinel):
                    # Await input guardrails if they are still running, so late
                    # exceptions are captured.
                    await self._await_task_safely(self._input_guardrails_task)

                    self._event_queue.task_done()

                    # Check for errors, in case the queue was completed
                    # due to an exception
                    self._check_errors()
                    break

                yield item
                self._event_queue.task_done()
        finally:
            if cancelled:
                # Cancellation should return promptly, so avoid waiting on long-running tasks.
                # Tasks have already been cancelled above.
                self._cleanup_tasks()
            else:
                # Ensure main execution completes before cleanup to avoid race conditions
                # with session operations
                await self._await_task_safely(self._run_impl_task)
                # Safely terminate all background tasks after main execution has finished
                self._cleanup_tasks()

        if self._stored_exception:
            raise self._stored_exception

    def _create_error_details(self) -> RunErrorDetails:
        """Return a `RunErrorDetails` object considering the current attributes of the class."""
        return RunErrorDetails(
            input=self.input,
            new_items=self.new_items,
            raw_responses=self.raw_responses,
            last_agent=self.current_agent,
            context_wrapper=self.context_wrapper,
            input_guardrail_results=self.input_guardrail_results,
            output_guardrail_results=self.output_guardrail_results,
        )

    def _check_errors(self):
        if self.current_turn > self.max_turns:
            max_turns_exc = MaxTurnsExceeded(f"Max turns ({self.max_turns}) exceeded")
            max_turns_exc.run_data = self._create_error_details()
            self._stored_exception = max_turns_exc

        # Fetch all the completed guardrail results from the queue and raise if needed
        while not self._input_guardrail_queue.empty():
            guardrail_result = self._input_guardrail_queue.get_nowait()
            if guardrail_result.output.tripwire_triggered:
                tripwire_exc = InputGuardrailTripwireTriggered(guardrail_result)
                tripwire_exc.run_data = self._create_error_details()
                self._stored_exception = tripwire_exc

        # Check the tasks for any exceptions
        if self._run_impl_task and self._run_impl_task.done():
            run_impl_exc = self._run_impl_task.exception()
            if run_impl_exc and isinstance(run_impl_exc, Exception):
                if isinstance(run_impl_exc, AgentsException) and run_impl_exc.run_data is None:
                    run_impl_exc.run_data = self._create_error_details()
                self._stored_exception = run_impl_exc

        if self._input_guardrails_task and self._input_guardrails_task.done():
            in_guard_exc = self._input_guardrails_task.exception()
            if in_guard_exc and isinstance(in_guard_exc, Exception):
                if isinstance(in_guard_exc, AgentsException) and in_guard_exc.run_data is None:
                    in_guard_exc.run_data = self._create_error_details()
                self._stored_exception = in_guard_exc

        if self._output_guardrails_task and self._output_guardrails_task.done():
            out_guard_exc = self._output_guardrails_task.exception()
            if out_guard_exc and isinstance(out_guard_exc, Exception):
                if isinstance(out_guard_exc, AgentsException) and out_guard_exc.run_data is None:
                    out_guard_exc.run_data = self._create_error_details()
                self._stored_exception = out_guard_exc

    def _cleanup_tasks(self):
        if self._run_impl_task and not self._run_impl_task.done():
            self._run_impl_task.cancel()

        if self._input_guardrails_task and not self._input_guardrails_task.done():
            self._input_guardrails_task.cancel()

        if self._output_guardrails_task and not self._output_guardrails_task.done():
            self._output_guardrails_task.cancel()

    def __str__(self) -> str:
        return pretty_print_run_result_streaming(self)

    async def _await_task_safely(self, task: asyncio.Task[Any] | None) -> None:
        """Await a task if present, ignoring cancellation and storing exceptions elsewhere.

        This ensures we do not lose late guardrail exceptions while not surfacing
        CancelledError to callers of stream_events.
        """
        if task and not task.done():
            try:
                await task
            except asyncio.CancelledError:
                # Task was cancelled (e.g., due to result.cancel()). Nothing to do here.
                pass
            except Exception:
                # The exception will be surfaced via _check_errors() if needed.
                pass

current_agent instance-attribute

current_agent: Agent[Any]

当前正在运行的代理。

current_turn instance-attribute

current_turn: int

当前轮数。

max_turns instance-attribute

max_turns: int

代理可以运行的最大轮数。

final_output instance-attribute

final_output: Any

代理的最终输出。在代理完成运行之前,此值为 None。

is_complete class-attribute instance-attribute

is_complete: bool = False

代理是否已完成运行。

last_agent property

last_agent: Agent[Any]

运行的最后一个代理。随着代理运行的进行而更新,因此真正的最后一个代理仅在代理运行完成后可用。

input instance-attribute

input: str | list[TResponseInputItem]

原始输入项,即在调用 run() 之前的内容。如果存在修改输入的输入处理过滤器,这可能是输入的一个变异版本。

new_items instance-attribute

new_items: list[RunItem]

在代理运行期间生成的新项。这些包括新消息、工具调用及其输出等。

raw_responses instance-attribute

raw_responses: list[ModelResponse]

代理运行期间模型生成的原始 LLM 响应。

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

输入消息的护栏结果。

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

代理最终输出的护栏结果。

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

在运行期间执行的所有工具的工具输入护栏结果。

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

在运行期间执行的所有工具的工具输出护栏结果。

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

代理运行的上下文包装器。

last_response_id property

last_response_id: str | None

获取最后一个模型响应的响应 ID 的便捷方法。

cancel

cancel(
    mode: Literal["immediate", "after_turn"] = "immediate",
) -> None

取消流式运行。

参数

名称 类型 描述 默认
模式 Literal['immediate', 'after_turn']

取消策略:- "immediate":立即停止,取消所有任务,清除队列(默认)- "after_turn":优雅地完成当前轮次后停止 * 允许 LLM 响应完成 * 执行待处理的工具调用 * 正确保存会话状态 * 准确跟踪使用情况 * 在开始下一轮次之前停止

'immediate'
示例
result = Runner.run_streamed(agent, "Task", session=session)

async for event in result.stream_events():
    if user_interrupted():
        result.cancel(mode="after_turn")  # Graceful
        # result.cancel()  # Immediate (default)

注意:调用 cancel() 后,您应该继续使用 stream_events() 以允许取消正确完成。

源代码位于 src/agents/result.py
def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
    """Cancel the streaming run.

    Args:
        mode: Cancellation strategy:
            - "immediate": Stop immediately, cancel all tasks, clear queues (default)
            - "after_turn": Complete current turn gracefully before stopping
                * Allows LLM response to finish
                * Executes pending tool calls
                * Saves session state properly
                * Tracks usage accurately
                * Stops before next turn begins

    Example:
        ```python
        result = Runner.run_streamed(agent, "Task", session=session)

        async for event in result.stream_events():
            if user_interrupted():
                result.cancel(mode="after_turn")  # Graceful
                # result.cancel()  # Immediate (default)
        ```

    Note: After calling cancel(), you should continue consuming stream_events()
    to allow the cancellation to complete properly.
    """
    # Store the cancel mode for the background task to check
    self._cancel_mode = mode

    if mode == "immediate":
        # Existing behavior - immediate shutdown
        self._cleanup_tasks()  # Cancel all running tasks
        self.is_complete = True  # Mark the run as complete to stop event streaming

        # Optionally, clear the event queue to prevent processing stale events
        while not self._event_queue.empty():
            self._event_queue.get_nowait()
        while not self._input_guardrail_queue.empty():
            self._input_guardrail_queue.get_nowait()

    elif mode == "after_turn":
        # Soft cancel - just set the flag
        # The streaming loop will check this and stop gracefully
        # Don't call _cleanup_tasks() or clear queues yet
        pass

stream_events async

stream_events() -> AsyncIterator[StreamEvent]

流式传输生成的新项的增量变化。我们使用的是 OpenAI Responses API 中的类型,因此这些是语义事件:每个事件都有一个 type 字段,用于描述事件的类型,以及该事件的数据。

这将引发:- 如果代理超过 max_turns 限制,则引发 MaxTurnsExceeded 异常。- 如果触发了护栏,则引发 GuardrailTripwireTriggered 异常。

源代码位于 src/agents/result.py
async def stream_events(self) -> AsyncIterator[StreamEvent]:
    """Stream deltas for new items as they are generated. We're using the types from the
    OpenAI Responses API, so these are semantic events: each event has a `type` field that
    describes the type of the event, along with the data for that event.

    This will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """
    cancelled = False
    try:
        while True:
            self._check_errors()
            if self._stored_exception:
                logger.debug("Breaking due to stored exception")
                self.is_complete = True
                break

            if self.is_complete and self._event_queue.empty():
                break

            try:
                item = await self._event_queue.get()
            except asyncio.CancelledError:
                cancelled = True
                self.cancel()
                raise

            if isinstance(item, QueueCompleteSentinel):
                # Await input guardrails if they are still running, so late
                # exceptions are captured.
                await self._await_task_safely(self._input_guardrails_task)

                self._event_queue.task_done()

                # Check for errors, in case the queue was completed
                # due to an exception
                self._check_errors()
                break

            yield item
            self._event_queue.task_done()
    finally:
        if cancelled:
            # Cancellation should return promptly, so avoid waiting on long-running tasks.
            # Tasks have already been cancelled above.
            self._cleanup_tasks()
        else:
            # Ensure main execution completes before cleanup to avoid race conditions
            # with session operations
            await self._await_task_safely(self._run_impl_task)
            # Safely terminate all background tasks after main execution has finished
            self._cleanup_tasks()

    if self._stored_exception:
        raise self._stored_exception

release_agents

release_agents(*, release_new_items: bool = True) -> None

释放此结果中持有的代理的强引用。调用此方法后,访问 item.agentlast_agent 可能会返回 None,如果代理已被垃圾回收。调用者可以在完成检查结果后使用此方法,以主动释放任何关联的代理图。

源代码位于 src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

一种便捷的方法,用于将最终输出转换为特定类型。默认情况下,转换仅用于类型检查器。如果将 raise_if_incorrect_type 设置为 True,我们将在最终输出不是给定类型时引发 TypeError。

参数

名称 类型 描述 默认
cls type[T]

要将最终输出转换为的类型。

required
raise_if_incorrect_type bool

如果为 True,我们将在最终输出不是给定类型时引发 TypeError。

False

返回值

类型 描述
T

转换为给定类型的最终输出。

源代码位于 src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

创建一个新的输入列表,合并原始输入和生成的所有新项。

源代码位于 src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items = [item.to_input_item() for item in self.new_items]

    return original_items + new_items