5 分钟阅读

简介

初探分布式Agent系统架构相对于在单个进程空间运行的Agent系统,分布式智能体系统则是将这些智能体分布运行在不同的进程、主机甚至不同的平台上,它们需要通过网络通信协同工作,共同完成复杂任务。分布式的需求来自于Agent能力扩展的需要,本质上有两个维度:

  1. 横向扩展:单个 Agent 实例部署在多台服务器上,从而提高其处理性能和吞吐能力,这类似于传统 Web 服务的水平扩展。例如:当某个 Agent 需要维护大量长连接会话、处理大规模并发请求,或执行耗时的任务时,可以通过负载均衡将请求分散到多个实例,实现更高的容量和可靠性。
  2. 纵向扩展:多个不同职责的 Agent,将它们部署在不同服务器上协同工作,形成一个多 Agent 系统,类似于将单一智能体的功能按模块解耦为多个服务。例如:可以用一个 Agent 专门负责数据检索,另一个负责推理决策,再另一个负责执行操作,通过彼此通信来完成一个综合任务。这种方式提升了系统的模块化和灵活性,使各Agent各司其职,协同解决复杂问题。

分布式Agent系统

挑战

对于横向的水平扩展,其面临的最主要挑战是,由于Agent任务的特点,简单的无状态服务形式对其是不够的(长时间任务、多轮对话、流输出等);但有状态的服务又会带来扩展能力的下降,面临新的挑战:

  1. 任务状态一致性:如何保证Agent任务状态和上下文在多个实例间的一致性是难题。可能需要借助会话粘连(保证同一会话总是由同一实例处理)或状态持久化等手段,来实现状态同步。这对分布式会话管理有较高要求。
  2. 任务调度与容错:需要有效的负载均衡和任务调度策略来优化资源利用,是简单的轮询还是根据实例响应时间或资源使用的动态分配等。此外,还需要考虑实例故障的容错,通常需要借助成熟的负载均衡与容器编排策略。

而对于更复杂的多Agent系统跨网络的协作,还会面临更多问题:

  1. Agent 协同协议。如果每对Agent都用各自私有接口对话,随着Agent数量增加,连接关系将变得错综复杂,维护成本高且易出错,会导致极高的耦合度。
  2. 消息传递效率。Agent 间需要频繁交换消息,如果通信机制低效,多 Agent 协作的整体速度将受限。特别在涉及长时间任务时,如何让Agent间异步并行工作、及时通知任务状态变化,避免因为轮询等机制浪费资源,都是需要考虑的问题。
  3. 上下文共享。多个 Agent 合作时,每个 Agent 只掌握局部信息,如何在它们之间共享上下文尤为关键。需要机制让Agent能够方便地共享知识和中间结果(例如通过公共内存、黑板系统甚至数据库)。
  4. 能力复用。当不同Agent具备互补的技能时,应该允许互相调用对方的能力,而不是各自重复实现同样的功能。这要求在 Agent 之间建立服务发现与调用机制。

2025年MAS(Multi-Agent System)系统仍不成熟,业内对于单Agent还是多Agent仍存在大量争论,MAS系统的设计和协调机制复杂度高,行为难以预测和控制,目前更适合研究而非生产,所以A2A协议没有像MCP协议快速发展和普及。

协作方案

  1. AutoGen 框架的分布式运行时。AutoGen在最新0.4版本后引入了实验性的分布式 Agent Runtime。其架构包含一个中央的主机服务(Host Service)和若干工作节点(Worker Runtime): AutoGen 框架提供一体化的解决方案,适合于快速构建同构环境下的多Agent协作;但局限是协作的 Agent 需要在同一 AutoGen 框架内,跨不同框架和语言的 Agent 互通需另行考虑。
  2. 方案二:Agent 远程服务化(RPC/MCP)这种方法是将每个 Agent 作为独立服务来部署,通过远程调用的方式实现协作。利用成熟的分布式技术栈。同时,每个 Agent 服务可以独立扩展和部署(当然横向扩展也会面临上面提到的问题),天然具有模块边界清晰、故障隔离好的特点。然而,纯粹的 RPC 式协作也有明显的局限:
    1. 需要开发者自行设计请求/响应的数据格式和流程。PS:不标准
    2. Agent 之间没有共享的“会话”概念或上下文维护机制。很快会陷入会话管理、内存管理和协调同步的复杂细节中。Agent系统的真实任务场景复杂度往往较高,简单的无状态API无法满足。比如:
      • 一个需要很长时间运行的任务
      • 多轮对话的问题,Human-in-the-loop工作流
      • 流式输出的问题
      • 不同供应商之间的Agent之间的互信问题
    3. 随着协作Agent数量增加,开发者可能需要处理繁琐的编排逻辑。 这种方法总体上仍需要开发者规划好哪个Agent调用哪个、何时调用,并处理数据格式转换和错误恢复,其适用场景通常是固定的流程编排或工具调用型的Agent协作,即任务流程相对确定,Agent 之间主要是服务调用关系而非自由对话。对于需要灵活对话协商、多轮交互的场景,API/RPC方式会变得力不从心。
  3. Google 的 Agent-to-Agent 协议
    1. 服务发现机制:每个支持 A2A 的 Agent 对外暴露一个Agent Card(JSON 文档),描述该 Agent 的元信息 。解决了异构 Agent 之间如何互相发现和了解彼此功能的问题。
    2. 标准化的消息与任务结构:A2A 将 Agent 之间交互抽象为一个个任务(Task)。任务由协议统一定义生命周期,Agent 间通过发送消息来协商和更新任务状态,每条消息都有明确的结构,确保不同实现的Agent都能正确理解彼此发送的内容。
    3. 多样的通信模式:考虑到不同任务对交互实时性的要求,A2A 支持多种通信方式 ,支持短周期的请求/响应或者异步通知的任务形式。
    4. 安全与跨平台:作为企业级协议,A2A 特别强调了通信的安全性和跨平台兼容

整体上而言,A2A提供的价值与MCP是类似的:

  1. 降低异构Agent之间的集成复杂性:你无需了解对端Agent的细节。
  2. 提高Agent能力的可复用性:你可以把某个任务交给更擅长它的Agent。
  3. 更好的扩展性以适应变化:Agent的内部逻辑变化可以被A2A所隔离。 PS: 与单体服务==> 微服务演进是一样一样的,利用A2A协议构建“服务化”的Agent系统

A2A

人与人之间可以通过各种各样的方式沟通:对话,眼神,肢体动作,画作等,这些可以帮助不同的人之间相互了解对方,并做出正确的动作,共同推动人类社会的发展,那么Agent之间沟通协作呢?Google给出了自己的答案:A2A 。

A2A 作为一个开放协议,充分考虑了 Agent 在和用户、企业打通的过程中所面临的一些挑战,其主要功能特性有以下四点:

  1. 安全协作(Secure Collaboration):通过引入认证/授权机制,保证 Agent 之间的身份互信。
  2. 任务状态管理(Task and state mgmt):实现了 Agent 之间互操作任务以及任务状态的可管理性。
  3. 用户体验协商(UX negotiation):不同的 Agent 通过协商的方式,对用户提供无缝的体验。
  4. 功能发现(Capability discovery):提供了 Agent 之间相互发现各自能力的机制。 除此之外,A2A 也在企业的无缝接入、简化集成方面,有比较好的考量。

Agent 相互之间的发现、了解和交互调用,是一个发展趋势。

  1. 首先,企业基于当前业务,都在探索、建立各种各样的 领域Agent 。在内部的各种 领域Agent 之间的沟通协作,是必须要面对和解决的一个问题。
  2. 其次,对于对外提供 Agent 服务的提供商来说,我如何让其他 Agent 主动发现我,就像SEO,吸引更多的流量,也是一个需要思考的问题。

概念

https://google-a2a.github.io/A2A/latest/specification/ PS:学习协议要学会抓包,尤其是http 协议更方便。

ClientAgent 和ServerAgent之间通过HTTP协议进行通信,使用经典的C/S模式,支持SSE流式数据传输,数据格式为JSON-RPC2.0。PS:可以理解为

  1. 定义了有哪些api endpoint(mcp固定为 http://host:port/xx/{sse|mcp},a2a 固定为http://host:port{/ | .well-known/agent.json},区分功能主要靠jsonrpc内的 method) 以及endpoint 的入参出参schema
  2. 定义了哪些 json-rpc method。 https://a2a-protocol.org/latest/specification/#356-method-mapping-reference-table
  3. json-rpc method 对应哪些数据对象

A2A(Agent2Agent) 简介 A2A 中包含三个核心的参与者:

  1. User,主要的作用是用于 认证&授权
  2. Client Agent,指的是任务发起者
  3. Remote Agent,指的是任务的执行者。

Client 和 Server 之间的通信,可以理解为就是一个个简单的请求和结果的响应,只不过这个请求是一个个的任务(Client和Server之间通信是以任务的粒度进行)。一个 Agent 既可以是 Client 也可以是 Server。Client Agent 和 Server Agent 交互的过程中,主要涉及到的一些Entity:AgentCard、Task 、Artifact 、Message、Part。

  1. AgentCard 是 Server Agent 的名片,它主要描述了 Server Agent 的能力、认证机制等信息。Client Agent通过获取不同 Server Agent 的 AgentCard,了解不同 Server Agent 的能力,来决断具体的任务执行应该调用哪个 Server Agent 。
     interface AgentCard {
         name: string;
         description: string;
         url: string;
         provider?: {
             organization: string;
             url: string;
         };
         version: string;
         documentationUrl?: string;
         capabilities: {
             streaming?: boolean; 
             pushNotifications?: boolean;
             stateTransitionHistory?: boolean;
         };
    
         authentication: {
             schemes: string[]; 
             credentials?: string;
         };
         defaultInputModes: string[];
         defaultOutputModes: string[];
         skills: {
             id: string; 
             name: string;
             description: string;
             tags: string[];
             examples?: string[]; 
             inputModes?: string[];
             outputModes?: string[];
         }[];
     }
    
  2. Task 是一个具有状态的实体,由server创建的有状态工作单元是客户端交给服务端Agent需要完成的工作任务,拥有独立任务 ID 和生命周期状态(例如 submitted → working → input‑required → completed/failed/canceled) 。任务过程可能需要客户端的协作;任务结果可以同步等待也可以异步获取。一个Task用于达到特定的目标或者结果。Agent Client和Server Client在Task中交换Mesaage,Server Agent生成的(最终)结果叫做Artifact(比如一段文字、一个报告、一个图片或者视频)。除此之外,每个Task有一个唯一的sessionId,多个Task可以使用一个sessionId,表明多个Task属于同一个会话的一部分。
     interface Task {
         id: string;
         sessionId: string;
         status: TaskStatus;
         history?: Message[];
         artifacts?: Artifact[]; 
         metadata?: Record<string, any>; 
     }
     interface TaskStatus {
         state: TaskState;
         message?: Message;
         timestamp?: string; 
     }
     interface TaskStatusUpdateEvent {
         id: string;
         status: TaskStatus;
         final: boolean; //indicates the end of the event stream
         metadata?: Record<string, any>;
     }
     interface TaskArtifactUpdateEvent {
         id: string;
         artifact: Artifact;
         metadata?: Record<string, any>;
     }
     interface TaskSendParams { #  后来协议演进,改为message/send 了
         id: string;
         sessionId?: string; 
         message: Message;
         historyLength?: number; 
         pushNotification?: PushNotificationConfig;
         metadata?: Record<string, any>; // extension metadata
     }
     type TaskState =
         | "submitted"
         | "working"
         | "input-required"
         | "completed"
         | "canceled"
         | "failed"
         | "unknown";
    
  3. Artifacts:Server Agent 在执行任务后生成的目标结果叫做 Artifact,一个 Task 可能生成一个或者多个 Artifact。Artifacts 是不可变的,可以命名,并且可以有多个部分。流式响应可以分批次,将结果附加到现有 Artifacts上。
     interface Artifact {
         name?: string;
         description?: string;
         parts: Part[];
         metadata?: Record<string, any>;
         index: number;
         append?: boolean;
         lastChunk?: boolean;
     }
    
  4. 在 Task执行过程中,Server Agent和Client Agent之间是通过Message完成交流的,当然,这不包括Artifact。它可以包括:Agent的思考、用户上下文、指令、错误、状态或元数据。一个Message可以包含多个Part,每个Part携带不同的内容。
     interface Message {
         role: "user" | "agent";
         parts: Part[];
         metadata?: Record<string, any>;
     }
    

    Part 是 Message 和 Artifact 的核心组成部分,代表了其携带的主要内容。每个 Part 都标识了内容类型和具体内容。

     interface TextPart {
         type: "text";
         text: string;
     }
     interface FilePart {
         type: "file";
         file: {
             name?: string;
             mimeType?: string;
             // oneof {
             bytes?: string; //base64 encoded content
             uri?: string;
             //}
         };
     }
     interface DataPart {
         type: "data";
         data: Record<string, any>;
     }
     type Part = (TextPart | FilePart | DataPart) & {
         metadata: Record<string, any>;
     };
    

通信

A2A客户端与服务端之间的任务交互过程,有三种主要模式(针对不同的场景):

  1. 非流模式通过HTTP Post同步获得请求的响应结果。适用于那些执行时间较短或不需要实时反馈的任务。json-rpc method = message/send,很类似langchain.runnable.invoke
  2. 流模式通过接收HTTP请求后的多个SSE事件来获取响应。 json-rpc method = message/stream, 类似langchain.runnable.ainvoke
  3. PushNotification。对于执行时间极长(数小时或数天)、非交互式或“即发即忘”类型的任务,A2A协议提供了第三种,也是解耦程度最高的通信模型:基于Webhook的推送通知。在该模型中,客户端在发起任务时向服务器提供一个公开可访问的HTTP URL,即Webhook。服务器在任务状态发生重要变化时(例如完成、失败或需要输入),会向这个URL发送一个异步的HTTP POST请求。这种方式完全解耦了客户端和服务器,任务发起后双方无需维持任何持久连接或进行后续轮询。这是异步消息处理的经典模式,通信双方在初始请求后无需同时在线。它非常适合批量处理场景,或涉及人类审批环节的流程,这些流程的执行时间可能非常长且不可预测。

Client Agent 和 Server Agent 之间协同工作需要经过以下几个关键步骤:

  1. Server Agent 在指定站点托管自己的 AgentCard;官方建议将 AgentCard 托管在 https://${host:port}/.well-known/agent.json。叫做 Open Discovery,除此之外,还有另外两种方式:Curated Discovery 和 Private Discovery。Agent Client 可以通过请求https://${host:port}/.well-known/agent.json,获取到指定的 AgentCard,并集成到自己的提示词或者工具集中。
  2. Client Agent 主动从/.well-known/agent.json发现 AgentCard;
  3. Client Agent 发起一个 Task;以启动新任务、恢复中断的任务或重新打开已完成的任务。PS:客户端通过method=tasks/send发送任务请求,或者通过method=tasks/sendSubscribe发送长期任务请求。
     {
         "jsonrpc": "2.0",
         "id": 1,
         "method":"tasks/send",
         "params": {
             "id": "de38c76d-d54c-436c-8b9f-4c2703648d64",
             "message": {
             "role":"user",
             "data": [{
                 "type":"text",
                 "text": "tell me a joke"
             }]
             },
             "metadata": {}
         }
     }
    
  4. Client Agent 设置任务通知监听;ClientAgent 可以设置一个方法,给到 ServerAgent,当 ServerAgent 修改 Task 状态后,同步调用 ClientAgent 的监听方法。
     //Request
     {
         "jsonrpc": "2.0",
         "id": 1,
         "method":"tasks/pushNotification/set",
         "params": {
             "id": "de38c76d-d54c-436c-8b9f-4c2703648d64",
             "pushNotificationConfig": {
                 "url": "https://example.com/callback",
                 "authentication": {
                     "schemes": ["jwt"]
                 }
             }
         }
     }
     //Response
     {
         "jsonrpc": "2.0",
         "id": 1,
         "result": {
             "id": "de38c76d-d54c-436c-8b9f-4c2703648d64",
             "pushNotificationConfig": {
                 "url": "https://example.com/callback",
                 "authentication": {
                     "schemes": ["jwt"]
                 }
             }
         }
     }
    
  5. Server Agent 执行任务,返回 Artifact;PS:根据任务类型进行流式更新。
     {
         "jsonrpc": "2.0",
         "id": 1,
         "result": {
             "id": "de38c76d-d54c-436c-8b9f-4c2703648d64",
             "sessionId": "c295ea44-7543-4f78-b524-7a38915ad6e4",
             "status": {
                 "state": "completed",
             },
             "artifacts": [{
                 "name":"joke",
                 "parts": [{
                     "type":"text",
                     "text":"Why did the chicken cross the road? To get to the other side!"
                 }]
             }],
             "metadata": {}
         }
     }
    
  6. Client Agent 获取 Artifact。这里需要注意的是,Client Agent 需要通过获取 Task 的方式,获取到Artifact。如果任务需要更多输入,客户端可以在同一任务ID下继续提供输入。
  7. 任务完成。任务在达到完成、失败或取消状态后,结束任务生命周期。

PS: 结合Multiagent来看的话,Sub Agent 接收任务并完成后,需要将结果反馈给 Supervisor,并附带上下文唯一标识(如 Task_ID),以便 Supervisor 异步接收反馈并驱动下一步决策。

安全

A2A遵循Open API规范进行身份验证。A2A不会在协议中交换身份信息。相反,它们会在带外获取材料(如令牌),并在HTTP 头中传输。

A2A vs MCP

MCP 还是传统的工程思维,A2A则是站在人的思维来看待世界。我们要理解MCP的定位:提供一个规范的方式,向LLMs/Agent提供上下文,是Agent的“内部事务”。MCP强调的是LLMs/Agent为主体,MCPServer为附属的模式。而A2A强调的是Agent和Agent之间的相互操作,协议双端是对等的

源码

在A2A框架里,协议层抽象了几个状态管理的组件

  1. 为了管理Task的状态,提出了TaskStore
    1. 为了做到Server端可以向客户端推送状态和断线重连(resubscribe),提供了TaskNotifier和TaskQueue的远程实现

low level

官方示例 https://github.com/google/A2A/tree/main/samples/python/common 待补充。基于Starlette low level一点的实现

class A2AServer:
    def __init__(self, host="0.0.0.0",port=5000,endpoint="/",agent_card: AgentCard = None,task_manager: TaskManager = None,):
        self.host = host
        self.port = port
        self.endpoint = endpoint
        self.task_manager = task_manager
        self.agent_card = agent_card
        self.app = Starlette()  # 初始化了一个app
        self.app.add_route(self.endpoint, self._process_request, methods=["POST"])
        self.app.add_route(
            "/.well-known/agent.json", self._get_agent_card, methods=["GET"]
        )
    def start(self):
        if self.agent_card is None:
            raise ValueError("agent_card is not defined")

        if self.task_manager is None:
            raise ValueError("request_handler is not defined")
        import uvicorn
        uvicorn.run(self.app, host=self.host, port=self.port)
    def _get_agent_card(self, request: Request) -> JSONResponse:
        return JSONResponse(self.agent_card.model_dump(exclude_none=True))
     async def _process_request(self, request: Request):
        try:
            body = await request.json()
            json_rpc_request = A2ARequest.validate_python(body)

            if isinstance(json_rpc_request, GetTaskRequest):
                result = await self.task_manager.on_get_task(json_rpc_request)
            elif isinstance(json_rpc_request, SendTaskRequest):
                result = await self.task_manager.on_send_task(json_rpc_request)
            elif isinstance(json_rpc_request, SendTaskStreamingRequest):
                result = await self.task_manager.on_send_task_subscribe(
                    json_rpc_request
                )
            elif isinstance(json_rpc_request, CancelTaskRequest):
                result = await self.task_manager.on_cancel_task(json_rpc_request)
            elif isinstance(json_rpc_request, SetTaskPushNotificationRequest):
                result = await self.task_manager.on_set_task_push_notification(json_rpc_request)
            elif isinstance(json_rpc_request, GetTaskPushNotificationRequest):
                result = await self.task_manager.on_get_task_push_notification(json_rpc_request)
            elif isinstance(json_rpc_request, TaskResubscriptionRequest):
                result = await self.task_manager.on_resubscribe_to_task(
                    json_rpc_request
                )
            else:
                logger.warning(f"Unexpected request type: {type(json_rpc_request)}")
                raise ValueError(f"Unexpected request type: {type(request)}")
            return self._create_response(result)
        except Exception as e:
            return self._handle_exception(e)

a2a-python

a2a-python

server

request_handler = DefaultRequestHandler(
    agent_executor=HelloWorldAgentExecutor(),
    task_store=InMemoryTaskStore(),
)
server = A2AStarletteApplication(
    agent_card=public_agent_card,
    http_handler=request_handler,
    extended_agent_card=specific_extended_agent_card,
)
starlette_app = server.build()
uvicorn.run(starlette_app, host='0.0.0.0', port=9999)

  1. agent_card_url ==> A2AStarletteApplication._handle_get_agent_card
  2. rpc_url ==> A2AStarletteApplication._handle_requests ==> _process_streaming_request /_process_non_streaming_request ==> JSONRPCHandler.on_message_send_stream ==> request_handler.on_message_send_stream ==> agent_executor.execute ==> agent.invoke
    class DefaultRequestHandler(RequestHandler):
     async def on_message_send_stream(self,params,context)-> AsyncGenerator[Event]:
         task_manager = TaskManager( 
             task_id=params.message.taskId,
             context_id=params.message.contextId,
             task_store=self.task_store,
             initial_message=params.message,
         )
         task: Task | None = await task_manager.get_task()
         ...
         request_context = await self._request_context_builder.build(params,task_id,context_id,...)
         queue = await self._queue_manager.create_or_tap(task_id)
         # producer 处理request 并将 流式结果发到queue
         producer_task = asyncio.create_task(self._run_event_stream(request_context,  queue,))
         await self._register_producer(task_id, producer_task)
         consumer = EventConsumer(queue)
         # consumer 从queue 里取出event 并返给调用方
         async for event in result_aggregator.consume_and_emit(consumer):
             yield event
     async def _run_event_stream(self, request: RequestContext, queue: EventQueue) -> None:
         await self.agent_executor.execute(request, queue)
         await queue.close()
    

    AgentExecutor 只是一个interface,包含execute和cancel方法

    class HelloWorldAgentExecutor(AgentExecutor):
     def __init__(self):
         self.agent = HelloWorldAgent()
     async def execute(self,context: RequestContext,event_queue: EventQueue,) -> None:
         result = await self.agent.invoke()
         await event_queue.enqueue_event(new_agent_text_message(result))
     async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
         raise Exception('cancel not supported')
    def new_agent_text_message(text: str,context_id,task_id) -> Message:
     return Message(
         role=Role.agent,
         parts=[Part(root=TextPart(text=text))],
         messageId=str(uuid.uuid4()),
         taskId=task_id,
         contextId=context_id,
     )
    

    也就是框架已经封装的蛮好了,只需要开发一个agent实现(任何框架)并封装成一个AgentExecutor interface实例即可。从 RequestContext 拿到json-rpc body(有Message,有Metadata),进行处理,处理完封装 a2a.types.Event 发出。

对于调用方来说,类似mcpserver,拿到所有的 mcpserver urls,就可以list_tools 并使用

mcp_servers = {
    "tavily-mcp": {
        "transport": "sse",
        "url": "http://0.0.0.0:8080/sse",
    }
}
async def run_agent():
    async with MultiServerMCPClient(mcp_servers) as client:
        agent = create_react_agent(llm, client.get_tools())
        await agent.ainvoke({"messages": "what's 3 + 5"})

对于a2a 调用方来说,知道server url 即可访问server,a2a-python 提供了RemoteAgentConnections 对server url 进行了简单封装

class RemoteAgentConnections:
    def __init__(self, client: httpx.AsyncClient, agent_card: AgentCard):
        self.agent_client = A2AClient(client, agent_card)
        self.card = agent_card
        self.pending_tasks = set()

    def get_agent(self) -> AgentCard:
        return self.card

    async def send_message(self,request: MessageSendParams,task_callback: TaskUpdateCallback | None,) -> Task | Message | None:
        response = await self.agent_client.send_message(SendMessageRequest(id=str(uuid4()), params=request))
        ...

google adk RemoteA2aAgent 提供了更丰富的封装,输入一个server url,即可将RemoteA2aAgent 作为本地agent 使用。

待定

  1. 如何基于a2a 实现human in the loop?
  2. 如何用langgraph 方便的访问多个a2a server?

安全

  1. Agent可信身份:让经过认证的Agent快速加入协作网络,防止未认证Agent破坏协作秩序。
  2. 意图的可信共享:智能体间的协作依赖于意图的真实性和准确性。例如,点餐助手与支付助手共享信息时,若意图被篡改,可能导致重复扣费或订单丢失
  3. 上下文保护机制:当一个AI Agent连接多个MCP(多通道协议)服务器时,所有工具描述信息会被加载到同一会话上下文中,恶意MCP Server可能借此注入恶意指令。比如黑灰产可以伪造一个“天气查询”工具注册到MCP Server,实际却在后台窃取用户航班信息;
  4. Agent记忆可信共享:记忆共享提升多Agent协作效率,如电商场景中记录用户偏好避免重复询问。记忆可信共享则确保数据一致、真实且安全,防止篡改与泄露,增强协作效果和用户信任。
  5. 身份可信流转:用户期待在AI原生应用中获得无缝流畅的服务体验。如果每次交互都需要跳转不同平台进行身份认证,将严重影响体验并阻碍AI应用普及。因此,实现跨平台无打扰的身份识别,成为提升用户体验的关键。

其它

PS:消息总线方式理解a2a

留下评论