简介(未完成)
初探分布式Agent系统架构相对于在单个进程空间运行的Agent系统,分布式智能体系统则是将这些智能体分布运行在不同的进程、主机甚至不同的平台上,它们需要通过网络通信协同工作,共同完成复杂任务。分布式的需求来自于Agent能力扩展的需要,本质上有两个维度:
- 横向扩展:单个 Agent 实例部署在多台服务器上,从而提高其处理性能和吞吐能力,这类似于传统 Web 服务的水平扩展。例如:当某个 Agent 需要维护大量长连接会话、处理大规模并发请求,或执行耗时的任务时,可以通过负载均衡将请求分散到多个实例,实现更高的容量和可靠性。
- 纵向扩展:多个不同职责的 Agent,将它们部署在不同服务器上协同工作,形成一个多 Agent 系统,类似于将单一智能体的功能按模块解耦为多个服务。例如:可以用一个 Agent 专门负责数据检索,另一个负责推理决策,再另一个负责执行操作,通过彼此通信来完成一个综合任务。这种方式提升了系统的模块化和灵活性,使各Agent各司其职,协同解决复杂问题。
分布式Agent系统
挑战
对于横向的水平扩展,其面临的最主要挑战是,由于Agent任务的特点,简单的无状态服务形式对其是不够的(长时间任务、多轮对话、流输出等);但有状态的服务又会带来扩展能力的下降,面临新的挑战:
- 任务状态一致性:如何保证Agent任务状态和上下文在多个实例间的一致性是难题。可能需要借助会话粘连(保证同一会话总是由同一实例处理)或状态持久化等手段,来实现状态同步。这对分布式会话管理有较高要求。
- 任务调度与容错:需要有效的负载均衡和任务调度策略来优化资源利用,是简单的轮询还是根据实例响应时间或资源使用的动态分配等。此外,还需要考虑实例故障的容错,通常需要借助成熟的负载均衡与容器编排策略。
而对于更复杂的多Agent系统跨网络的协作,还会面临更多问题:
- Agent 协同协议。如果每对Agent都用各自私有接口对话,随着Agent数量增加,连接关系将变得错综复杂,维护成本高且易出错,会导致极高的耦合度。
- 消息传递效率。Agent 间需要频繁交换消息,如果通信机制低效,多 Agent 协作的整体速度将受限。特别在涉及长时间任务时,如何让Agent间异步并行工作、及时通知任务状态变化,避免因为轮询等机制浪费资源,都是需要考虑的问题。
- 上下文共享。多个 Agent 合作时,每个 Agent 只掌握局部信息,如何在它们之间共享上下文尤为关键。需要机制让Agent能够方便地共享知识和中间结果(例如通过公共内存、黑板系统甚至数据库)。
- 能力复用。当不同Agent具备互补的技能时,应该允许互相调用对方的能力,而不是各自重复实现同样的功能。这要求在 Agent 之间建立服务发现与调用机制。
协作方案
- AutoGen 框架的分布式运行时。AutoGen在最新0.4版本后引入了实验性的分布式 Agent Runtime。其架构包含一个中央的主机服务(Host Service)和若干工作节点(Worker Runtime):
AutoGen 框架提供一体化的解决方案,适合于快速构建同构环境下的多Agent协作;但局限是协作的 Agent 需要在同一 AutoGen 框架内,跨不同框架和语言的 Agent 互通需另行考虑。
-
方案二:Agent 远程服务化(RPC/MCP)这种方法是将每个 Agent 作为独立服务来部署,通过远程调用的方式实现协作。利用成熟的分布式技术栈。同时,每个 Agent 服务可以独立扩展和部署(当然横向扩展也会面临上面提到的问题),天然具有模块边界清晰、故障隔离好的特点。然而,纯粹的 RPC 式协作也有明显的局限:
- 需要开发者自行设计请求/响应的数据格式和流程。
- Agent 之间没有共享的“会话”概念或上下文维护机制。
- 随着协作Agent数量增加,开发者可能需要处理繁琐的编排逻辑。
- 很快会陷入会话管理、内存管理和协调同步的复杂细节中 。 这种方法总体上仍需要开发者规划好哪个Agent调用哪个、何时调用,并处理数据格式转换和错误恢复,其适用场景通常是固定的流程编排或工具调用型的Agent协作,即任务流程相对确定,Agent 之间主要是服务调用关系而非自由对话。对于需要灵活对话协商、多轮交互的场景,RPC方式会变得力不从心。
- Google 的 Agent-to-Agent 协议
- 服务发现机制:每个支持 A2A 的 Agent 对外暴露一个Agent Card(JSON 文档),描述该 Agent 的元信息 。解决了异构 Agent 之间如何互相发现和了解彼此功能的问题。
- 标准化的消息与任务结构:A2A 将 Agent 之间交互抽象为一个个任务(Task)。任务由协议统一定义生命周期,Agent 间通过发送消息来协商和更新任务状态,每条消息都有明确的结构,确保不同实现的Agent都能正确理解彼此发送的内容。
- 多样的通信模式:考虑到不同任务对交互实时性的要求,A2A 支持多种通信方式 ,支持短周期的请求/响应或者异步通知的任务形式。
- 安全与跨平台:作为企业级协议,A2A 特别强调了通信的安全性和跨平台兼容
A2A
人与人之间可以通过各种各样的方式沟通:对话,眼神,肢体动作,画作等,这些可以帮助不同的人之间相互了解对方,并做出正确的动作,共同推动人类社会的发展,那么Agent之间沟通协作呢?Google给出了自己的答案:A2A 。
A2A 作为一个开放协议,充分考虑了 Agent 在和用户、企业打通的过程中所面临的一些挑战,其主要功能特性有以下四点:
- 安全协作(Secure Collaboration):通过引入认证/授权机制,保证 Agent 之间的身份互信。
- 任务状态管理(Task and state mgmt):实现了 Agent 之间互操作任务以及任务状态的可管理性。
- 用户体验协商(UX negotiation):不同的 Agent 通过协商的方式,对用户提供无缝的体验。
- 功能发现(Capability discovery):提供了 Agent 之间相互发现各自能力的机制。 除此之外,A2A 也在企业的无缝接入、简化集成方面,有比较好的考量。
Agent 相互之间的发现、了解和交互调用,是一个发展趋势。
- 首先,企业基于当前业务,都在探索、建立各种各样的 领域Agent 。在内部的各种 领域Agent 之间的沟通协作,是必须要面对和解决的一个问题。
- 其次,对于对外提供 Agent 服务的提供商来说,我如何让其他 Agent 主动发现我,就像SEO,吸引更多的流量,也是一个需要思考的问题。
概念
A2A(Agent2Agent) 简介 A2A 中包含三个核心的参与者:
- User,主要的作用是用于 认证&授权
- Client Agent,指的是任务发起者
- Remote Agent,指的是任务的执行者。
Client 和 Server 之间的通信,可以理解为就是一个个简单的请求和结果的响应,只不过这个请求是一个个的任务。一个 Agent 既可以是 Client 也可以是 Server。Client Agent 和 Server Agent 交互的过程中,主要涉及到的一些Entity:AgentCard、Task 、Artifact 、Message、Part。
- AgentCard 是 Server Agent 的名片,它主要描述了 Server Agent 的能力、认证机制等信息。Client Agent通过获取不同 Server Agent 的 AgentCard,了解不同 Server Agent 的能力,来决断具体的任务执行应该调用哪个 Server Agent 。
interface AgentCard { name: string; description: string; url: string; provider?: { organization: string; url: string; }; version: string; documentationUrl?: string; capabilities: { streaming?: boolean; pushNotifications?: boolean; stateTransitionHistory?: boolean; }; authentication: { schemes: string[]; credentials?: string; }; defaultInputModes: string[]; defaultOutputModes: string[]; skills: { id: string; name: string; description: string; tags: string[]; examples?: string[]; inputModes?: string[]; outputModes?: string[]; }[]; }
- Task 是一个具有状态的实体,由Client Agent创建,其状态由Server Agent维护。一个Task用于达到特定的目标或者结果。Agent Client和Server Client在Task中交换Mesaage,Server Agent生成的结果叫做Artifact。除此之外,每个Task有一个唯一的sessionId,多个Task可以使用一个sessionId,表明多个Task属于同一个会话的一部分。
interface Task { id: string; sessionId: string; status: TaskStatus; history?: Message[]; artifacts?: Artifact[]; metadata?: Record<string, any>; } interface TaskStatus { state: TaskState; message?: Message; timestamp?: string; } interface TaskStatusUpdateEvent { id: string; status: TaskStatus; final: boolean; //indicates the end of the event stream metadata?: Record<string, any>; } interface TaskArtifactUpdateEvent { id: string; artifact: Artifact; metadata?: Record<string, any>; } interface TaskSendParams { id: string; sessionId?: string; message: Message; historyLength?: number; pushNotification?: PushNotificationConfig; metadata?: Record<string, any>; // extension metadata } type TaskState = | "submitted" | "working" | "input-required" | "completed" | "canceled" | "failed" | "unknown";
- Artifacts:Server Agent 在执行任务后生成的目标结果叫做 Artifact,一个 Task 可能生成一个或者多个 Artifact。
Artifacts 是不可变的,可以命名,并且可以有多个部分。流式响应可以分批次,将结果附加到现有 Artifacts上。
interface Artifact { name?: string; description?: string; parts: Part[]; metadata?: Record<string, any>; index: number; append?: boolean; lastChunk?: boolean; }
- 在 Task执行过程中,Server Agent和Client Agent之间是通过Message完成交流的,当然,这不包括Artifact。它可以包括:Agent的思考、用户上下文、指令、错误、状态或元数据。一个Message可以包含多个Part,每个Part携带不同的内容。
interface Message { role: "user" | "agent"; parts: Part[]; metadata?: Record<string, any>; }
Part 是 Message 和 Artifact 的核心组成部分,代表了其携带的主要内容。每个 Part 都标识了内容类型和具体内容。
interface TextPart { type: "text"; text: string; } interface FilePart { type: "file"; file: { name?: string; mimeType?: string; // oneof { bytes?: string; //base64 encoded content uri?: string; //} }; } interface DataPart { type: "data"; data: Record<string, any>; } type Part = (TextPart | FilePart | DataPart) & { metadata: Record<string, any>; };
通信
ClientAgent 和ServerAgent之间通过HTTP协议进行通信,使用经典的C/S模式,支持SSE流式数据传输,数据格式为JSON-RPC2.0。PS:可以理解为定义了有哪些api endpoint 以及endpoint 的入参出参schema
A2A遵循Open API规范进行身份验证。A2A不会在协议中交换身份信息。相反,它们会在带外获取材料(如令牌),并在HTTP 头中传输。
Client Agent 和 Server Agent 之间协同工作需要经过以下几个关键步骤:
- Server Agent 在指定站点托管自己的 AgentCard;官方建议将 AgentCard 托管在
https://${host}/.well-known/agent.json
。叫做 Open Discovery,除此之外,还有另外两种方式:Curated Discovery 和 Private Discovery。Agent Client 可以通过请求https://${host}/.well-known/agent.json
,获取到指定的 AgentCard,并集成到自己的提示词或者工具集中。 - Client Agent 主动从
/.well-known/agent.json
发现 AgentCard; - Client Agent 发起一个 Task;以启动新任务、恢复中断的任务或重新打开已完成的任务。PS:客户端通过
tasks/send
发送任务请求,或者通过tasks/sendSubscribe
发送长期任务请求。{ "jsonrpc": "2.0", "id": 1, "method":"tasks/send", "params": { "id": "de38c76d-d54c-436c-8b9f-4c2703648d64", "message": { "role":"user", "data": [{ "type":"text", "text": "tell me a joke" }] }, "metadata": {} } }
- Client Agent 设置任务通知监听;ClientAgent 可以设置一个方法,给到 ServerAgent,当 ServerAgent 修改 Task 状态后,同步调用 ClientAgent 的监听方法。
//Request { "jsonrpc": "2.0", "id": 1, "method":"tasks/pushNotification/set", "params": { "id": "de38c76d-d54c-436c-8b9f-4c2703648d64", "pushNotificationConfig": { "url": "https://example.com/callback", "authentication": { "schemes": ["jwt"] } } } } //Response { "jsonrpc": "2.0", "id": 1, "result": { "id": "de38c76d-d54c-436c-8b9f-4c2703648d64", "pushNotificationConfig": { "url": "https://example.com/callback", "authentication": { "schemes": ["jwt"] } } } }
- Server Agent 执行任务,返回 Artifact;PS:根据任务类型进行流式更新。
{ "jsonrpc": "2.0", "id": 1, "result": { "id": "de38c76d-d54c-436c-8b9f-4c2703648d64", "sessionId": "c295ea44-7543-4f78-b524-7a38915ad6e4", "status": { "state": "completed", }, "artifacts": [{ "name":"joke", "parts": [{ "type":"text", "text":"Why did the chicken cross the road? To get to the other side!" }] }], "metadata": {} } }
- Client Agent 获取 Artifact。这里需要注意的是,Client Agent 需要通过获取 Task 的方式,获取到Artifact。如果任务需要更多输入,客户端可以在同一任务ID下继续提供输入。
- 任务完成。任务在达到完成、失败或取消状态后,结束任务生命周期。
A2A vs MCP
MCP 还是传统的工程思维,A2A则是站在人的思维来看待世界。我们要理解MCP的定位:提供一个规范的方式,向LLMs/Agent提供上下文。MCP强调的是LLMs/Agent为主体,MCPServer为附属的模式。而A2A强调的是Agent和Agent之间的相互操作,协议双端是对等的。
源码
官方示例 https://github.com/google/A2A/tree/main/samples/python/common 待补充。
class A2AServer:
def __init__(self, host="0.0.0.0",port=5000,endpoint="/",agent_card: AgentCard = None,task_manager: TaskManager = None,):
self.host = host
self.port = port
self.endpoint = endpoint
self.task_manager = task_manager
self.agent_card = agent_card
self.app = Starlette() # 初始化了一个app
self.app.add_route(self.endpoint, self._process_request, methods=["POST"])
self.app.add_route(
"/.well-known/agent.json", self._get_agent_card, methods=["GET"]
)
def start(self):
if self.agent_card is None:
raise ValueError("agent_card is not defined")
if self.task_manager is None:
raise ValueError("request_handler is not defined")
import uvicorn
uvicorn.run(self.app, host=self.host, port=self.port)
def _get_agent_card(self, request: Request) -> JSONResponse:
return JSONResponse(self.agent_card.model_dump(exclude_none=True))
async def _process_request(self, request: Request):
try:
body = await request.json()
json_rpc_request = A2ARequest.validate_python(body)
if isinstance(json_rpc_request, GetTaskRequest):
result = await self.task_manager.on_get_task(json_rpc_request)
elif isinstance(json_rpc_request, SendTaskRequest):
result = await self.task_manager.on_send_task(json_rpc_request)
elif isinstance(json_rpc_request, SendTaskStreamingRequest):
result = await self.task_manager.on_send_task_subscribe(
json_rpc_request
)
elif isinstance(json_rpc_request, CancelTaskRequest):
result = await self.task_manager.on_cancel_task(json_rpc_request)
elif isinstance(json_rpc_request, SetTaskPushNotificationRequest):
result = await self.task_manager.on_set_task_push_notification(json_rpc_request)
elif isinstance(json_rpc_request, GetTaskPushNotificationRequest):
result = await self.task_manager.on_get_task_push_notification(json_rpc_request)
elif isinstance(json_rpc_request, TaskResubscriptionRequest):
result = await self.task_manager.on_resubscribe_to_task(
json_rpc_request
)
else:
logger.warning(f"Unexpected request type: {type(json_rpc_request)}")
raise ValueError(f"Unexpected request type: {type(request)}")
return self._create_response(result)
except Exception as e:
return self._handle_exception(e)
安全
- Agent可信身份:让经过认证的Agent快速加入协作网络,防止未认证Agent破坏协作秩序。
- 意图的可信共享:智能体间的协作依赖于意图的真实性和准确性。例如,点餐助手与支付助手共享信息时,若意图被篡改,可能导致重复扣费或订单丢失
- 上下文保护机制:当一个AI Agent连接多个MCP(多通道协议)服务器时,所有工具描述信息会被加载到同一会话上下文中,恶意MCP Server可能借此注入恶意指令。比如黑灰产可以伪造一个“天气查询”工具注册到MCP Server,实际却在后台窃取用户航班信息;
- Agent记忆可信共享:记忆共享提升多Agent协作效率,如电商场景中记录用户偏好避免重复询问。记忆可信共享则确保数据一致、真实且安全,防止篡改与泄露,增强协作效果和用户信任。
- 身份可信流转:用户期待在AI原生应用中获得无缝流畅的服务体验。如果每次交互都需要跳转不同平台进行身份认证,将严重影响体验并阻碍AI应用普及。因此,实现跨平台无打扰的身份识别,成为提升用户体验的关键。
其它
PS:消息总线方式理解a2a