跳过内容

Agents

ToolsToFinalOutputFunction 模块属性

ToolsToFinalOutputFunction: TypeAlias = Callable[
    [RunContextWrapper[TContext], list[FunctionToolResult]],
    MaybeAwaitable[ToolsToFinalOutputResult],
]

一个函数,它接收运行上下文和工具结果列表,并返回一个 ToolsToFinalOutputResult

ToolsToFinalOutputResult dataclass

源代码在 src/agents/agent.py
@dataclass
class ToolsToFinalOutputResult:
    is_final_output: bool
    """Whether this is the final output. If False, the LLM will run again and receive the tool call
    output.
    """

    final_output: Any | None = None
    """The final output. Can be None if `is_final_output` is False, otherwise must match the
    `output_type` of the agent.
    """

is_final_output 实例属性

is_final_output: bool

是否为最终输出。如果为 False,LLM 将再次运行并接收工具调用的输出。

final_output 类属性 实例属性

final_output: Any | None = None

最终输出。如果 is_final_output 为 False,则可以为 None,否则必须与代理的 output_type 匹配。

AgentToolStreamEvent

基类: TypedDict

当代理作为工具调用时发出的流式事件。

源代码在 src/agents/agent.py
class AgentToolStreamEvent(TypedDict):
    """Streaming event emitted when an agent is invoked as a tool."""

    event: StreamEvent
    """The streaming event from the nested agent run."""

    agent: Agent[Any]
    """The nested agent emitting the event."""

    tool_call: ResponseFunctionToolCall | None
    """The originating tool call, if available."""

event 实例属性

event: StreamEvent

来自嵌套代理运行的流式事件。

agent 实例属性

agent: Agent[Any]

发出事件的嵌套代理。

tool_call 实例属性

tool_call: ResponseFunctionToolCall | None

原始工具调用(如果可用)。

StopAtTools

基类: TypedDict

源代码在 src/agents/agent.py
class StopAtTools(TypedDict):
    stop_at_tool_names: list[str]
    """A list of tool names, any of which will stop the agent from running further."""

stop_at_tool_names 实例属性

stop_at_tool_names: list[str]

一个工具名称列表,其中任何一个都将停止代理进一步运行。

MCPConfig

基类: TypedDict

MCP 服务器的配置。

源代码在 src/agents/agent.py
class MCPConfig(TypedDict):
    """Configuration for MCP servers."""

    convert_schemas_to_strict: NotRequired[bool]
    """If True, we will attempt to convert the MCP schemas to strict-mode schemas. This is a
    best-effort conversion, so some schemas may not be convertible. Defaults to False.
    """

convert_schemas_to_strict 实例属性

convert_schemas_to_strict: NotRequired[bool]

如果为 True,我们将尝试将 MCP 模式转换为严格模式模式。这是一个尽力而为的转换,因此某些模式可能无法转换。默认为 False。

AgentBase dataclass

基类: Generic[TContext]

AgentRealtimeAgent 的基类。

源代码在 src/agents/agent.py
@dataclass
class AgentBase(Generic[TContext]):
    """Base class for `Agent` and `RealtimeAgent`."""

    name: str
    """The name of the agent."""

    handoff_description: str | None = None
    """A description of the agent. This is used when the agent is used as a handoff, so that an
    LLM knows what it does and when to invoke it.
    """

    tools: list[Tool] = field(default_factory=list)
    """A list of tools that the agent can use."""

    mcp_servers: list[MCPServer] = field(default_factory=list)
    """A list of [Model Context Protocol](https://modelcontextprotocol.com.cn/) servers that
    the agent can use. Every time the agent runs, it will include tools from these servers in the
    list of available tools.

    NOTE: You are expected to manage the lifecycle of these servers. Specifically, you must call
    `server.connect()` before passing it to the agent, and `server.cleanup()` when the server is no
    longer needed.
    """

    mcp_config: MCPConfig = field(default_factory=lambda: MCPConfig())
    """Configuration for MCP servers."""

    async def get_mcp_tools(self, run_context: RunContextWrapper[TContext]) -> list[Tool]:
        """Fetches the available tools from the MCP servers."""
        convert_schemas_to_strict = self.mcp_config.get("convert_schemas_to_strict", False)
        return await MCPUtil.get_all_function_tools(
            self.mcp_servers, convert_schemas_to_strict, run_context, self
        )

    async def get_all_tools(self, run_context: RunContextWrapper[TContext]) -> list[Tool]:
        """All agent tools, including MCP tools and function tools."""
        mcp_tools = await self.get_mcp_tools(run_context)

        async def _check_tool_enabled(tool: Tool) -> bool:
            if not isinstance(tool, FunctionTool):
                return True

            attr = tool.is_enabled
            if isinstance(attr, bool):
                return attr
            res = attr(run_context, self)
            if inspect.isawaitable(res):
                return bool(await res)
            return bool(res)

        results = await asyncio.gather(*(_check_tool_enabled(t) for t in self.tools))
        enabled: list[Tool] = [t for t, ok in zip(self.tools, results) if ok]
        return [*mcp_tools, *enabled]

name instance-attribute

name: str

代理的名称。

handoff_description 类属性 实例属性

handoff_description: str | None = None

代理的描述。当代理用作交接时使用,以便 LLM 知道它做什么以及何时调用它。

tools 类属性 实例属性

tools: list[Tool] = field(default_factory=list)

代理可以使用的工具列表。

mcp_servers 类属性 实例属性

mcp_servers: list[MCPServer] = field(default_factory=list)

一个 模型上下文协议 服务器列表,代理可以使用这些服务器。每次代理运行时,它都会将来自这些服务器的工具包含在可用工具列表中。

注意:您需要管理这些服务器的生命周期。具体来说,您必须在将其传递给代理之前调用 server.connect(),并在服务器不再需要时调用 server.cleanup()

mcp_config 类属性 实例属性

mcp_config: MCPConfig = field(
    default_factory=lambda: MCPConfig()
)

MCP 服务器的配置。

get_mcp_tools 异步

get_mcp_tools(
    run_context: RunContextWrapper[TContext],
) -> list[Tool]

从 MCP 服务器获取可用的工具。

源代码在 src/agents/agent.py
async def get_mcp_tools(self, run_context: RunContextWrapper[TContext]) -> list[Tool]:
    """Fetches the available tools from the MCP servers."""
    convert_schemas_to_strict = self.mcp_config.get("convert_schemas_to_strict", False)
    return await MCPUtil.get_all_function_tools(
        self.mcp_servers, convert_schemas_to_strict, run_context, self
    )

get_all_tools 异步

get_all_tools(
    run_context: RunContextWrapper[TContext],
) -> list[Tool]

所有代理工具,包括 MCP 工具和函数工具。

源代码在 src/agents/agent.py
async def get_all_tools(self, run_context: RunContextWrapper[TContext]) -> list[Tool]:
    """All agent tools, including MCP tools and function tools."""
    mcp_tools = await self.get_mcp_tools(run_context)

    async def _check_tool_enabled(tool: Tool) -> bool:
        if not isinstance(tool, FunctionTool):
            return True

        attr = tool.is_enabled
        if isinstance(attr, bool):
            return attr
        res = attr(run_context, self)
        if inspect.isawaitable(res):
            return bool(await res)
        return bool(res)

    results = await asyncio.gather(*(_check_tool_enabled(t) for t in self.tools))
    enabled: list[Tool] = [t for t, ok in zip(self.tools, results) if ok]
    return [*mcp_tools, *enabled]

Agent dataclass

基类: AgentBase, Generic[TContext]

代理是一种配置了指令、工具、护栏、交接等的人工智能模型。

我们强烈建议传递 instructions,这是代理的“系统提示”。此外,您可以传递 handoff_description,这是代理的人类可读描述,当代理在工具/交接中使用时使用。

代理在上下文类型上是通用的。上下文是您创建的(可变)对象。它传递给工具函数、交接、护栏等。

请参阅 AgentBase 以获取与 RealtimeAgent 共享的基本参数。

源代码在 src/agents/agent.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
@dataclass
class Agent(AgentBase, Generic[TContext]):
    """An agent is an AI model configured with instructions, tools, guardrails, handoffs and more.

    We strongly recommend passing `instructions`, which is the "system prompt" for the agent. In
    addition, you can pass `handoff_description`, which is a human-readable description of the
    agent, used when the agent is used inside tools/handoffs.

    Agents are generic on the context type. The context is a (mutable) object you create. It is
    passed to tool functions, handoffs, guardrails, etc.

    See `AgentBase` for base parameters that are shared with `RealtimeAgent`s.
    """

    instructions: (
        str
        | Callable[
            [RunContextWrapper[TContext], Agent[TContext]],
            MaybeAwaitable[str],
        ]
        | None
    ) = None
    """The instructions for the agent. Will be used as the "system prompt" when this agent is
    invoked. Describes what the agent should do, and how it responds.

    Can either be a string, or a function that dynamically generates instructions for the agent. If
    you provide a function, it will be called with the context and the agent instance. It must
    return a string.
    """

    prompt: Prompt | DynamicPromptFunction | None = None
    """A prompt object (or a function that returns a Prompt). Prompts allow you to dynamically
    configure the instructions, tools and other config for an agent outside of your code. Only
    usable with OpenAI models, using the Responses API.
    """

    handoffs: list[Agent[Any] | Handoff[TContext, Any]] = field(default_factory=list)
    """Handoffs are sub-agents that the agent can delegate to. You can provide a list of handoffs,
    and the agent can choose to delegate to them if relevant. Allows for separation of concerns and
    modularity.
    """

    model: str | Model | None = None
    """The model implementation to use when invoking the LLM.

    By default, if not set, the agent will use the default model configured in
    `agents.models.get_default_model()` (currently "gpt-4.1").
    """

    model_settings: ModelSettings = field(default_factory=get_default_model_settings)
    """Configures model-specific tuning parameters (e.g. temperature, top_p).
    """

    input_guardrails: list[InputGuardrail[TContext]] = field(default_factory=list)
    """A list of checks that run in parallel to the agent's execution, before generating a
    response. Runs only if the agent is the first agent in the chain.
    """

    output_guardrails: list[OutputGuardrail[TContext]] = field(default_factory=list)
    """A list of checks that run on the final output of the agent, after generating a response.
    Runs only if the agent produces a final output.
    """

    output_type: type[Any] | AgentOutputSchemaBase | None = None
    """The type of the output object. If not provided, the output will be `str`. In most cases,
    you should pass a regular Python type (e.g. a dataclass, Pydantic model, TypedDict, etc).
    You can customize this in two ways:
    1. If you want non-strict schemas, pass `AgentOutputSchema(MyClass, strict_json_schema=False)`.
    2. If you want to use a custom JSON schema (i.e. without using the SDK's automatic schema)
       creation, subclass and pass an `AgentOutputSchemaBase` subclass.
    """

    hooks: AgentHooks[TContext] | None = None
    """A class that receives callbacks on various lifecycle events for this agent.
    """

    tool_use_behavior: (
        Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction
    ) = "run_llm_again"
    """
    This lets you configure how tool use is handled.
    - "run_llm_again": The default behavior. Tools are run, and then the LLM receives the results
        and gets to respond.
    - "stop_on_first_tool": The output from the first tool call is treated as the final result.
        In other words, it isn’t sent back to the LLM for further processing but is used directly
        as the final output.
    - A StopAtTools object: The agent will stop running if any of the tools listed in
        `stop_at_tool_names` is called.
        The final output will be the output of the first matching tool call.
        The LLM does not process the result of the tool call.
    - A function: If you pass a function, it will be called with the run context and the list of
      tool results. It must return a `ToolsToFinalOutputResult`, which determines whether the tool
      calls result in a final output.

      NOTE: This configuration is specific to FunctionTools. Hosted tools, such as file search,
      web search, etc. are always processed by the LLM.
    """

    reset_tool_choice: bool = True
    """Whether to reset the tool choice to the default value after a tool has been called. Defaults
    to True. This ensures that the agent doesn't enter an infinite loop of tool usage."""

    def __post_init__(self):
        from typing import get_origin

        if not isinstance(self.name, str):
            raise TypeError(f"Agent name must be a string, got {type(self.name).__name__}")

        if self.handoff_description is not None and not isinstance(self.handoff_description, str):
            raise TypeError(
                f"Agent handoff_description must be a string or None, "
                f"got {type(self.handoff_description).__name__}"
            )

        if not isinstance(self.tools, list):
            raise TypeError(f"Agent tools must be a list, got {type(self.tools).__name__}")

        if not isinstance(self.mcp_servers, list):
            raise TypeError(
                f"Agent mcp_servers must be a list, got {type(self.mcp_servers).__name__}"
            )

        if not isinstance(self.mcp_config, dict):
            raise TypeError(
                f"Agent mcp_config must be a dict, got {type(self.mcp_config).__name__}"
            )

        if (
            self.instructions is not None
            and not isinstance(self.instructions, str)
            and not callable(self.instructions)
        ):
            raise TypeError(
                f"Agent instructions must be a string, callable, or None, "
                f"got {type(self.instructions).__name__}"
            )

        if (
            self.prompt is not None
            and not callable(self.prompt)
            and not hasattr(self.prompt, "get")
        ):
            raise TypeError(
                f"Agent prompt must be a Prompt, DynamicPromptFunction, or None, "
                f"got {type(self.prompt).__name__}"
            )

        if not isinstance(self.handoffs, list):
            raise TypeError(f"Agent handoffs must be a list, got {type(self.handoffs).__name__}")

        if self.model is not None and not isinstance(self.model, str):
            from .models.interface import Model

            if not isinstance(self.model, Model):
                raise TypeError(
                    f"Agent model must be a string, Model, or None, got {type(self.model).__name__}"
                )

        if not isinstance(self.model_settings, ModelSettings):
            raise TypeError(
                f"Agent model_settings must be a ModelSettings instance, "
                f"got {type(self.model_settings).__name__}"
            )

        if (
            # The user sets a non-default model
            self.model is not None
            and (
                # The default model is gpt-5
                is_gpt_5_default() is True
                # However, the specified model is not a gpt-5 model
                and (
                    isinstance(self.model, str) is False
                    or gpt_5_reasoning_settings_required(self.model) is False  # type: ignore
                )
                # The model settings are not customized for the specified model
                and self.model_settings == get_default_model_settings()
            )
        ):
            # In this scenario, we should use a generic model settings
            # because non-gpt-5 models are not compatible with the default gpt-5 model settings.
            # This is a best-effort attempt to make the agent work with non-gpt-5 models.
            self.model_settings = ModelSettings()

        if not isinstance(self.input_guardrails, list):
            raise TypeError(
                f"Agent input_guardrails must be a list, got {type(self.input_guardrails).__name__}"
            )

        if not isinstance(self.output_guardrails, list):
            raise TypeError(
                f"Agent output_guardrails must be a list, "
                f"got {type(self.output_guardrails).__name__}"
            )

        if self.output_type is not None:
            from .agent_output import AgentOutputSchemaBase

            if not (
                isinstance(self.output_type, (type, AgentOutputSchemaBase))
                or get_origin(self.output_type) is not None
            ):
                raise TypeError(
                    f"Agent output_type must be a type, AgentOutputSchemaBase, or None, "
                    f"got {type(self.output_type).__name__}"
                )

        if self.hooks is not None:
            from .lifecycle import AgentHooksBase

            if not isinstance(self.hooks, AgentHooksBase):
                raise TypeError(
                    f"Agent hooks must be an AgentHooks instance or None, "
                    f"got {type(self.hooks).__name__}"
                )

        if (
            not (
                isinstance(self.tool_use_behavior, str)
                and self.tool_use_behavior in ["run_llm_again", "stop_on_first_tool"]
            )
            and not isinstance(self.tool_use_behavior, dict)
            and not callable(self.tool_use_behavior)
        ):
            raise TypeError(
                f"Agent tool_use_behavior must be 'run_llm_again', 'stop_on_first_tool', "
                f"StopAtTools dict, or callable, got {type(self.tool_use_behavior).__name__}"
            )

        if not isinstance(self.reset_tool_choice, bool):
            raise TypeError(
                f"Agent reset_tool_choice must be a boolean, "
                f"got {type(self.reset_tool_choice).__name__}"
            )

    def clone(self, **kwargs: Any) -> Agent[TContext]:
        """Make a copy of the agent, with the given arguments changed.
        Notes:
            - Uses `dataclasses.replace`, which performs a **shallow copy**.
            - Mutable attributes like `tools` and `handoffs` are shallow-copied:
              new list objects are created only if overridden, but their contents
              (tool functions and handoff objects) are shared with the original.
            - To modify these independently, pass new lists when calling `clone()`.
        Example:
            ```python
            new_agent = agent.clone(instructions="New instructions")
            ```
        """
        return dataclasses.replace(self, **kwargs)

    def as_tool(
        self,
        tool_name: str | None,
        tool_description: str | None,
        custom_output_extractor: (
            Callable[[RunResult | RunResultStreaming], Awaitable[str]] | None
        ) = None,
        is_enabled: bool
        | Callable[[RunContextWrapper[Any], AgentBase[Any]], MaybeAwaitable[bool]] = True,
        on_stream: Callable[[AgentToolStreamEvent], MaybeAwaitable[None]] | None = None,
        run_config: RunConfig | None = None,
        max_turns: int | None = None,
        hooks: RunHooks[TContext] | None = None,
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        session: Session | None = None,
        failure_error_function: ToolErrorFunction | None = default_tool_error_function,
    ) -> Tool:
        """Transform this agent into a tool, callable by other agents.

        This is different from handoffs in two ways:
        1. In handoffs, the new agent receives the conversation history. In this tool, the new agent
           receives generated input.
        2. In handoffs, the new agent takes over the conversation. In this tool, the new agent is
           called as a tool, and the conversation is continued by the original agent.

        Args:
            tool_name: The name of the tool. If not provided, the agent's name will be used.
            tool_description: The description of the tool, which should indicate what it does and
                when to use it.
            custom_output_extractor: A function that extracts the output from the agent. If not
                provided, the last message from the agent will be used.
            is_enabled: Whether the tool is enabled. Can be a bool or a callable that takes the run
                context and agent and returns whether the tool is enabled. Disabled tools are hidden
                from the LLM at runtime.
            on_stream: Optional callback (sync or async) to receive streaming events from the nested
                agent run. The callback receives an `AgentToolStreamEvent` containing the nested
                agent, the originating tool call (when available), and each stream event. When
                provided, the nested agent is executed in streaming mode.
            failure_error_function: If provided, generate an error message when the tool (agent) run
                fails. The message is sent to the LLM. If None, the exception is raised instead.
        """

        @function_tool(
            name_override=tool_name or _transforms.transform_string_function_style(self.name),
            description_override=tool_description or "",
            is_enabled=is_enabled,
            failure_error_function=failure_error_function,
        )
        async def run_agent(context: ToolContext, input: str) -> Any:
            from .run import DEFAULT_MAX_TURNS, Runner

            resolved_max_turns = max_turns if max_turns is not None else DEFAULT_MAX_TURNS
            run_result: RunResult | RunResultStreaming

            if on_stream is not None:
                run_result = Runner.run_streamed(
                    starting_agent=self,
                    input=input,
                    context=context.context,
                    run_config=run_config,
                    max_turns=resolved_max_turns,
                    hooks=hooks,
                    previous_response_id=previous_response_id,
                    conversation_id=conversation_id,
                    session=session,
                )
                # Dispatch callbacks in the background so slow handlers do not block
                # event consumption.
                event_queue: asyncio.Queue[AgentToolStreamEvent | None] = asyncio.Queue()

                async def _run_handler(payload: AgentToolStreamEvent) -> None:
                    """Execute the user callback while capturing exceptions."""
                    try:
                        maybe_result = on_stream(payload)
                        if inspect.isawaitable(maybe_result):
                            await maybe_result
                    except Exception:
                        logger.exception(
                            "Error while handling on_stream event for agent tool %s.",
                            self.name,
                        )

                async def dispatch_stream_events() -> None:
                    while True:
                        payload = await event_queue.get()
                        is_sentinel = payload is None  # None marks the end of the stream.
                        try:
                            if payload is not None:
                                await _run_handler(payload)
                        finally:
                            event_queue.task_done()

                        if is_sentinel:
                            break

                dispatch_task = asyncio.create_task(dispatch_stream_events())

                try:
                    from .stream_events import AgentUpdatedStreamEvent

                    current_agent = run_result.current_agent
                    async for event in run_result.stream_events():
                        if isinstance(event, AgentUpdatedStreamEvent):
                            current_agent = event.new_agent

                        payload: AgentToolStreamEvent = {
                            "event": event,
                            "agent": current_agent,
                            "tool_call": context.tool_call,
                        }
                        await event_queue.put(payload)
                finally:
                    await event_queue.put(None)
                    await event_queue.join()
                    await dispatch_task
            else:
                run_result = await Runner.run(
                    starting_agent=self,
                    input=input,
                    context=context.context,
                    run_config=run_config,
                    max_turns=resolved_max_turns,
                    hooks=hooks,
                    previous_response_id=previous_response_id,
                    conversation_id=conversation_id,
                    session=session,
                )
            if custom_output_extractor:
                return await custom_output_extractor(run_result)

            return run_result.final_output

        return run_agent

    async def get_system_prompt(self, run_context: RunContextWrapper[TContext]) -> str | None:
        if isinstance(self.instructions, str):
            return self.instructions
        elif callable(self.instructions):
            # Inspect the signature of the instructions function
            sig = inspect.signature(self.instructions)
            params = list(sig.parameters.values())

            # Enforce exactly 2 parameters
            if len(params) != 2:
                raise TypeError(
                    f"'instructions' callable must accept exactly 2 arguments (context, agent), "
                    f"but got {len(params)}: {[p.name for p in params]}"
                )

            # Call the instructions function properly
            if inspect.iscoroutinefunction(self.instructions):
                return await cast(Awaitable[str], self.instructions(run_context, self))
            else:
                return cast(str, self.instructions(run_context, self))

        elif self.instructions is not None:
            logger.error(
                f"Instructions must be a string or a callable function, "
                f"got {type(self.instructions).__name__}"
            )

        return None

    async def get_prompt(
        self, run_context: RunContextWrapper[TContext]
    ) -> ResponsePromptParam | None:
        """Get the prompt for the agent."""
        return await PromptUtil.to_model_input(self.prompt, run_context, self)

instructions 类属性 实例属性

instructions: (
    str
    | Callable[
        [RunContextWrapper[TContext], Agent[TContext]],
        MaybeAwaitable[str],
    ]
    | None
) = None

代理的指令。将在调用此代理时用作“系统提示”。描述了代理应该做什么以及如何响应。

可以是字符串,也可以是动态生成代理指令的函数。如果您提供一个函数,它将使用上下文和代理实例调用。它必须返回一个字符串。

prompt 类属性 实例属性

prompt: Prompt | DynamicPromptFunction | None = None

一个提示对象(或返回提示的函数)。提示允许您动态配置代理的指令、工具和其他配置,而无需在代码中进行配置。仅适用于使用 Responses API 的 OpenAI 模型。

handoffs 类属性 实例属性

handoffs: list[Agent[Any] | Handoff[TContext, Any]] = field(
    default_factory=list
)

代理可以委托的子代理。您可以提供一个交接列表,并且代理可以选择委托给它们(如果相关)。允许关注点分离和模块化。

model 类属性 实例属性

model: str | Model | None = None

调用 LLM 时要使用的模型实现。

默认情况下,如果未设置,代理将使用在 agents.models.get_default_model()(当前为“gpt-4.1”)中配置的默认模型。

model_settings 类属性 实例属性

model_settings: ModelSettings = field(
    default_factory=get_default_model_settings
)

配置特定于模型的调整参数(例如温度、top_p)。

input_guardrails 类属性 实例属性

input_guardrails: list[InputGuardrail[TContext]] = field(
    default_factory=list
)

在代理生成响应之前并行运行的检查列表。仅当代理是链中的第一个代理时才运行。

output_guardrails 类属性 实例属性

output_guardrails: list[OutputGuardrail[TContext]] = field(
    default_factory=list
)

在代理生成响应后运行在代理的最终输出上的检查列表。仅当代理生成最终输出时才运行。

output_type 类属性 实例属性

output_type: type[Any] | AgentOutputSchemaBase | None = None

输出对象的类型。如果未提供,输出将为 str。在大多数情况下,您应该传递一个常规的 Python 类型(例如,dataclass、Pydantic 模型、TypedDict 等)。您可以通过两种方式自定义它:1. 如果您想要非严格模式,请传递 AgentOutputSchema(MyClass, strict_json_schema=False)。 2. 如果您想要使用自定义 JSON 模式(即,不使用 SDK 的自动模式创建),请子类化并传递一个 AgentOutputSchemaBase 子类。

hooks 类属性 实例属性

hooks: AgentHooks[TContext] | None = None

一个类,它接收有关此代理的各种生命周期事件的回调。

tool_use_behavior 类属性 实例属性

tool_use_behavior: (
    Literal["run_llm_again", "stop_on_first_tool"]
    | StopAtTools
    | ToolsToFinalOutputFunction
) = "run_llm_again"

这让您可以配置如何处理工具使用。 - "run_llm_again": 默认行为。工具运行后,LLM 接收结果并做出响应。 - "stop_on_first_tool": 第一个工具调用的输出被视为最终结果。换句话说,它不会被发送回 LLM 以进行进一步处理,而是直接用作最终输出。 - 一个 StopAtTools 对象:如果 stop_at_tool_names 中列出的任何工具被调用,代理将停止运行。最终输出将是第一个匹配工具调用的输出。LLM 不处理工具调用的结果。 - 一个函数:如果您传递一个函数,它将使用运行上下文和工具结果列表调用。它必须返回一个 ToolsToFinalOutputResult,它确定工具调用的结果是否产生最终输出。

注意:此配置特定于 FunctionTools。托管工具,例如文件搜索、网络搜索等,始终由 LLM 处理。

reset_tool_choice 类属性 实例属性

reset_tool_choice: bool = True

工具调用后是否重置工具选择。默认为 True。这可确保代理不会进入无限的工具使用循环。

name instance-attribute

name: str

代理的名称。

handoff_description 类属性 实例属性

handoff_description: str | None = None

代理的描述。当代理用作交接时使用,以便 LLM 知道它做什么以及何时调用它。

tools 类属性 实例属性

tools: list[Tool] = field(default_factory=list)

代理可以使用的工具列表。

mcp_servers 类属性 实例属性

mcp_servers: list[MCPServer] = field(default_factory=list)

一个 模型上下文协议 服务器列表,代理可以使用这些服务器。每次代理运行时,它都会将来自这些服务器的工具包含在可用工具列表中。

注意:您需要管理这些服务器的生命周期。具体来说,您必须在将其传递给代理之前调用 server.connect(),并在服务器不再需要时调用 server.cleanup()

mcp_config 类属性 实例属性

mcp_config: MCPConfig = field(
    default_factory=lambda: MCPConfig()
)

MCP 服务器的配置。

clone

clone(**kwargs: Any) -> Agent[TContext]

使用给定的参数更改的代理的副本。注意:- 使用 dataclasses.replace,它执行 浅拷贝。 - 可变属性(如 toolshandoffs)是浅拷贝的:仅当被覆盖时才创建新的列表对象,但其内容(工具函数和交接对象)与原始对象共享。 - 要独立修改这些属性,请在调用 clone() 时传递新的列表。示例

new_agent = agent.clone(instructions="New instructions")

源代码在 src/agents/agent.py
def clone(self, **kwargs: Any) -> Agent[TContext]:
    """Make a copy of the agent, with the given arguments changed.
    Notes:
        - Uses `dataclasses.replace`, which performs a **shallow copy**.
        - Mutable attributes like `tools` and `handoffs` are shallow-copied:
          new list objects are created only if overridden, but their contents
          (tool functions and handoff objects) are shared with the original.
        - To modify these independently, pass new lists when calling `clone()`.
    Example:
        ```python
        new_agent = agent.clone(instructions="New instructions")
        ```
    """
    return dataclasses.replace(self, **kwargs)

as_tool

as_tool(
    tool_name: str | None,
    tool_description: str | None,
    custom_output_extractor: Callable[
        [RunResult | RunResultStreaming], Awaitable[str]
    ]
    | None = None,
    is_enabled: bool
    | Callable[
        [RunContextWrapper[Any], AgentBase[Any]],
        MaybeAwaitable[bool],
    ] = True,
    on_stream: Callable[
        [AgentToolStreamEvent], MaybeAwaitable[None]
    ]
    | None = None,
    run_config: RunConfig | None = None,
    max_turns: int | None = None,
    hooks: RunHooks[TContext] | None = None,
    previous_response_id: str | None = None,
    conversation_id: str | None = None,
    session: Session | None = None,
    failure_error_function: ToolErrorFunction
    | None = default_tool_error_function,
) -> Tool

将此代理转换为工具,其他代理可以调用它。

这与交接有两点不同:1. 在交接中,新的代理接收对话历史记录。在此工具中,新的代理接收生成的输入。 2. 在交接中,新的代理接管对话。在此工具中,新的代理作为工具调用,对话由原始代理继续进行。

参数

名称 类型 描述 默认
tool_name str | None

工具的名称。如果未提供,将使用代理的名称。

required
tool_description str | None

工具的描述,应说明它做什么以及何时使用它。

required
custom_output_extractor Callable[[RunResult | RunResultStreaming], Awaitable[str]] | None

一个提取代理输出的函数。如果未提供,将使用代理的最后一条消息。

None
is_enabled bool | Callable[[RunContextWrapper[Any], AgentBase[Any]], MaybeAwaitable[bool]]

工具是否启用。可以是布尔值,也可以是接受运行上下文和代理并返回工具是否启用的可调用对象。禁用的工具在运行时对 LLM 隐藏。

True
on_stream Callable[[AgentToolStreamEvent], MaybeAwaitable[None]] | None

可选的回调函数(同步或异步),用于接收嵌套代理运行的流式事件。该回调函数接收一个 AgentToolStreamEvent,其中包含嵌套代理、原始工具调用(如果可用)以及每个流式事件。如果提供,嵌套代理将以流式模式执行。

None
failure_error_function ToolErrorFunction | None

如果提供,则在工具(代理)运行失败时生成错误消息。该消息将发送到 LLM。如果为 None,则会引发异常。

default_tool_error_function
源代码在 src/agents/agent.py
def as_tool(
    self,
    tool_name: str | None,
    tool_description: str | None,
    custom_output_extractor: (
        Callable[[RunResult | RunResultStreaming], Awaitable[str]] | None
    ) = None,
    is_enabled: bool
    | Callable[[RunContextWrapper[Any], AgentBase[Any]], MaybeAwaitable[bool]] = True,
    on_stream: Callable[[AgentToolStreamEvent], MaybeAwaitable[None]] | None = None,
    run_config: RunConfig | None = None,
    max_turns: int | None = None,
    hooks: RunHooks[TContext] | None = None,
    previous_response_id: str | None = None,
    conversation_id: str | None = None,
    session: Session | None = None,
    failure_error_function: ToolErrorFunction | None = default_tool_error_function,
) -> Tool:
    """Transform this agent into a tool, callable by other agents.

    This is different from handoffs in two ways:
    1. In handoffs, the new agent receives the conversation history. In this tool, the new agent
       receives generated input.
    2. In handoffs, the new agent takes over the conversation. In this tool, the new agent is
       called as a tool, and the conversation is continued by the original agent.

    Args:
        tool_name: The name of the tool. If not provided, the agent's name will be used.
        tool_description: The description of the tool, which should indicate what it does and
            when to use it.
        custom_output_extractor: A function that extracts the output from the agent. If not
            provided, the last message from the agent will be used.
        is_enabled: Whether the tool is enabled. Can be a bool or a callable that takes the run
            context and agent and returns whether the tool is enabled. Disabled tools are hidden
            from the LLM at runtime.
        on_stream: Optional callback (sync or async) to receive streaming events from the nested
            agent run. The callback receives an `AgentToolStreamEvent` containing the nested
            agent, the originating tool call (when available), and each stream event. When
            provided, the nested agent is executed in streaming mode.
        failure_error_function: If provided, generate an error message when the tool (agent) run
            fails. The message is sent to the LLM. If None, the exception is raised instead.
    """

    @function_tool(
        name_override=tool_name or _transforms.transform_string_function_style(self.name),
        description_override=tool_description or "",
        is_enabled=is_enabled,
        failure_error_function=failure_error_function,
    )
    async def run_agent(context: ToolContext, input: str) -> Any:
        from .run import DEFAULT_MAX_TURNS, Runner

        resolved_max_turns = max_turns if max_turns is not None else DEFAULT_MAX_TURNS
        run_result: RunResult | RunResultStreaming

        if on_stream is not None:
            run_result = Runner.run_streamed(
                starting_agent=self,
                input=input,
                context=context.context,
                run_config=run_config,
                max_turns=resolved_max_turns,
                hooks=hooks,
                previous_response_id=previous_response_id,
                conversation_id=conversation_id,
                session=session,
            )
            # Dispatch callbacks in the background so slow handlers do not block
            # event consumption.
            event_queue: asyncio.Queue[AgentToolStreamEvent | None] = asyncio.Queue()

            async def _run_handler(payload: AgentToolStreamEvent) -> None:
                """Execute the user callback while capturing exceptions."""
                try:
                    maybe_result = on_stream(payload)
                    if inspect.isawaitable(maybe_result):
                        await maybe_result
                except Exception:
                    logger.exception(
                        "Error while handling on_stream event for agent tool %s.",
                        self.name,
                    )

            async def dispatch_stream_events() -> None:
                while True:
                    payload = await event_queue.get()
                    is_sentinel = payload is None  # None marks the end of the stream.
                    try:
                        if payload is not None:
                            await _run_handler(payload)
                    finally:
                        event_queue.task_done()

                    if is_sentinel:
                        break

            dispatch_task = asyncio.create_task(dispatch_stream_events())

            try:
                from .stream_events import AgentUpdatedStreamEvent

                current_agent = run_result.current_agent
                async for event in run_result.stream_events():
                    if isinstance(event, AgentUpdatedStreamEvent):
                        current_agent = event.new_agent

                    payload: AgentToolStreamEvent = {
                        "event": event,
                        "agent": current_agent,
                        "tool_call": context.tool_call,
                    }
                    await event_queue.put(payload)
            finally:
                await event_queue.put(None)
                await event_queue.join()
                await dispatch_task
        else:
            run_result = await Runner.run(
                starting_agent=self,
                input=input,
                context=context.context,
                run_config=run_config,
                max_turns=resolved_max_turns,
                hooks=hooks,
                previous_response_id=previous_response_id,
                conversation_id=conversation_id,
                session=session,
            )
        if custom_output_extractor:
            return await custom_output_extractor(run_result)

        return run_result.final_output

    return run_agent

get_prompt async

get_prompt(
    run_context: RunContextWrapper[TContext],
) -> ResponsePromptParam | None

获取代理的提示语。

源代码在 src/agents/agent.py
async def get_prompt(
    self, run_context: RunContextWrapper[TContext]
) -> ResponsePromptParam | None:
    """Get the prompt for the agent."""
    return await PromptUtil.to_model_input(self.prompt, run_context, self)

get_mcp_tools 异步

get_mcp_tools(
    run_context: RunContextWrapper[TContext],
) -> list[Tool]

从 MCP 服务器获取可用的工具。

源代码在 src/agents/agent.py
async def get_mcp_tools(self, run_context: RunContextWrapper[TContext]) -> list[Tool]:
    """Fetches the available tools from the MCP servers."""
    convert_schemas_to_strict = self.mcp_config.get("convert_schemas_to_strict", False)
    return await MCPUtil.get_all_function_tools(
        self.mcp_servers, convert_schemas_to_strict, run_context, self
    )

get_all_tools 异步

get_all_tools(
    run_context: RunContextWrapper[TContext],
) -> list[Tool]

所有代理工具,包括 MCP 工具和函数工具。

源代码在 src/agents/agent.py
async def get_all_tools(self, run_context: RunContextWrapper[TContext]) -> list[Tool]:
    """All agent tools, including MCP tools and function tools."""
    mcp_tools = await self.get_mcp_tools(run_context)

    async def _check_tool_enabled(tool: Tool) -> bool:
        if not isinstance(tool, FunctionTool):
            return True

        attr = tool.is_enabled
        if isinstance(attr, bool):
            return attr
        res = attr(run_context, self)
        if inspect.isawaitable(res):
            return bool(await res)
        return bool(res)

    results = await asyncio.gather(*(_check_tool_enabled(t) for t in self.tools))
    enabled: list[Tool] = [t for t, ok in zip(self.tools, results) if ok]
    return [*mcp_tools, *enabled]