Agent间通信的标准消息格式和编码规范
A2A Protocol采用分层的消息分类系统,根据功能和用途将消息分为四个主要类别,每个类别包含多个具体的消息类型。
Agent间的服务请求消息
对REQUEST的响应消息
流式数据传输消息
连接建立和协商
连接保活和状态监控
连接断开通知
大文件分块传输
结构化数据载荷
多媒体内容传输
系统事件广播
Agent状态变更
紧急告警通知
优先级 | 值 | 处理策略 | 适用场景 |
---|---|---|---|
URGENT | 4 | 立即处理,抢占资源 | 系统告警,紧急停止指令 |
HIGH | 3 | 优先队列处理 | 实时请求,用户交互 |
NORMAL | 2 | 标准队列处理 | 常规业务请求 |
LOW | 1 | 空闲时处理 | 批处理,数据同步 |
{
"header": {
"messageType": "REQUEST",
"messageId": "req_20241219_001",
"timestamp": "2024-12-19T12:30:00.000Z",
"version": "1.0",
"priority": "NORMAL"
},
"routing": {
"source": {
"agentId": "customer-service-01",
"agentType": "customer-service",
"instanceId": "cs-instance-001"
},
"target": {
"agentId": "knowledge-base-01",
"agentType": "knowledge-base",
"routingStrategy": "direct"
},
"traceId": "trace_123456789",
"spanId": "span_987654321"
},
"request": {
"capability": "search",
"action": "semantic_search",
"parameters": {
"query": "产品退款政策相关信息",
"category": "customer_service",
"language": "zh-CN",
"maxResults": 5,
"includeContext": true
},
"expectations": {
"responseFormat": "structured",
"maxResponseTime": 3000,
"minConfidence": 0.7
},
"context": {
"sessionId": "session_abc123",
"userId": "user_789",
"conversationHistory": [
{
"role": "user",
"content": "我想了解退款相关的政策"
}
]
}
},
"metadata": {
"timeout": 5000,
"retryPolicy": {
"maxRetries": 3,
"backoffStrategy": "exponential",
"baseDelay": 1000
},
"caching": {
"enabled": true,
"ttl": 300,
"key": "search_退款政策_zh-CN"
}
},
"security": {
"authToken": "bearer_token_here",
"permissions": ["read:knowledge-base"],
"signature": "digital_signature_here"
}
}
请求的Agent能力名称,必须与目标Agent的能力声明匹配
具体的操作类型,用于细化能力调用
执行操作所需的参数,遵循目标Agent的输入Schema
对响应的期望,包括格式、时间、质量要求
{
"header": {
"messageType": "RESPONSE",
"messageId": "resp_20241219_001",
"correlationId": "req_20241219_001",
"timestamp": "2024-12-19T12:30:01.250Z",
"version": "1.0"
},
"routing": {
"source": {
"agentId": "knowledge-base-01",
"agentType": "knowledge-base"
},
"target": {
"agentId": "customer-service-01",
"agentType": "customer-service"
},
"traceId": "trace_123456789",
"spanId": "span_987654322"
},
"response": {
"status": "SUCCESS",
"statusCode": 200,
"data": {
"results": [
{
"id": "kb_article_001",
"title": "产品退款政策详解",
"content": "根据我司政策,商品在未开封状态下7天内可申请退款...",
"confidence": 0.92,
"source": "official_policy_doc",
"lastUpdated": "2024-12-01T00:00:00Z",
"category": "customer_service",
"tags": ["退款", "政策", "7天无理由"]
},
{
"id": "kb_faq_001",
"title": "退款常见问题",
"content": "Q: 退款需要多长时间?A: 通常3-5个工作日到账...",
"confidence": 0.87,
"source": "faq_database",
"lastUpdated": "2024-11-15T00:00:00Z",
"category": "customer_service",
"tags": ["退款", "FAQ", "处理时间"]
}
],
"totalCount": 2,
"searchMetadata": {
"queryProcessingTime": 145,
"searchAlgorithm": "semantic_vector_search",
"indexVersion": "2024.12.01"
}
},
"performance": {
"processingTime": 1250,
"cacheHit": false,
"resourceUsage": {
"cpu": 0.23,
"memory": 0.15,
"ioOps": 12
}
}
},
"metadata": {
"cached": true,
"cacheKey": "search_退款政策_zh-CN",
"cacheTtl": 300
}
}
{
"header": {
"messageType": "RESPONSE",
"messageId": "resp_error_001",
"correlationId": "req_20241219_001",
"timestamp": "2024-12-19T12:30:01.500Z"
},
"routing": {
"source": {
"agentId": "knowledge-base-01",
"agentType": "knowledge-base"
},
"target": {
"agentId": "customer-service-01",
"agentType": "customer-service"
}
},
"response": {
"status": "ERROR",
"statusCode": 400,
"error": {
"code": "INVALID_PARAMETERS",
"message": "查询参数验证失败",
"details": {
"field": "maxResults",
"expectedType": "integer",
"actualValue": "not_a_number",
"constraints": {
"min": 1,
"max": 100
}
},
"suggestions": [
"确保maxResults字段为1-100之间的整数",
"检查请求参数的数据类型"
],
"documentation": "https://docs.a2aprotocol.org/api/search#parameters"
},
"performance": {
"processingTime": 50,
"failurePoint": "parameter_validation"
}
}
}
{
"header": {
"messageType": "STREAM",
"messageId": "stream_001",
"streamId": "stream_session_123",
"sequenceNumber": 1,
"timestamp": "2024-12-19T12:30:00.000Z"
},
"routing": {
"source": {
"agentId": "data-processor-01",
"agentType": "data-processor"
},
"target": {
"agentId": "analytics-engine-01",
"agentType": "analytics-engine"
}
},
"stream": {
"streamType": "DATA_CHUNK",
"chunkInfo": {
"chunkIndex": 1,
"totalChunks": 100,
"chunkSize": 8192,
"totalSize": 819200
},
"data": {
"encoding": "base64",
"compression": "gzip",
"content": "H4sIAAAAAAAAA...", // base64编码的数据
"checksum": "sha256:abc123def456..."
},
"metadata": {
"dataType": "json",
"schema": "analytics_data_v1.0",
"timestamp": "2024-12-19T12:29:59.000Z"
}
},
"flowControl": {
"requiresAck": true,
"bufferSize": 10,
"backpressure": false
}
}
{
"header": {
"messageType": "HANDSHAKE",
"messageId": "handshake_001",
"timestamp": "2024-12-19T12:30:00.000Z",
"version": "1.0"
},
"routing": {
"source": {
"agentId": "new-agent-01",
"agentType": "text-analyzer"
},
"target": {
"agentId": "registry-service",
"agentType": "registry"
}
},
"handshake": {
"phase": "INITIATE", // INITIATE, NEGOTIATE, CONFIRM, COMPLETE
"protocolVersions": ["1.0", "1.1"],
"supportedFeatures": [
"streaming",
"compression",
"encryption",
"batch_processing"
],
"agentCapabilities": {
"maxConcurrentRequests": 50,
"supportedEncodings": ["json", "msgpack", "protobuf"],
"authMethods": ["bearer-token", "mutual-tls"],
"compressionAlgorithms": ["gzip", "lz4"]
},
"connectionPreferences": {
"transport": "websocket",
"keepAlive": 30,
"maxMessageSize": 10485760,
"compressionEnabled": true
},
"security": {
"encryptionRequired": true,
"certificateChain": "base64_encoded_cert_chain",
"supportedCiphers": ["AES-256-GCM", "ChaCha20-Poly1305"]
}
}
}
{
"header": {
"messageType": "HEARTBEAT",
"messageId": "hb_20241219_001",
"timestamp": "2024-12-19T12:30:00.000Z",
"priority": "LOW"
},
"routing": {
"source": {
"agentId": "knowledge-base-01",
"agentType": "knowledge-base"
},
"target": {
"agentId": "registry-service",
"agentType": "registry"
}
},
"heartbeat": {
"status": "HEALTHY",
"uptime": 86400, // 秒
"performance": {
"cpu": {
"usage": 0.35,
"cores": 8,
"frequency": 2.4
},
"memory": {
"used": 0.45,
"available": 16777216, // KB
"gc_pressure": 0.02
},
"network": {
"connectionsActive": 23,
"bytesIn": 1048576,
"bytesOut": 2097152,
"latencyAvg": 15 // ms
},
"application": {
"requestsPerSecond": 150,
"averageResponseTime": 250, // ms
"errorRate": 0.001,
"queueDepth": 5
}
},
"capabilities": {
"available": ["search", "index", "analyze"],
"degraded": [],
"unavailable": []
},
"dependencies": {
"database": {
"status": "HEALTHY",
"latency": 5 // ms
},
"vectorIndex": {
"status": "HEALTHY",
"latency": 12 // ms
},
"cache": {
"status": "DEGRADED",
"latency": 50, // ms
"hitRate": 0.85
}
}
}
}
ARTIFACT消息用于传输结构化的数据工件,如文档、图像、分析结果等持久化内容。
{
"header": {
"messageType": "ARTIFACT",
"messageId": "artifact_doc_001",
"timestamp": "2024-12-19T12:30:00.000Z"
},
"routing": {
"source": {
"agentId": "document-processor-01",
"agentType": "document-processor"
},
"target": {
"agentId": "storage-agent-01",
"agentType": "storage"
}
},
"artifact": {
"type": "DOCUMENT",
"subType": "analysis_report",
"format": "json",
"metadata": {
"title": "市场分析报告2024Q4",
"description": "第四季度市场趋势分析和预测",
"version": "1.0",
"author": "analytics-engine-01",
"createdAt": "2024-12-19T10:00:00.000Z",
"tags": ["市场分析", "Q4", "预测", "趋势"],
"category": "business_intelligence",
"confidentiality": "internal"
},
"content": {
"summary": {
"keyFindings": [
"Q4市场增长率达到15%",
"用户活跃度提升25%",
"新兴市场表现突出"
],
"recommendations": [
"加大新兴市场投入",
"优化用户体验",
"扩展产品线"
]
},
"data": {
"marketGrowth": {
"q4_2024": 0.15,
"q3_2024": 0.12,
"yearOverYear": 0.23
},
"userMetrics": {
"activeUsers": 1250000,
"growthRate": 0.25,
"retention": 0.82
},
"regionalData": {
"northAmerica": {
"growth": 0.10,
"share": 0.45
},
"asia": {
"growth": 0.28,
"share": 0.35
},
"europe": {
"growth": 0.08,
"share": 0.20
}
}
},
"charts": [
{
"type": "line_chart",
"title": "季度增长趋势",
"dataUrl": "https://storage.example.com/charts/growth_trend.png",
"description": "过去8个季度的增长趋势图"
}
]
},
"storage": {
"persistent": true,
"retention": "5_years",
"backupRequired": true,
"indexingEnabled": true
}
}
}
{
"header": {
"messageType": "MEDIA",
"messageId": "media_img_001",
"timestamp": "2024-12-19T12:30:00.000Z"
},
"routing": {
"source": {
"agentId": "image-processor-01",
"agentType": "image-processor"
},
"target": {
"agentId": "user-interface-01",
"agentType": "ui"
}
},
"media": {
"type": "IMAGE",
"format": "png",
"encoding": "base64",
"compression": "none",
"metadata": {
"width": 1920,
"height": 1080,
"channels": 3,
"colorSpace": "sRGB",
"dpi": 300,
"fileSize": 2048576, // bytes
"processingInfo": {
"algorithm": "style_transfer",
"model": "neural_style_v2.1",
"processingTime": 15.5, // seconds
"quality": "high"
}
},
"content": {
"data": "iVBORw0KGgoAAAANSUhEUgAA...", // base64 encoded image
"thumbnail": "iVBORw0KGgoAAAANSUhEUgAA...", // 缩略图
"checksum": "sha256:1234567890abcdef..."
},
"annotations": [
{
"type": "processing_log",
"message": "应用风格转换滤镜",
"timestamp": "2024-12-19T12:29:45.000Z"
},
{
"type": "quality_assessment",
"score": 0.92,
"details": "图像质量评分:优秀"
}
]
}
}
class A2AMessageEncoder:
"""A2A Protocol消息编码器"""
def select_encoding(self, message: Dict, context: Dict) -> str:
"""智能选择最优编码格式"""
# 消息大小评估
estimated_size = self._estimate_message_size(message)
# 性能要求评估
performance_requirement = context.get("performance", "normal")
# 兼容性要求
compatibility_requirement = context.get("compatibility", "standard")
# 决策逻辑
if estimated_size > 1024 * 1024: # > 1MB
return "protobuf" if performance_requirement == "high" else "msgpack"
elif performance_requirement == "high":
return "msgpack"
elif compatibility_requirement == "maximum":
return "json"
elif message.get("header", {}).get("messageType") in ["HEARTBEAT", "HANDSHAKE"]:
return "json" # 控制消息优先可读性
else:
return "msgpack" # 默认高效格式
def encode_message(self, message: Dict, encoding: str = None) -> bytes:
"""编码A2A消息"""
if encoding is None:
encoding = self.select_encoding(message, {})
# 添加编码元信息
message["_meta"] = {
"encoding": encoding,
"version": "1.0",
"timestamp": datetime.utcnow().isoformat()
}
if encoding == "json":
return json.dumps(message, ensure_ascii=False).encode('utf-8')
elif encoding == "msgpack":
import msgpack
return msgpack.packb(message, use_bin_type=True)
elif encoding == "protobuf":
# 转换为protobuf并序列化
pb_message = self._dict_to_protobuf(message)
return pb_message.SerializeToString()
else:
raise ValueError(f"不支持的编码格式: {encoding}")
def decode_message(self, data: bytes, encoding: str = None) -> Dict:
"""解码A2A消息"""
if encoding is None:
# 尝试自动检测编码格式
encoding = self._detect_encoding(data)
if encoding == "json":
return json.loads(data.decode('utf-8'))
elif encoding == "msgpack":
import msgpack
return msgpack.unpackb(data, raw=False, strict_map_key=False)
elif encoding == "protobuf":
pb_message = self._bytes_to_protobuf(data)
return self._protobuf_to_dict(pb_message)
else:
raise ValueError(f"不支持的编码格式: {encoding}")
# 性能基准测试结果
ENCODING_BENCHMARKS = {
"json": {
"encode_speed": "1.0x",
"decode_speed": "1.0x",
"size_ratio": "1.0x",
"cpu_usage": "low"
},
"msgpack": {
"encode_speed": "3.2x",
"decode_speed": "2.8x",
"size_ratio": "0.7x",
"cpu_usage": "medium"
},
"protobuf": {
"encode_speed": "4.1x",
"decode_speed": "3.9x",
"size_ratio": "0.5x",
"cpu_usage": "high"
}
}
基于JSON Schema验证消息结构完整性和字段类型
验证消息内容的逻辑一致性和业务规则符合性
验证数字签名、权限检查和内容安全扫描
检查消息大小限制、处理复杂度和资源需求