从零开始构建智能Agent通信系统
在现代人工智能系统中,单个Agent往往无法独立完成复杂任务。通过A2A Protocol,不同的智能Agent可以:
多个Agent协同完成复杂任务,如智能客服与知识库Agent的配合
Agent间共享计算资源、数据和专业能力
保持分布式Agent系统的状态一致性
智能分配和路由任务到最适合的Agent
{
"messageId": "msg_123456",
"fromAgent": "customer-service-agent",
"toAgent": "knowledge-base-agent",
"timestamp": "2024-12-19T10:30:00Z",
"messageType": "QUERY",
"payload": {
"query": "用户询问退款政策",
"context": {
"userId": "user_789",
"sessionId": "session_abc123"
}
},
"expectResponse": true,
"timeoutMs": 5000
}
具体的Agent业务逻辑和任务处理
A2A消息解析、路由和状态管理
WebSocket、HTTP、gRPC等底层通信
TCP/IP网络连接和数据传输
import asyncio
import json
import uuid
from typing import Dict, List, Optional, Callable
from datetime import datetime
import websockets
class A2AAgent:
"""A2A Protocol基础Agent实现"""
def __init__(self, agent_id: str, capabilities: List[str]):
self.agent_id = agent_id
self.capabilities = capabilities
self.connections = {}
self.message_handlers = {}
self.is_running = False
async def register_capability(self, capability: str, handler: Callable):
"""注册Agent能力和对应的处理函数"""
self.capabilities.append(capability)
self.message_handlers[capability] = handler
async def send_message(self, target_agent: str, message_type: str, payload: Dict):
"""发送消息给目标Agent"""
message = {
"messageId": str(uuid.uuid4()),
"fromAgent": self.agent_id,
"toAgent": target_agent,
"timestamp": datetime.utcnow().isoformat(),
"messageType": message_type,
"payload": payload
}
# 通过WebSocket连接发送消息
if target_agent in self.connections:
await self.connections[target_agent].send(json.dumps(message))
return message["messageId"]
else:
raise ConnectionError(f"No connection to agent: {target_agent}")
async def handle_message(self, message: Dict):
"""处理接收到的消息"""
message_type = message.get("messageType")
if message_type in self.message_handlers:
handler = self.message_handlers[message_type]
response = await handler(message["payload"])
# 如果需要回复,发送响应消息
if message.get("expectResponse", False):
await self.send_message(
message["fromAgent"],
"RESPONSE",
{"originalMessageId": message["messageId"], "result": response}
)
else:
print(f"Unknown message type: {message_type}")
async def start(self, router_url: str = "ws://localhost:8080"):
"""启动Agent并连接到消息路由器"""
self.is_running = True
async with websockets.connect(router_url) as websocket:
# 注册Agent
registration = {
"action": "REGISTER",
"agentId": self.agent_id,
"capabilities": self.capabilities
}
await websocket.send(json.dumps(registration))
# 监听消息
async for message in websocket:
if self.is_running:
try:
data = json.loads(message)
await self.handle_message(data)
except Exception as e:
print(f"Error handling message: {e}")
else:
break
构建一个包含客服Agent、知识库Agent和订单处理Agent的智能客服系统。
# 客服Agent实现
class CustomerServiceAgent(A2AAgent):
def __init__(self):
super().__init__("customer-service", ["customer_query", "conversation"])
async def handle_customer_query(self, payload):
"""处理客户咨询"""
query = payload.get("query")
customer_id = payload.get("customerId")
# 首先查询知识库
knowledge_response = await self.send_message(
"knowledge-base",
"SEARCH",
{"query": query, "context": "customer_service"}
)
# 如果是订单相关问题,调用订单Agent
if "订单" in query or "物流" in query:
order_response = await self.send_message(
"order-processor",
"ORDER_QUERY",
{"customerId": customer_id, "query": query}
)
return {
"response": "已为您查询相关信息,请稍候...",
"status": "processing"
}
# 知识库Agent实现
class KnowledgeBaseAgent(A2AAgent):
def __init__(self):
super().__init__("knowledge-base", ["search", "update"])
self.knowledge_db = self._load_knowledge_base()
async def handle_search(self, payload):
"""搜索知识库"""
query = payload.get("query")
context = payload.get("context", "general")
# 模拟知识库搜索
results = self._search_knowledge(query, context)
return {
"results": results,
"confidence": 0.85,
"source": "knowledge_base"
}
def _search_knowledge(self, query: str, context: str):
"""模拟知识库搜索逻辑"""
# 这里实现实际的搜索算法
return [
{"title": "退款政策", "content": "30天内可无理由退款..."},
{"title": "配送说明", "content": "全国包邮,3-5个工作日..."}
]
# 启动多Agent系统
async def main():
# 创建Agent实例
customer_agent = CustomerServiceAgent()
knowledge_agent = KnowledgeBaseAgent()
# 启动Agent(在实际应用中会在不同进程/服务器上运行)
await asyncio.gather(
customer_agent.start(),
knowledge_agent.start()
)
if __name__ == "__main__":
asyncio.run(main())
使用多个专门的Agent协作处理大规模数据: