跳过内容

MCP Servers

MCPServer

基础: ABC

模型上下文协议服务器的基类。

源代码位于 src/agents/mcp/server.py
class MCPServer(abc.ABC):
    """Base class for Model Context Protocol servers."""

    def __init__(self, use_structured_content: bool = False):
        """
        Args:
            use_structured_content: Whether to use `tool_result.structured_content` when calling an
                MCP tool.Defaults to False for backwards compatibility - most MCP servers still
                include the structured content in the `tool_result.content`, and using it by
                default will cause duplicate content. You can set this to True if you know the
                server will not duplicate the structured content in the `tool_result.content`.
        """
        self.use_structured_content = use_structured_content

    @abc.abstractmethod
    async def connect(self):
        """Connect to the server. For example, this might mean spawning a subprocess or
        opening a network connection. The server is expected to remain connected until
        `cleanup()` is called.
        """
        pass

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """A readable name for the server."""
        pass

    @abc.abstractmethod
    async def cleanup(self):
        """Cleanup the server. For example, this might mean closing a subprocess or
        closing a network connection.
        """
        pass

    @abc.abstractmethod
    async def list_tools(
        self,
        run_context: RunContextWrapper[Any] | None = None,
        agent: AgentBase | None = None,
    ) -> list[MCPTool]:
        """List the tools available on the server."""
        pass

    @abc.abstractmethod
    async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
        """Invoke a tool on the server."""
        pass

    @abc.abstractmethod
    async def list_prompts(
        self,
    ) -> ListPromptsResult:
        """List the prompts available on the server."""
        pass

    @abc.abstractmethod
    async def get_prompt(
        self, name: str, arguments: dict[str, Any] | None = None
    ) -> GetPromptResult:
        """Get a specific prompt from the server."""
        pass

name abstractmethod property

name: str

服务器的可读名称。

__init__

__init__(use_structured_content: bool = False)

参数

名称 类型 描述 默认
使用结构化内容 bool

是否在调用 MCP 工具时使用 tool_result.structured_content。默认值为 False,为了向后兼容 - 大多数 MCP 服务器仍然将结构化内容包含在 tool_result.content 中,默认使用会导致重复内容。如果确定服务器不会在 tool_result.content 中重复结构化内容,则可以将此值设置为 True。

False
源代码位于 src/agents/mcp/server.py
def __init__(self, use_structured_content: bool = False):
    """
    Args:
        use_structured_content: Whether to use `tool_result.structured_content` when calling an
            MCP tool.Defaults to False for backwards compatibility - most MCP servers still
            include the structured content in the `tool_result.content`, and using it by
            default will cause duplicate content. You can set this to True if you know the
            server will not duplicate the structured content in the `tool_result.content`.
    """
    self.use_structured_content = use_structured_content

connect abstractmethod async

connect()

连接到服务器。例如,这可能意味着启动一个子进程或打开一个网络连接。服务器应保持连接,直到调用 cleanup()

源代码位于 src/agents/mcp/server.py
@abc.abstractmethod
async def connect(self):
    """Connect to the server. For example, this might mean spawning a subprocess or
    opening a network connection. The server is expected to remain connected until
    `cleanup()` is called.
    """
    pass

cleanup abstractmethod async

cleanup()

清理服务器。例如,这可能意味着关闭一个子进程或关闭一个网络连接。

源代码位于 src/agents/mcp/server.py
@abc.abstractmethod
async def cleanup(self):
    """Cleanup the server. For example, this might mean closing a subprocess or
    closing a network connection.
    """
    pass

list_tools abstractmethod async

list_tools(
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[Tool]

列出服务器上可用的工具。

源代码位于 src/agents/mcp/server.py
@abc.abstractmethod
async def list_tools(
    self,
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[MCPTool]:
    """List the tools available on the server."""
    pass

call_tool abstractmethod async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

调用服务器上的工具。

源代码位于 src/agents/mcp/server.py
@abc.abstractmethod
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    pass

list_prompts abstractmethod async

list_prompts() -> ListPromptsResult

列出服务器上可用的提示。

源代码位于 src/agents/mcp/server.py
@abc.abstractmethod
async def list_prompts(
    self,
) -> ListPromptsResult:
    """List the prompts available on the server."""
    pass

get_prompt abstractmethod async

get_prompt(
    name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult

从服务器获取特定提示。

源代码位于 src/agents/mcp/server.py
@abc.abstractmethod
async def get_prompt(
    self, name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult:
    """Get a specific prompt from the server."""
    pass

MCPServerStdioParams

基础: TypedDict

镜像 mcp.client.stdio.StdioServerParameters,但允许您在没有其他导入的情况下传递参数。

源代码位于 src/agents/mcp/server.py
class MCPServerStdioParams(TypedDict):
    """Mirrors `mcp.client.stdio.StdioServerParameters`, but lets you pass params without another
    import.
    """

    command: str
    """The executable to run to start the server. For example, `python` or `node`."""

    args: NotRequired[list[str]]
    """Command line args to pass to the `command` executable. For example, `['foo.py']` or
    `['server.js', '--port', '8080']`."""

    env: NotRequired[dict[str, str]]
    """The environment variables to set for the server. ."""

    cwd: NotRequired[str | Path]
    """The working directory to use when spawning the process."""

    encoding: NotRequired[str]
    """The text encoding used when sending/receiving messages to the server. Defaults to `utf-8`."""

    encoding_error_handler: NotRequired[Literal["strict", "ignore", "replace"]]
    """The text encoding error handler. Defaults to `strict`.

    See https://docs.pythonlang.cn/3/library/codecs.html#codec-base-classes for
    explanations of possible values.
    """

command instance-attribute

command: str

启动服务器要运行的可执行文件。例如,pythonnode

args instance-attribute

args: NotRequired[list[str]]

传递给 command 可执行文件的命令行参数。例如,['foo.py']['server.js', '--port', '8080']

env instance-attribute

env: NotRequired[dict[str, str]]

服务器要设置的环境变量。

cwd instance-attribute

cwd: NotRequired[str | Path]

启动进程时要使用的当前工作目录。

encoding instance-attribute

encoding: NotRequired[str]

发送/接收到服务器的消息使用的文本编码。默认为 utf-8

encoding_error_handler instance-attribute

encoding_error_handler: NotRequired[
    Literal["strict", "ignore", "replace"]
]

文本编码错误处理程序。默认为 strict

有关可能值的说明,请参见 https://docs.pythonlang.cn/3/library/codecs.html#codec-base-classes。

MCPServerStdio

基类:_MCPServerWithClientSession

使用 stdio 传输的 MCP 服务器实现。有关详细信息,请参阅 [规范] (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio)。

源代码位于 src/agents/mcp/server.py
class MCPServerStdio(_MCPServerWithClientSession):
    """MCP server implementation that uses the stdio transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for
    details.
    """

    def __init__(
        self,
        params: MCPServerStdioParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
        tool_filter: ToolFilter = None,
        use_structured_content: bool = False,
        max_retry_attempts: int = 0,
        retry_backoff_seconds_base: float = 1.0,
        message_handler: MessageHandlerFnT | None = None,
    ):
        """Create a new MCP server based on the stdio transport.

        Args:
            params: The params that configure the server. This includes the command to run to
                start the server, the args to pass to the command, the environment variables to
                set for the server, the working directory to use when spawning the process, and
                the text encoding used when sending/receiving messages to the server.
            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).
            name: A readable name for the server. If not provided, we'll create one from the
                command.
            client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
            tool_filter: The tool filter to use for filtering tools.
            use_structured_content: Whether to use `tool_result.structured_content` when calling an
                MCP tool. Defaults to False for backwards compatibility - most MCP servers still
                include the structured content in the `tool_result.content`, and using it by
                default will cause duplicate content. You can set this to True if you know the
                server will not duplicate the structured content in the `tool_result.content`.
            max_retry_attempts: Number of times to retry failed list_tools/call_tool calls.
                Defaults to no retries.
            retry_backoff_seconds_base: The base delay, in seconds, for exponential
                backoff between retries.
            message_handler: Optional handler invoked for session messages as delivered by the
                ClientSession.
        """
        super().__init__(
            cache_tools_list,
            client_session_timeout_seconds,
            tool_filter,
            use_structured_content,
            max_retry_attempts,
            retry_backoff_seconds_base,
            message_handler=message_handler,
        )

        self.params = StdioServerParameters(
            command=params["command"],
            args=params.get("args", []),
            env=params.get("env"),
            cwd=params.get("cwd"),
            encoding=params.get("encoding", "utf-8"),
            encoding_error_handler=params.get("encoding_error_handler", "strict"),
        )

        self._name = name or f"stdio: {self.params.command}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
            GetSessionIdCallback | None,
        ]
    ]:
        """Create the streams for the server."""
        return stdio_client(self.params)

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

name property

name: str

服务器的可读名称。

__init__

__init__(
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
    tool_filter: ToolFilter = None,
    use_structured_content: bool = False,
    max_retry_attempts: int = 0,
    retry_backoff_seconds_base: float = 1.0,
    message_handler: MessageHandlerFnT | None = None,
)

创建基于 stdio 传输的新 MCP 服务器。

参数

名称 类型 描述 默认
参数 MCPServerStdioParams

配置服务器的参数。这包括启动服务器要运行的命令、传递给命令的参数、服务器要设置的环境变量、启动进程时要使用的当前工作目录以及发送/接收到服务器的消息使用的文本编码。

required
缓存工具列表 bool

是否缓存工具列表。如果为 True,则将缓存工具列表,并且仅从服务器获取一次。如果为 False,则每次调用 list_tools() 时都会从服务器获取工具列表。可以通过调用 invalidate_tools_cache() 使缓存失效。如果知道服务器不会更改其工具列表,则应将其设置为 True,因为它可以大大提高延迟(避免每次往返服务器)。

False
name str | None

服务器的可读名称。如果未提供,我们将从命令创建它。

None
客户端会话超时秒数 float | None

传递给 MCP ClientSession 的读取超时时间。

5
工具过滤器 ToolFilter

用于过滤工具的工具过滤器。

None
使用结构化内容 bool

是否在调用 MCP 工具时使用 tool_result.structured_content。默认值为 False,为了向后兼容 - 大多数 MCP 服务器仍然将结构化内容包含在 tool_result.content 中,默认使用会导致重复内容。如果确定服务器不会在 tool_result.content 中重复结构化内容,则可以将此值设置为 True。

False
最大重试次数 int

失败的 list_tools/call_tool 调用的重试次数。默认为不重试。

0
重试退避秒数基数 float

重试之间指数退避的秒数基本延迟。

1.0
消息处理程序 MessageHandlerFnT | None

可选的处理程序,由 ClientSession 传递的消息调用。

None
源代码位于 src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
    tool_filter: ToolFilter = None,
    use_structured_content: bool = False,
    max_retry_attempts: int = 0,
    retry_backoff_seconds_base: float = 1.0,
    message_handler: MessageHandlerFnT | None = None,
):
    """Create a new MCP server based on the stdio transport.

    Args:
        params: The params that configure the server. This includes the command to run to
            start the server, the args to pass to the command, the environment variables to
            set for the server, the working directory to use when spawning the process, and
            the text encoding used when sending/receiving messages to the server.
        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).
        name: A readable name for the server. If not provided, we'll create one from the
            command.
        client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
        tool_filter: The tool filter to use for filtering tools.
        use_structured_content: Whether to use `tool_result.structured_content` when calling an
            MCP tool. Defaults to False for backwards compatibility - most MCP servers still
            include the structured content in the `tool_result.content`, and using it by
            default will cause duplicate content. You can set this to True if you know the
            server will not duplicate the structured content in the `tool_result.content`.
        max_retry_attempts: Number of times to retry failed list_tools/call_tool calls.
            Defaults to no retries.
        retry_backoff_seconds_base: The base delay, in seconds, for exponential
            backoff between retries.
        message_handler: Optional handler invoked for session messages as delivered by the
            ClientSession.
    """
    super().__init__(
        cache_tools_list,
        client_session_timeout_seconds,
        tool_filter,
        use_structured_content,
        max_retry_attempts,
        retry_backoff_seconds_base,
        message_handler=message_handler,
    )

    self.params = StdioServerParameters(
        command=params["command"],
        args=params.get("args", []),
        env=params.get("env"),
        cwd=params.get("cwd"),
        encoding=params.get("encoding", "utf-8"),
        encoding_error_handler=params.get("encoding_error_handler", "strict"),
    )

    self._name = name or f"stdio: {self.params.command}"

创建流

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]

创建服务器的流。

源代码位于 src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]:
    """Create the streams for the server."""
    return stdio_client(self.params)

connect async

connect()

连接到服务器。

源代码位于 src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
                message_handler=self.message_handler,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

清理服务器。

源代码位于 src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async

list_tools(
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[Tool]

列出服务器上可用的工具。

源代码位于 src/agents/mcp/server.py
async def list_tools(
    self,
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")
    session = self.session
    assert session is not None

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        tools = self._tools_list
    else:
        # Fetch the tools from the server
        result = await self._run_with_retries(lambda: session.list_tools())
        self._tools_list = result.tools
        self._cache_dirty = False
        tools = self._tools_list

    # Filter tools based on tool_filter
    filtered_tools = tools
    if self.tool_filter is not None:
        filtered_tools = await self._apply_tool_filter(filtered_tools, run_context, agent)
    return filtered_tools

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

调用服务器上的工具。

源代码位于 src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")
    session = self.session
    assert session is not None

    return await self._run_with_retries(lambda: session.call_tool(tool_name, arguments))

list_prompts async

list_prompts() -> ListPromptsResult

列出服务器上可用的提示。

源代码位于 src/agents/mcp/server.py
async def list_prompts(
    self,
) -> ListPromptsResult:
    """List the prompts available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.list_prompts()

get_prompt async

get_prompt(
    name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult

从服务器获取特定提示。

源代码位于 src/agents/mcp/server.py
async def get_prompt(
    self, name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult:
    """Get a specific prompt from the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.get_prompt(name, arguments)

使工具缓存失效

invalidate_tools_cache()

使工具缓存失效。

源代码位于 src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True

MCPServerSseParams

基础: TypedDict

镜像 mcp.client.sse.sse_client 中的参数。

源代码位于 src/agents/mcp/server.py
class MCPServerSseParams(TypedDict):
    """Mirrors the params in`mcp.client.sse.sse_client`."""

    url: str
    """The URL of the server."""

    headers: NotRequired[dict[str, str]]
    """The headers to send to the server."""

    timeout: NotRequired[float]
    """The timeout for the HTTP request. Defaults to 5 seconds."""

    sse_read_timeout: NotRequired[float]
    """The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""

url 实例属性

url: str

服务器的 URL。

headers 实例属性

headers: NotRequired[dict[str, str]]

发送到服务器的标头。

timeout instance-attribute

timeout: NotRequired[float]

HTTP 请求的超时时间。默认为 5 秒。

sse_read_timeout instance-attribute

sse_read_timeout: NotRequired[float]

SSE 连接的超时时间,以秒为单位。默认为 5 分钟。

MCPServerSse

基类:_MCPServerWithClientSession

使用 HTTP 与 SSE 传输的 MCP 服务器实现。有关详细信息,请参阅 [规范] (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)。

源代码位于 src/agents/mcp/server.py
class MCPServerSse(_MCPServerWithClientSession):
    """MCP server implementation that uses the HTTP with SSE transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)
    for details.
    """

    def __init__(
        self,
        params: MCPServerSseParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
        tool_filter: ToolFilter = None,
        use_structured_content: bool = False,
        max_retry_attempts: int = 0,
        retry_backoff_seconds_base: float = 1.0,
        message_handler: MessageHandlerFnT | None = None,
    ):
        """Create a new MCP server based on the HTTP with SSE transport.

        Args:
            params: The params that configure the server. This includes the URL of the server,
                the headers to send to the server, the timeout for the HTTP request, and the
                timeout for the SSE connection.

            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).

            name: A readable name for the server. If not provided, we'll create one from the
                URL.

            client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
            tool_filter: The tool filter to use for filtering tools.
            use_structured_content: Whether to use `tool_result.structured_content` when calling an
                MCP tool. Defaults to False for backwards compatibility - most MCP servers still
                include the structured content in the `tool_result.content`, and using it by
                default will cause duplicate content. You can set this to True if you know the
                server will not duplicate the structured content in the `tool_result.content`.
            max_retry_attempts: Number of times to retry failed list_tools/call_tool calls.
                Defaults to no retries.
            retry_backoff_seconds_base: The base delay, in seconds, for exponential
                backoff between retries.
            message_handler: Optional handler invoked for session messages as delivered by the
                ClientSession.
        """
        super().__init__(
            cache_tools_list,
            client_session_timeout_seconds,
            tool_filter,
            use_structured_content,
            max_retry_attempts,
            retry_backoff_seconds_base,
            message_handler=message_handler,
        )

        self.params = params
        self._name = name or f"sse: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
            GetSessionIdCallback | None,
        ]
    ]:
        """Create the streams for the server."""
        return sse_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
        )

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

name property

name: str

服务器的可读名称。

__init__

__init__(
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
    tool_filter: ToolFilter = None,
    use_structured_content: bool = False,
    max_retry_attempts: int = 0,
    retry_backoff_seconds_base: float = 1.0,
    message_handler: MessageHandlerFnT | None = None,
)

创建基于 HTTP 与 SSE 传输的新 MCP 服务器。

参数

名称 类型 描述 默认
参数 MCPServerSseParams

配置服务器的参数。这包括服务器的 URL、发送到服务器的标头、HTTP 请求的超时时间以及 SSE 连接的超时时间。

required
缓存工具列表 bool

是否缓存工具列表。如果为 True,则将缓存工具列表,并且仅从服务器获取一次。如果为 False,则每次调用 list_tools() 时都会从服务器获取工具列表。可以通过调用 invalidate_tools_cache() 使缓存失效。如果知道服务器不会更改其工具列表,则应将其设置为 True,因为它可以大大提高延迟(避免每次往返服务器)。

False
name str | None

服务器的可读名称。如果未提供,我们将从 URL 创建它。

None
客户端会话超时秒数 float | None

传递给 MCP ClientSession 的读取超时时间。

5
工具过滤器 ToolFilter

用于过滤工具的工具过滤器。

None
使用结构化内容 bool

是否在调用 MCP 工具时使用 tool_result.structured_content。默认值为 False,为了向后兼容 - 大多数 MCP 服务器仍然将结构化内容包含在 tool_result.content 中,默认使用会导致重复内容。如果确定服务器不会在 tool_result.content 中重复结构化内容,则可以将此值设置为 True。

False
最大重试次数 int

失败的 list_tools/call_tool 调用的重试次数。默认为不重试。

0
重试退避秒数基数 float

重试之间指数退避的秒数基本延迟。

1.0
消息处理程序 MessageHandlerFnT | None

可选的处理程序,由 ClientSession 传递的消息调用。

None
源代码位于 src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
    tool_filter: ToolFilter = None,
    use_structured_content: bool = False,
    max_retry_attempts: int = 0,
    retry_backoff_seconds_base: float = 1.0,
    message_handler: MessageHandlerFnT | None = None,
):
    """Create a new MCP server based on the HTTP with SSE transport.

    Args:
        params: The params that configure the server. This includes the URL of the server,
            the headers to send to the server, the timeout for the HTTP request, and the
            timeout for the SSE connection.

        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).

        name: A readable name for the server. If not provided, we'll create one from the
            URL.

        client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
        tool_filter: The tool filter to use for filtering tools.
        use_structured_content: Whether to use `tool_result.structured_content` when calling an
            MCP tool. Defaults to False for backwards compatibility - most MCP servers still
            include the structured content in the `tool_result.content`, and using it by
            default will cause duplicate content. You can set this to True if you know the
            server will not duplicate the structured content in the `tool_result.content`.
        max_retry_attempts: Number of times to retry failed list_tools/call_tool calls.
            Defaults to no retries.
        retry_backoff_seconds_base: The base delay, in seconds, for exponential
            backoff between retries.
        message_handler: Optional handler invoked for session messages as delivered by the
            ClientSession.
    """
    super().__init__(
        cache_tools_list,
        client_session_timeout_seconds,
        tool_filter,
        use_structured_content,
        max_retry_attempts,
        retry_backoff_seconds_base,
        message_handler=message_handler,
    )

    self.params = params
    self._name = name or f"sse: {self.params['url']}"

创建流

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]

创建服务器的流。

源代码位于 src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]:
    """Create the streams for the server."""
    return sse_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", 5),
        sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
    )

connect async

connect()

连接到服务器。

源代码位于 src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
                message_handler=self.message_handler,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

清理服务器。

源代码位于 src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async

list_tools(
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[Tool]

列出服务器上可用的工具。

源代码位于 src/agents/mcp/server.py
async def list_tools(
    self,
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")
    session = self.session
    assert session is not None

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        tools = self._tools_list
    else:
        # Fetch the tools from the server
        result = await self._run_with_retries(lambda: session.list_tools())
        self._tools_list = result.tools
        self._cache_dirty = False
        tools = self._tools_list

    # Filter tools based on tool_filter
    filtered_tools = tools
    if self.tool_filter is not None:
        filtered_tools = await self._apply_tool_filter(filtered_tools, run_context, agent)
    return filtered_tools

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

调用服务器上的工具。

源代码位于 src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")
    session = self.session
    assert session is not None

    return await self._run_with_retries(lambda: session.call_tool(tool_name, arguments))

list_prompts async

list_prompts() -> ListPromptsResult

列出服务器上可用的提示。

源代码位于 src/agents/mcp/server.py
async def list_prompts(
    self,
) -> ListPromptsResult:
    """List the prompts available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.list_prompts()

get_prompt async

get_prompt(
    name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult

从服务器获取特定提示。

源代码位于 src/agents/mcp/server.py
async def get_prompt(
    self, name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult:
    """Get a specific prompt from the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.get_prompt(name, arguments)

使工具缓存失效

invalidate_tools_cache()

使工具缓存失效。

源代码位于 src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True

MCPServerStreamableHttpParams

基础: TypedDict

镜像 mcp.client.streamable_http.streamablehttp_client 中的参数。

源代码位于 src/agents/mcp/server.py
class MCPServerStreamableHttpParams(TypedDict):
    """Mirrors the params in`mcp.client.streamable_http.streamablehttp_client`."""

    url: str
    """The URL of the server."""

    headers: NotRequired[dict[str, str]]
    """The headers to send to the server."""

    timeout: NotRequired[timedelta | float]
    """The timeout for the HTTP request. Defaults to 5 seconds."""

    sse_read_timeout: NotRequired[timedelta | float]
    """The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""

    terminate_on_close: NotRequired[bool]
    """Terminate on close"""

    httpx_client_factory: NotRequired[HttpClientFactory]
    """Custom HTTP client factory for configuring httpx.AsyncClient behavior."""

url 实例属性

url: str

服务器的 URL。

headers 实例属性

headers: NotRequired[dict[str, str]]

发送到服务器的标头。

timeout instance-attribute

timeout: NotRequired[timedelta | float]

HTTP 请求的超时时间。默认为 5 秒。

sse_read_timeout instance-attribute

sse_read_timeout: NotRequired[timedelta | float]

SSE 连接的超时时间,以秒为单位。默认为 5 分钟。

terminate_on_close instance-attribute

terminate_on_close: NotRequired[bool]

关闭时终止

httpx_client_factory instance-attribute

httpx_client_factory: NotRequired[HttpClientFactory]

配置 httpx.AsyncClient 行为的自定义 HTTP 客户端工厂。

MCPServerStreamableHttp

基类:_MCPServerWithClientSession

使用 Streamable HTTP 传输的 MCP 服务器实现。有关详细信息,请参阅 [规范] (https://modelcontextprotocol.com.cn/specification/2025-03-26/basic/transports#streamable-http)。

源代码位于 src/agents/mcp/server.py
class MCPServerStreamableHttp(_MCPServerWithClientSession):
    """MCP server implementation that uses the Streamable HTTP transport. See the [spec]
    (https://modelcontextprotocol.com.cn/specification/2025-03-26/basic/transports#streamable-http)
    for details.
    """

    def __init__(
        self,
        params: MCPServerStreamableHttpParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
        tool_filter: ToolFilter = None,
        use_structured_content: bool = False,
        max_retry_attempts: int = 0,
        retry_backoff_seconds_base: float = 1.0,
        message_handler: MessageHandlerFnT | None = None,
    ):
        """Create a new MCP server based on the Streamable HTTP transport.

        Args:
            params: The params that configure the server. This includes the URL of the server,
                the headers to send to the server, the timeout for the HTTP request, the
                timeout for the Streamable HTTP connection, whether we need to
                terminate on close, and an optional custom HTTP client factory.

            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).

            name: A readable name for the server. If not provided, we'll create one from the
                URL.

            client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
            tool_filter: The tool filter to use for filtering tools.
            use_structured_content: Whether to use `tool_result.structured_content` when calling an
                MCP tool. Defaults to False for backwards compatibility - most MCP servers still
                include the structured content in the `tool_result.content`, and using it by
                default will cause duplicate content. You can set this to True if you know the
                server will not duplicate the structured content in the `tool_result.content`.
            max_retry_attempts: Number of times to retry failed list_tools/call_tool calls.
                Defaults to no retries.
            retry_backoff_seconds_base: The base delay, in seconds, for exponential
                backoff between retries.
            message_handler: Optional handler invoked for session messages as delivered by the
                ClientSession.
        """
        super().__init__(
            cache_tools_list,
            client_session_timeout_seconds,
            tool_filter,
            use_structured_content,
            max_retry_attempts,
            retry_backoff_seconds_base,
            message_handler=message_handler,
        )

        self.params = params
        self._name = name or f"streamable_http: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
            GetSessionIdCallback | None,
        ]
    ]:
        """Create the streams for the server."""
        # Only pass httpx_client_factory if it's provided
        if "httpx_client_factory" in self.params:
            return streamablehttp_client(
                url=self.params["url"],
                headers=self.params.get("headers", None),
                timeout=self.params.get("timeout", 5),
                sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
                terminate_on_close=self.params.get("terminate_on_close", True),
                httpx_client_factory=self.params["httpx_client_factory"],
            )
        else:
            return streamablehttp_client(
                url=self.params["url"],
                headers=self.params.get("headers", None),
                timeout=self.params.get("timeout", 5),
                sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
                terminate_on_close=self.params.get("terminate_on_close", True),
            )

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

name property

name: str

服务器的可读名称。

__init__

__init__(
    params: MCPServerStreamableHttpParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
    tool_filter: ToolFilter = None,
    use_structured_content: bool = False,
    max_retry_attempts: int = 0,
    retry_backoff_seconds_base: float = 1.0,
    message_handler: MessageHandlerFnT | None = None,
)

创建基于 Streamable HTTP 传输的新 MCP 服务器。

参数

名称 类型 描述 默认
参数 MCPServerStreamableHttpParams

配置服务器的参数。这包括服务器的 URL、发送到服务器的标头、HTTP 请求的超时时间、Streamable HTTP 连接的超时时间、是否需要在关闭时终止以及可选的自定义 HTTP 客户端工厂。

required
缓存工具列表 bool

是否缓存工具列表。如果为 True,则将缓存工具列表,并且仅从服务器获取一次。如果为 False,则每次调用 list_tools() 时都会从服务器获取工具列表。可以通过调用 invalidate_tools_cache() 使缓存失效。如果知道服务器不会更改其工具列表,则应将其设置为 True,因为它可以大大提高延迟(避免每次往返服务器)。

False
name str | None

服务器的可读名称。如果未提供,我们将从 URL 创建它。

None
客户端会话超时秒数 float | None

传递给 MCP ClientSession 的读取超时时间。

5
工具过滤器 ToolFilter

用于过滤工具的工具过滤器。

None
使用结构化内容 bool

是否在调用 MCP 工具时使用 tool_result.structured_content。默认值为 False,为了向后兼容 - 大多数 MCP 服务器仍然将结构化内容包含在 tool_result.content 中,默认使用会导致重复内容。如果确定服务器不会在 tool_result.content 中重复结构化内容,则可以将此值设置为 True。

False
最大重试次数 int

失败的 list_tools/call_tool 调用的重试次数。默认为不重试。

0
重试退避秒数基数 float

重试之间指数退避的秒数基本延迟。

1.0
消息处理程序 MessageHandlerFnT | None

可选的处理程序,由 ClientSession 传递的消息调用。

None
源代码位于 src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStreamableHttpParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
    tool_filter: ToolFilter = None,
    use_structured_content: bool = False,
    max_retry_attempts: int = 0,
    retry_backoff_seconds_base: float = 1.0,
    message_handler: MessageHandlerFnT | None = None,
):
    """Create a new MCP server based on the Streamable HTTP transport.

    Args:
        params: The params that configure the server. This includes the URL of the server,
            the headers to send to the server, the timeout for the HTTP request, the
            timeout for the Streamable HTTP connection, whether we need to
            terminate on close, and an optional custom HTTP client factory.

        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).

        name: A readable name for the server. If not provided, we'll create one from the
            URL.

        client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
        tool_filter: The tool filter to use for filtering tools.
        use_structured_content: Whether to use `tool_result.structured_content` when calling an
            MCP tool. Defaults to False for backwards compatibility - most MCP servers still
            include the structured content in the `tool_result.content`, and using it by
            default will cause duplicate content. You can set this to True if you know the
            server will not duplicate the structured content in the `tool_result.content`.
        max_retry_attempts: Number of times to retry failed list_tools/call_tool calls.
            Defaults to no retries.
        retry_backoff_seconds_base: The base delay, in seconds, for exponential
            backoff between retries.
        message_handler: Optional handler invoked for session messages as delivered by the
            ClientSession.
    """
    super().__init__(
        cache_tools_list,
        client_session_timeout_seconds,
        tool_filter,
        use_structured_content,
        max_retry_attempts,
        retry_backoff_seconds_base,
        message_handler=message_handler,
    )

    self.params = params
    self._name = name or f"streamable_http: {self.params['url']}"

创建流

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]

创建服务器的流。

源代码位于 src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]:
    """Create the streams for the server."""
    # Only pass httpx_client_factory if it's provided
    if "httpx_client_factory" in self.params:
        return streamablehttp_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
            terminate_on_close=self.params.get("terminate_on_close", True),
            httpx_client_factory=self.params["httpx_client_factory"],
        )
    else:
        return streamablehttp_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
            terminate_on_close=self.params.get("terminate_on_close", True),
        )

connect async

connect()

连接到服务器。

源代码位于 src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
                message_handler=self.message_handler,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

清理服务器。

源代码位于 src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async

list_tools(
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[Tool]

列出服务器上可用的工具。

源代码位于 src/agents/mcp/server.py
async def list_tools(
    self,
    run_context: RunContextWrapper[Any] | None = None,
    agent: AgentBase | None = None,
) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")
    session = self.session
    assert session is not None

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        tools = self._tools_list
    else:
        # Fetch the tools from the server
        result = await self._run_with_retries(lambda: session.list_tools())
        self._tools_list = result.tools
        self._cache_dirty = False
        tools = self._tools_list

    # Filter tools based on tool_filter
    filtered_tools = tools
    if self.tool_filter is not None:
        filtered_tools = await self._apply_tool_filter(filtered_tools, run_context, agent)
    return filtered_tools

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

调用服务器上的工具。

源代码位于 src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")
    session = self.session
    assert session is not None

    return await self._run_with_retries(lambda: session.call_tool(tool_name, arguments))

list_prompts async

list_prompts() -> ListPromptsResult

列出服务器上可用的提示。

源代码位于 src/agents/mcp/server.py
async def list_prompts(
    self,
) -> ListPromptsResult:
    """List the prompts available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.list_prompts()

get_prompt async

get_prompt(
    name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult

从服务器获取特定提示。

源代码位于 src/agents/mcp/server.py
async def get_prompt(
    self, name: str, arguments: dict[str, Any] | None = None
) -> GetPromptResult:
    """Get a specific prompt from the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.get_prompt(name, arguments)

使工具缓存失效

invalidate_tools_cache()

使工具缓存失效。

源代码位于 src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True