基类: SessionABC
为 Session 实现提供的加密包装器,支持基于 TTL(生存时间)的过期。
此类可包装任何 SessionABC 实现,通过 Fernet 加密、按会话派生密钥以及旧数据自动过期,对存储的项目进行透明的加密/解密。
当项目过期(超过 TTL)时,在检索期间会被静默跳过。
注意:令牌是否过期是根据应用程序服务器的系统时钟来判定的。为了避免因时钟漂移而拒绝有效的令牌,请确保环境中的所有服务器都使用 NTP 进行时间同步。
源代码位于 src/agents/extensions/memory/encrypt_session.py
class EncryptedSession(SessionABC):
    """Encrypted wrapper for Session implementations with TTL-based expiration.

    This class wraps any SessionABC implementation to provide transparent
    encryption/decryption of stored items using Fernet encryption with
    per-session key derivation and automatic expiration of old data.

    When items expire (exceed TTL), they are silently skipped during retrieval.

    Note: Expired tokens are rejected based on the system clock of the application server.
    To avoid valid tokens being rejected due to clock drift, ensure all servers in
    your environment are synchronized using NTP.
    """

    def __init__(
        self,
        session_id: str,
        underlying_session: SessionABC,
        encryption_key: str,
        ttl: int = 600,
    ):
        """Create an encrypting wrapper around an existing session store.

        Args:
            session_id: ID for this session
            underlying_session: The real session store (e.g. SQLiteSession, SQLAlchemySession)
            encryption_key: Master key (Fernet key or raw secret)
            ttl: Token time-to-live in seconds (default 10 min)
        """
        self.session_id = session_id
        self.underlying_session = underlying_session
        self.ttl = ttl

        # The cipher is derived from the master key and the session ID, so
        # each session ID gets its own cipher object.
        master = _ensure_fernet_key_bytes(encryption_key)
        self.cipher = _derive_session_fernet_key(master, session_id)

        # Metadata written into every envelope produced by _wrap().
        self._kid = "hkdf-v1"
        self._ver = 1

    def __getattr__(self, name):
        """Delegate attributes not found on the wrapper to the underlying session.

        Raises:
            AttributeError: If ``underlying_session`` itself has not been set yet.
        """
        # __getattr__ only runs after normal lookup has failed. If the missing
        # attribute is ``underlying_session`` itself (e.g. the instance was
        # created by copy/pickle without running __init__), delegating would
        # re-enter this method until RecursionError. Fail fast instead.
        if name == "underlying_session":
            raise AttributeError(name)
        return getattr(self.underlying_session, name)

    def _wrap(self, item: TResponseInputItem) -> EncryptedEnvelope:
        """Serialize ``item`` to JSON bytes and encrypt it into an envelope dict."""
        # Normalize the item into a plain mapping before serialization.
        if isinstance(item, dict):
            payload = item
        elif hasattr(item, "model_dump"):
            payload = item.model_dump()
        elif hasattr(item, "__dict__"):
            payload = item.__dict__
        else:
            payload = dict(item)

        token = self.cipher.encrypt(_to_json_bytes(payload)).decode("utf-8")
        return {"__enc__": 1, "v": self._ver, "kid": self._kid, "payload": token}

    def _unwrap(self, item: TResponseInputItem | EncryptedEnvelope) -> TResponseInputItem | None:
        """Decrypt an envelope back into the original item.

        Items that are not encrypted envelopes are returned unchanged.

        Returns:
            The decrypted item, or ``None`` when decryption raises
            ``InvalidToken`` (the TTL is passed to ``decrypt``) or the
            envelope has no ``"payload"`` key.
        """
        if not _is_encrypted_envelope(item):
            return cast(TResponseInputItem, item)

        try:
            token = item["payload"].encode("utf-8")
            plaintext = self.cipher.decrypt(token, ttl=self.ttl)
            return cast(TResponseInputItem, _from_json_bytes(plaintext))
        except (InvalidToken, KeyError):
            return None

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Return the stored items that can still be decrypted.

        ``limit`` is forwarded to the underlying session before filtering, so
        fewer than ``limit`` items may be returned when some are skipped.
        """
        encrypted_items = await self.underlying_session.get_items(limit)
        valid_items: list[TResponseInputItem] = []
        for enc in encrypted_items:
            item = self._unwrap(enc)
            if item is not None:
                valid_items.append(item)
        return valid_items

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Encrypt every item and append the envelopes to the underlying session."""
        wrapped: list[EncryptedEnvelope] = [self._wrap(it) for it in items]
        await self.underlying_session.add_items(cast(list[TResponseInputItem], wrapped))

    async def pop_item(self) -> TResponseInputItem | None:
        """Pop items from the underlying session until one unwraps successfully.

        Returns:
            The first popped item that unwraps to a non-``None`` value, or
            ``None`` once the underlying session has nothing left to pop.
        """
        while True:
            enc = await self.underlying_session.pop_item()
            # Only ``None`` signals an exhausted store; a falsy but real item
            # (e.g. an empty dict) must not be mistaken for "no more items".
            if enc is None:
                return None
            item = self._unwrap(enc)
            if item is not None:
                return item

    async def clear_session(self) -> None:
        """Remove all items by clearing the underlying session."""
        await self.underlying_session.clear_session()
|
__init__
__init__(
session_id: str,
underlying_session: SessionABC,
encryption_key: str,
ttl: int = 600,
)
参数
| 名称 |
类型 |
描述 |
默认 |
session_id
|
str
|
此会话的 ID
|
required
|
underlying_session
|
SessionABC
|
实际的会话存储(例如 SQLiteSession、SQLAlchemySession)
|
required
|
encryption_key
|
str
|
主密钥(Fernet 密钥或原始密钥)
|
required
|
ttl
|
int
|
令牌的生存时间,单位为秒(默认 10 分钟)
|
600
|
源代码位于 src/agents/extensions/memory/encrypt_session.py
def __init__(
    self,
    session_id: str,
    underlying_session: SessionABC,
    encryption_key: str,
    ttl: int = 600,
):
    """Create an encrypting wrapper around an existing session store.

    Args:
        session_id: ID for this session
        underlying_session: The real session store (e.g. SQLiteSession, SQLAlchemySession)
        encryption_key: Master key (Fernet key or raw secret)
        ttl: Token time-to-live in seconds (default 10 min)
    """
    # Plain configuration captured from the caller.
    self.ttl = ttl
    self.underlying_session = underlying_session
    self.session_id = session_id

    # Normalize the master key, then derive the cipher for this session ID.
    self.cipher = _derive_session_fernet_key(
        _ensure_fernet_key_bytes(encryption_key),
        session_id,
    )

    # Envelope metadata: format version and key-derivation identifier.
    self._ver = 1
    self._kid = "hkdf-v1"
|