分布式Agent系统中的智能服务发现与路由
A2A Protocol采用分布式Agent注册中心架构,确保高可用性和横向扩展能力。注册中心负责管理所有Agent的生命周期、能力信息和服务状态。
{
"registryEntry": {
"agentId": "kb-agent-prod-01",
"registrationTime": "2024-12-19T10:30:00Z",
"lastHeartbeat": "2024-12-19T11:45:30Z",
"status": "ACTIVE",
"agentCard": {
// 完整的Agent Card信息
"agentInfo": { ... },
"capabilities": [ ... ],
"endpoints": { ... }
},
"runtimeMetrics": {
"currentLoad": 0.65,
"averageResponseTime": 850,
"activeConnections": 23,
"memoryUsage": 0.42,
"cpuUsage": 0.38
},
"networkInfo": {
"datacenter": "us-west-2",
"zone": "us-west-2a",
"internalIP": "10.0.1.15",
"publicEndpoint": "https://kb-agent.example.com"
},
"capabilities_index": {
// 为快速查询优化的能力索引
"search": {
"keywords": ["knowledge", "search", "nlp", "semantic"],
"inputTypes": ["text", "query"],
"outputTypes": ["results", "json"],
"performance": "high",
"languages": ["zh-CN", "en-US"]
}
},
"dependencies": {
"required": ["vector-db-service", "nlp-processor"],
"optional": ["translation-service"]
},
"configProfile": {
"environment": "production",
"version": "2.1.0",
"features": ["semantic-search", "multi-language"]
}
}
}
验证Agent身份、检查权限、确认依赖服务可用性
POST /api/v1/agents/validate → 验证Agent Card格式和完整性
提交完整注册信息,分配Agent ID,建立监控连接
POST /api/v1/agents/register → 创建注册条目并分配Agent ID
分析Agent能力,构建搜索索引,优化发现算法
内部处理 → 构建多维度搜索索引
启动定期健康检查,建立心跳机制,监控性能指标
WebSocket连接 → 实时状态监控
Agent正式上线,开始接受服务请求,参与负载均衡
状态: ACTIVE → 开始处理A2A请求
import asyncio
import aiohttp
import json
from datetime import datetime
from typing import Dict, List, Optional
class A2ARegistryClient:
    """Client that registers an agent with an A2A registry and keeps it alive.

    The client validates the agent card, posts a registration request and,
    on success, starts a background task that sends periodic heartbeats.

    Attributes are plain instance fields:
        registry_url: base URL of the registry (no trailing slash expected,
            because endpoint paths are appended with a leading ``/``).
        agent_card: the agent card dict sent to the registry.
        agent_id / registration_token: filled in from the registry's
            registration response; ``None`` until ``register()`` succeeds.
        heartbeat_task: the ``asyncio.Task`` running the heartbeat loop.
    """

    def __init__(self, registry_url: str, agent_card: Dict):
        self.registry_url = registry_url
        self.agent_card = agent_card
        self.agent_id = None
        self.registration_token = None
        self.heartbeat_task = None
        # Set when registration succeeds; used to report uptime in heartbeats.
        self._registered_at = None

    async def register(self) -> bool:
        """Register the agent with the registry.

        Returns:
            True when the registry answered 201 and the heartbeat was
            started; False on any failure (the error is printed, never
            raised to the caller).
        """
        try:
            # Step 1: pre-validation of the agent card.
            validation_result = await self._validate_agent_card()
            # .get() so an unexpected reply shape yields a readable message
            # instead of a bare KeyError.
            if not validation_result.get("valid"):
                raise RuntimeError(
                    f"Agent Card validation failed: {validation_result.get('errors')}"
                )

            # Step 2: the actual registration request.
            registration_data = {
                "agentCard": self.agent_card,
                "timestamp": datetime.utcnow().isoformat(),
                "requestedCapabilities": self._extract_capabilities(),
                "networkInfo": await self._gather_network_info(),
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.registry_url}/api/v1/agents/register",
                    json=registration_data,
                    headers={"Content-Type": "application/json"},
                ) as response:
                    if response.status != 201:
                        error = await response.text()
                        raise RuntimeError(f"Registration failed: {error}")
                    result = await response.json()

            self.agent_id = result["agentId"]
            self.registration_token = result["token"]
            self._registered_at = datetime.utcnow()

            # Step 3: start the heartbeat loop.
            await self._start_heartbeat()
            print(f"Agent successfully registered with ID: {self.agent_id}")
            return True
        except Exception as e:
            # Top-level boundary: report and signal failure via the return value.
            print(f"Registration error: {e}")
            return False

    async def _validate_agent_card(self) -> Dict:
        """Post the agent card to the validation endpoint and return its JSON reply."""
        validation_data = {"agentCard": self.agent_card}
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.registry_url}/api/v1/agents/validate",
                json=validation_data,
            ) as response:
                return await response.json()

    def _extract_capabilities(self) -> List[Dict]:
        """Build one summary dict per capability listed in the agent card.

        Each entry carries the capability name, the types found in its
        input/output schemas, keywords taken from its description and its
        ``performance`` mapping (empty dict when absent).
        """
        return [
            {
                "name": cap["name"],
                "inputTypes": self._analyze_schema_types(cap.get("inputSchema", {})),
                "outputTypes": self._analyze_schema_types(cap.get("outputSchema", {})),
                "keywords": self._extract_keywords(cap.get("description", "")),
                "performance": cap.get("performance", {}),
            }
            for cap in self.agent_card.get("capabilities", [])
        ]

    @staticmethod
    def _analyze_schema_types(schema: Dict) -> List[str]:
        """Return the sorted, de-duplicated ``type`` names used by *schema*.

        For a schema with ``properties`` the property types are collected;
        otherwise the schema's own ``type`` is used. Non-string ``type``
        values (e.g. JSON Schema type lists) are ignored.
        """
        properties = schema.get("properties")
        if isinstance(properties, dict) and properties:
            declared = [
                spec.get("type")
                for spec in properties.values()
                if isinstance(spec, dict)
            ]
        else:
            declared = [schema.get("type")]
        return sorted({name for name in declared if isinstance(name, str)})

    @staticmethod
    def _extract_keywords(description: str) -> List[str]:
        """Return lower-cased word tokens of *description*, first occurrence order.

        Tokens are runs of word characters; single-character tokens are
        dropped. No word segmentation is attempted, so an unspaced run of
        CJK text yields a single keyword.
        """
        import re

        tokens = re.findall(r"\w+", description.lower())
        # dict.fromkeys de-duplicates while preserving order.
        return list(dict.fromkeys(tok for tok in tokens if len(tok) > 1))

    async def _gather_network_info(self) -> Dict:
        """Collect hostname, IP, basic system sizes and the primary endpoint."""
        import psutil
        import socket

        hostname = socket.gethostname()
        try:
            internal_ip = socket.gethostbyname(hostname)
        except OSError:
            # Hostname not resolvable (common in containers); do not let
            # that alone abort registration.
            internal_ip = "127.0.0.1"

        return {
            "hostname": hostname,
            "internalIP": internal_ip,
            "systemInfo": {
                "cpu_count": psutil.cpu_count(),
                "memory_total": psutil.virtual_memory().total,
                "disk_total": psutil.disk_usage('/').total,
            },
            "endpoint": self.agent_card["endpoints"]["primary"],
        }

    async def _start_heartbeat(self):
        """Start the background heartbeat loop, replacing any running one."""
        # A second register() call must not leave two loops running.
        if self.heartbeat_task is not None and not self.heartbeat_task.done():
            self.heartbeat_task.cancel()

        async def heartbeat_loop():
            while True:
                try:
                    await self._send_heartbeat()
                except asyncio.CancelledError:
                    # Never swallow cancellation (it is an Exception subclass
                    # on Python 3.7).
                    raise
                except Exception as e:
                    print(f"Heartbeat error: {e}")
                    await asyncio.sleep(5)  # short back-off after an error
                else:
                    await asyncio.sleep(30)  # regular 30-second interval

        self.heartbeat_task = asyncio.create_task(heartbeat_loop())

    async def _send_heartbeat(self):
        """Send one heartbeat with current status and host metrics.

        Raises:
            aiohttp.ClientResponseError: when the registry answers with an
                HTTP error status, so the heartbeat loop can log and retry.
        """
        import psutil

        # cpu_percent(interval=1) blocks for a full second; run it in the
        # default executor so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        cpu_usage = await loop.run_in_executor(None, psutil.cpu_percent, 1)

        try:
            active_connections = len(psutil.net_connections())
        except psutil.AccessDenied:
            # Listing connections needs elevated privileges on some
            # platforms; report "unknown" rather than failing the heartbeat.
            active_connections = None

        now = datetime.utcnow()
        # Seconds since successful registration (previously the current
        # timestamp was sent under this key).
        uptime_seconds = (
            (now - self._registered_at).total_seconds()
            if self._registered_at is not None
            else 0.0
        )

        heartbeat_data = {
            "agentId": self.agent_id,
            "timestamp": now.isoformat(),
            "status": "ACTIVE",
            "metrics": {
                "cpu_usage": cpu_usage,
                "memory_usage": psutil.virtual_memory().percent / 100,
                "active_connections": active_connections,
                "uptime": uptime_seconds,
            },
        }
        async with aiohttp.ClientSession() as session:
            # "async with" releases the response/connection back to the pool.
            async with session.post(
                f"{self.registry_url}/api/v1/agents/{self.agent_id}/heartbeat",
                json=heartbeat_data,
                headers={"Authorization": f"Bearer {self.registration_token}"},
            ) as response:
                response.raise_for_status()
# 使用示例
async def main():
    """Register a sample knowledge-base agent and keep the process alive."""
    # Schemas of the single "search" capability.
    search_input = {
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "category": {"type": "string"},
        },
    }
    search_output = {
        "type": "object",
        "properties": {
            "results": {"type": "array"},
        },
    }

    # Assemble the agent card from its three sections.
    agent_card = {
        "agentInfo": {
            "name": "Knowledge Base Agent",
            "type": "knowledge-base",
            "version": "1.0.0",
            "description": "智能知识库检索Agent",
        },
        "capabilities": [
            {
                "name": "search",
                "description": "在知识库中搜索相关信息",
                "inputSchema": search_input,
                "outputSchema": search_output,
            },
        ],
        "endpoints": {
            "primary": "ws://localhost:8080/agent",
            "healthCheck": "http://localhost:8080/health",
        },
    }

    # Create the registry client and attempt registration.
    client = A2ARegistryClient(
        registry_url="http://localhost:9000",
        agent_card=agent_card,
    )
    registered = await client.register()
    if not registered:
        return

    print("Agent注册成功,开始提供服务...")
    # Block forever so the background heartbeat keeps running.
    await asyncio.Event().wait()
# Script entry point: run the registration example only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
A2A Protocol实现了基于多个维度的智能Agent发现算法,能够根据请求的复杂性和上下文自动选择最合适的Agent。
class A2AAgentDiscovery:
    """Discovery engine that ranks registered agents for a capability request.

    The registry object passed in must provide the coroutines
    ``query_agents(filter_dict)`` and ``get_agent_metrics(agent_id)``.

    NOTE(review): this class calls ``_calculate_location_scores``,
    ``_rank_and_filter``, ``_check_io_compatibility``,
    ``_get_capability_embedding``, ``_cosine_similarity`` and
    ``_normalize_response_time``, none of which are defined in the code
    shown here; they must be supplied before the class is usable.
    """

    def __init__(self, registry_client):
        self.registry = registry_client
        self.capability_embeddings = {}  # capability vector index
        self.performance_cache = {}  # cached performance metrics

    async def discover_agents(self,
                              requirement: Dict,
                              context: Optional[Dict] = None,
                              preferences: Optional[Dict] = None) -> List[Dict]:
        """Find and rank agents that match *requirement*.

        Args:
            requirement: description of the needed capability
                (``capability`` name and optional ``inputSchema``).
            context: request context, forwarded to the location scoring.
            preferences: optional selection preferences. ``prioritize``
                adjusts the score weights; ``maxResults`` caps the number
                of returned matches (default 10).

        Returns:
            The best matches, at most ``maxResults`` entries.
        """
        preferences = preferences or {}

        # Step 1: capability matching.
        capability_matches = await self._match_capabilities(requirement)
        # Step 2: performance evaluation.
        performance_scores = await self._evaluate_performance(capability_matches)
        # Step 3: location scoring.
        location_scores = await self._calculate_location_scores(
            capability_matches, context
        )
        # Step 4: composite score.
        final_scores = self._calculate_composite_scores(
            capability_matches,
            performance_scores,
            location_scores,
            preferences,
        )
        # Step 5: ranking and filtering.
        ranked_agents = self._rank_and_filter(final_scores, requirement)

        # Honour the caller's maxResults; fall back to 10 for missing or
        # unusable values.
        limit = preferences.get("maxResults", 10)
        if not isinstance(limit, int) or limit < 0:
            limit = 10
        return ranked_agents[:limit]

    async def _match_capabilities(self, requirement: Dict) -> List[Dict]:
        """Return agent/capability pairs that satisfy *requirement*.

        A pair is kept when its name match score exceeds 0.6 and its
        input/output compatibility exceeds 0.7; the stored
        ``capability_score`` is the product of the two.
        """
        required_capability = requirement.get("capability")
        input_schema = requirement.get("inputSchema", {})

        # Ask the registry for active agents advertising the capability.
        all_agents = await self.registry.query_agents({
            "capabilities": required_capability,
            "status": "ACTIVE",
        })

        matches = []
        for agent in all_agents:
            for capability in agent["agentCard"]["capabilities"]:
                if capability["name"] == required_capability:
                    # Exact name match.
                    match_score = 1.0
                else:
                    # Fall back to semantic similarity.
                    match_score = await self._semantic_similarity(
                        required_capability,
                        capability["name"],
                        capability.get("description", ""),
                    )

                io_compatibility = self._check_io_compatibility(
                    input_schema,
                    capability.get("inputSchema", {}),
                    capability.get("outputSchema", {}),
                )

                if match_score > 0.6 and io_compatibility > 0.7:
                    matches.append({
                        "agent": agent,
                        "capability": capability,
                        "capability_score": match_score * io_compatibility,
                        "match_details": {
                            "semantic_score": match_score,
                            "io_compatibility": io_compatibility,
                        },
                    })
        return matches

    async def _semantic_similarity(self, req_cap: str, agent_cap: str, description: str) -> float:
        """Return the cosine similarity of the two capabilities, clamped to [0, 1]."""
        req_embedding = await self._get_capability_embedding(req_cap)
        agent_embedding = await self._get_capability_embedding(f"{agent_cap} {description}")
        similarity = self._cosine_similarity(req_embedding, agent_embedding)
        return max(0.0, min(1.0, similarity))

    async def _evaluate_performance(self, matches: List[Dict]) -> Dict[str, float]:
        """Return a performance score per agent id.

        The score is a weighted sum of response time (0.3), free load
        (0.3), availability (0.2) and success rate (0.2). Missing metrics
        fall back to 1000, 0.5, 0.99 and 0.95 respectively.
        """
        performance_scores = {}
        for match in matches:
            agent_id = match["agent"]["agentId"]
            if agent_id in performance_scores:
                # An agent can match through several capabilities; fetch
                # its metrics only once.
                continue

            metrics = await self.registry.get_agent_metrics(agent_id)

            response_time_score = self._normalize_response_time(
                metrics.get("averageResponseTime", 1000)
            )
            load_score = 1.0 - metrics.get("currentLoad", 0.5)
            availability_score = metrics.get("availability", 0.99)
            success_rate_score = metrics.get("successRate", 0.95)

            performance_scores[agent_id] = (
                response_time_score * 0.3 +
                load_score * 0.3 +
                availability_score * 0.2 +
                success_rate_score * 0.2
            )
        return performance_scores

    def _calculate_composite_scores(self,
                                    matches: List[Dict],
                                    performance_scores: Dict[str, float],
                                    location_scores: Dict[str, float],
                                    preferences: Dict) -> List[Dict]:
        """Attach a ``composite_score`` to every match and return the matches.

        The score is the weighted average of capability, performance and
        location scores. Weights are normalised to sum to 1 so that scores
        stay on the same 0..1 scale whatever ``preferences["prioritize"]``
        selects. Agents missing from a score table get 0.5 for that part.

        The match dicts are updated in place and returned in input order.
        """
        # Default weights. The former unused "other": 0.1 share is dropped;
        # it capped default scores at 0.9 while preference-adjusted scores
        # could reach 1.0.
        weights = {
            "capability": 0.4,
            "performance": 0.3,
            "location": 0.2,
        }

        prioritize = preferences.get("prioritize")
        if prioritize == "performance":
            weights["performance"] = 0.5
            weights["capability"] = 0.3
        elif prioritize == "location":
            weights["location"] = 0.4
            weights["capability"] = 0.3

        total_weight = sum(weights.values())

        scored_matches = []
        for match in matches:
            agent_id = match["agent"]["agentId"]
            weighted_sum = (
                match["capability_score"] * weights["capability"] +
                performance_scores.get(agent_id, 0.5) * weights["performance"] +
                location_scores.get(agent_id, 0.5) * weights["location"]
            )
            match["composite_score"] = weighted_sum / total_weight
            scored_matches.append(match)
        return scored_matches
# 使用示例
async def discover_knowledge_agent():
    """Example: discover knowledge-base agents and print the ranked matches.

    NOTE(review): the discovery engine calls ``query_agents`` and
    ``get_agent_metrics`` on the registry client; confirm the client used
    here provides both before running this example.
    """
    # A2ARegistryClient requires an agent card; this caller only queries
    # the registry and never registers, so an empty card is passed.
    registry_client = A2ARegistryClient("http://localhost:9000", agent_card={})
    discovery = A2AAgentDiscovery(registry_client)

    # What capability is needed.
    requirement = {
        "capability": "search",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "language": {"type": "string"},
            },
        },
        "priority": "high",
        "timeout": 5000,
    }

    # Request context.
    context = {
        "requestLocation": "us-west-2",
        "sessionId": "session_123",
        "userLanguage": "zh-CN",
    }

    # Selection preferences.
    preferences = {
        "prioritize": "performance",
        "maxResults": 5,
        "includeBackup": True,
    }

    # Run discovery and report each match.
    agents = await discovery.discover_agents(requirement, context, preferences)
    for agent in agents:
        print(f"Agent: {agent['agent']['agentId']}")
        print(f"Score: {agent['composite_score']:.3f}")
        print(f"Capability: {agent['capability']['name']}")
        print("---")