身份认证与访问控制深度理论知识
身份认证与访问控制深度理论知识
本文档专注于IAM系统、令牌机制、网关认证鉴权的完整理论与实践
目录
IAM基础理论
1.1 核心概念辨析
graph TB
subgraph Identity["身份 (Identity)"]
User["用户<br/>• 人类用户<br/>• 服务账号<br/>• AI Agent"]
Attributes["身份属性<br/>• ID: user_123<br/>• Email<br/>• Roles<br/>• Groups"]
end
subgraph Authentication["认证 (Authentication)"]
Verify["验证身份<br/>回答: 你是谁?"]
Methods["认证方式<br/>• 密码<br/>• MFA<br/>• 证书<br/>• 生物识别"]
end
subgraph Authorization["授权 (Authorization)"]
CheckPerm["检查权限<br/>回答: 你能做什么?"]
Models["权限模型<br/>• RBAC<br/>• ABAC<br/>• ReBAC"]
end
Identity --> Authentication
Authentication --> Authorization
style Identity fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style Authentication fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
style Authorization fill:#e8f5e9,stroke:#43a047,stroke-width:2px
关键区别:
| 概念 | 问题 | 验证内容 | 结果 |
|---|---|---|---|
| 身份 (Identity) | 你声称是谁? | 身份标识符 (ID, Email) | 身份声明 |
| 认证 (Authentication) | 你真的是这个人吗? | 凭证 (密码, Token, 证书) | 身份确认 |
| 授权 (Authorization) | 你可以访问这个资源吗? | 权限、角色、策略 | 访问决策 |
1.2 IAM系统的核心职责
graph LR
subgraph IAM["IAM系统"]
IM[身份管理<br/>Identity Management]
AM[访问管理<br/>Access Management]
PM[策略管理<br/>Policy Management]
TM[令牌管理<br/>Token Management]
end
IM --> |用户、组、角色| AM
PM --> |定义权限规则| AM
AM --> |颁发令牌| TM
External[外部请求] --> TM
TM --> |验证令牌| Gateway[API网关]
IAM的四大支柱:
-
身份管理 (Identity Management)
- 用户生命周期管理(创建、更新、删除)
- 身份属性存储(用户信息、元数据)
- 身份联合(SSO、LDAP、SAML)
-
访问管理 (Access Management)
- 认证流程(登录、MFA)
- 会话管理(Session、Cookie)
- 令牌颁发(JWT、OAuth Token)
-
策略管理 (Policy Management)
- 权限定义(RBAC角色、ABAC策略)
- 策略存储与分发
- 策略评估引擎
-
令牌管理 (Token Management)
- 令牌生成与签名
- 令牌验证与刷新
- 令牌撤销(黑名单)
身份令牌机制
2.1 令牌类型对比
| 令牌类型 | 特点 | 验证方式 | 适用场景 | 安全性 |
|---|---|---|---|---|
| JWT (自包含令牌) | 令牌本身包含用户信息 | 本地验证签名 | 无状态API、微服务 | 中(无法主动撤销) |
| Opaque Token (引用令牌) | 随机字符串,需查询获取信息 | 调用IAM验证 | 需要集中控制、可撤销 | 高(集中管理) |
| Session ID | 服务器端存储,客户端持有ID | 查询Session存储 | 单体应用、Web应用 | 高(服务器控制) |
2.2 JWT详解
JWT结构
JWT = Header.Payload.Signature
完整示例:
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzEyMyIsIm5hbWUiOiJBbGljZSIsInJvbGVzIjpbImFkbWluIl0sImlhdCI6MTcwNjQ1MTYwMCwiZXhwIjoxNzA2NDU1MjAwfQ.signature_here
Header (头部):
{
"alg": "RS256", // 签名算法: RSA + SHA256
"typ": "JWT", // 令牌类型
"kid": "key-2024-01" // 密钥ID (用于密钥轮换)
}
Payload (负载):
{
// 标准声明 (Standard Claims)
"sub": "user_123", // Subject: 用户ID
"iss": "https://iam.example.com", // Issuer: 颁发者
"aud": "api.example.com", // Audience: 目标受众
"exp": 1706455200, // Expiration: 过期时间 (Unix时间戳)
"iat": 1706451600, // Issued At: 颁发时间
"nbf": 1706451600, // Not Before: 生效时间
"jti": "jwt-uuid-456", // JWT ID: 唯一标识
// 自定义声明 (Custom Claims)
"name": "Alice",
"email": "alice@example.com",
"roles": ["admin", "editor"],
"permissions": ["read:users", "write:posts"],
"tenant_id": "org_789",
"agent_type": "ai_assistant" // AI Agent标识
}
Signature (签名):
// 签名算法
RSASHA256(
base64UrlEncode(header) + "." + base64UrlEncode(payload),
privateKey // 使用私钥签名
)
JWT签名算法对比
| 算法 | 类型 | 密钥 | 安全性 | 性能 | 适用场景 |
|---|---|---|---|---|---|
| HS256 | 对称 | 共享密钥 | 中 | 快 | 单体应用、内部服务 |
| RS256 | 非对称 | 公钥/私钥 | 高 | 慢 | 分布式系统、多方验证 |
| ES256 | 非对称 | 椭圆曲线 | 高 | 中 | 移动应用、IoT |
安全建议:
- 微服务网关: RS256 (网关只需公钥验证,IAM持有私钥签名)
- 内部服务间: HS256 (性能更好,密钥可共享)
- 移动端/前端: ES256 (签名短,性能平衡)
2.3 Opaque Token机制
sequenceDiagram
participant Client as 客户端
participant Gateway as API网关
participant IAM as IAM系统
participant Redis as Token存储
participant Service as 后端服务
Note over Client,IAM: 1. 令牌颁发阶段
Client->>IAM: 登录请求 (username + password)
IAM->>Redis: 存储令牌信息<br/>token_abc123 → {user_id, roles, ...}
IAM->>Client: 返回 Opaque Token: abc123xyz
Note over Client,Service: 2. 令牌验证阶段
Client->>Gateway: 请求 + Token: abc123xyz
Gateway->>IAM: 验证令牌 (Introspection Endpoint)
IAM->>Redis: 查询 token_abc123
Redis->>IAM: 返回用户信息
IAM->>Gateway: 返回验证结果 + 用户信息
Gateway->>Service: 转发请求 (附加用户上下文)
Service->>Gateway: 响应
Gateway->>Client: 响应
Opaque Token的优势:
- 可主动撤销: 删除Redis中的令牌即可立即失效
- 安全性高: 令牌本身不包含任何信息,泄露后无法直接读取
- 集中控制: IAM系统完全控制令牌生命周期
劣势:
- 性能开销: 每次请求都需要调用IAM验证
- 可用性依赖: IAM/Redis故障会影响所有服务
- 网络延迟: 增加额外的网络调用
2.4 JWT vs Opaque Token 选型
graph TB
Start{令牌选型}
Start --> Q1{需要主动撤销?}
Q1 -->|是| Opaque[Opaque Token]
Q1 -->|否| Q2{高并发场景?}
Q2 -->|是| JWT[JWT]
Q2 -->|否| Q3{多方验证?}
Q3 -->|是| JWT
Q3 -->|否| Q4{对性能敏感?}
Q4 -->|是| JWT
Q4 -->|否| Opaque
style JWT fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Opaque fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
推荐方案:
| 场景 | 推荐 | 理由 |
|---|---|---|
| 高并发API | JWT | 无需每次查询IAM,减少延迟 |
| 金融/安全敏感 | Opaque | 可主动撤销,集中控制 |
| 微服务网关 | JWT | 网关本地验证,降低IAM压力 |
| 内部管理系统 | Opaque | 需要实时权限更新 |
| AI Agent | 短期 Access Token + 可撤销会话/刷新机制 | 降低凭证泄露后的可用窗口,便于审计、轮换和最小权限控制 |
混合方案 (推荐):
用户登录 → IAM颁发 JWT (短期,5-60分钟) + Refresh Token (Opaque, 按风险配置)
- JWT: 日常API访问,网关本地验证;高风险场景缩短有效期并绑定 audience/scope
- Refresh Token: 通过IAM验证并采用 rotation、重用检测、撤销和设备/客户端绑定
- 撤销: 将JWT的jti加入黑名单 (Redis),或优先采用Opaque Token/Introspection集中控制
- 高价值客户端: 使用 mTLS 或 DPoP 等 sender-constrained token,避免 bearer token 被复制后可直接使用
网关认证鉴权架构
3.1 完整认证鉴权流程
sequenceDiagram
participant Client as 客户端
participant Gateway as API网关
participant IAM as IAM系统
participant AuthZ as 权限服务
participant Service as 后端服务
Note over Client,Service: 阶段1: 认证 (Authentication)
Client->>Gateway: 请求 + JWT Token
Note over Gateway: 1.1 Token格式验证
alt Token格式错误
Gateway->>Client: 401 Unauthorized
end
Note over Gateway: 1.2 签名验证
Gateway->>Gateway: 使用IAM公钥验证签名
alt 签名验证失败
Gateway->>Client: 401 Invalid Token
end
Note over Gateway: 1.3 过期时间检查
alt Token已过期
Gateway->>Client: 401 Token Expired
end
Note over Gateway: 1.4 黑名单检查
Gateway->>IAM: 检查jti是否在黑名单
alt 在黑名单中
Gateway->>Client: 401 Token Revoked
end
Note over Client,Service: 阶段2: 身份还原
Note over Gateway: 2.1 解析JWT Payload
Gateway->>Gateway: 提取用户信息:<br/>• sub (用户ID)<br/>• roles<br/>• permissions<br/>• tenant_id
Note over Gateway: 2.2 构建用户上下文
Gateway->>Gateway: 创建 UserContext 对象
Note over Client,Service: 阶段3: 授权 (Authorization)
Note over Gateway: 3.1 路径匹配
Gateway->>Gateway: 请求路径: POST /api/users
Note over Gateway: 3.2 权限检查
alt 简单RBAC (网关实现)
Gateway->>Gateway: 检查 roles 是否包含 "admin"
else 复杂策略 (调用权限服务)
Gateway->>AuthZ: 请求授权决策<br/>{user, resource, action}
AuthZ->>Gateway: 返回 Allow/Deny
end
alt 权限不足
Gateway->>Client: 403 Forbidden
end
Note over Client,Service: 阶段4: 请求转发
Gateway->>Service: 转发请求 + 已签名用户上下文<br/>Headers:<br/>• X-User-ID: user_123<br/>• X-User-Roles: admin,editor<br/>• X-Tenant-ID: org_789<br/>• X-Context-Signature: ...
Note over Service: 仅信任来自网关可信边界的已清洗/已签名上下文
Service->>Gateway: 响应
Gateway->>Client: 响应
3.2 网关身份还原实现
方案一: 从JWT中提取(推荐)
# 网关实现示例
import jwt
from typing import Dict, List
class JWTAuthenticator:
def __init__(self, public_key: str):
self.public_key = public_key
def verify_and_extract(self, token: str) -> Dict:
"""验证JWT并提取用户信息"""
try:
# 1. 验证签名和过期时间
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
audience="api.example.com",
issuer="https://iam.example.com"
)
# 2. 提取用户信息
user_context = {
"user_id": payload.get("sub"),
"email": payload.get("email"),
"name": payload.get("name"),
"roles": payload.get("roles", []),
"permissions": payload.get("permissions", []),
"tenant_id": payload.get("tenant_id"),
"agent_type": payload.get("agent_type"), # AI Agent标识
"exp": payload.get("exp")
}
# 3. 黑名单检查(可选)
jti = payload.get("jti")
if jti and self.is_blacklisted(jti):
raise Exception("Token has been revoked")
return user_context
except jwt.ExpiredSignatureError:
raise Exception("Token expired")
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {str(e)}")
def is_blacklisted(self, jti: str) -> bool:
"""检查JWT ID是否在黑名单中"""
# 查询Redis黑名单
return redis_client.sismember("jwt:blacklist", jti)
方案二: 调用IAM Introspection端点
class OpaqueTokenAuthenticator:
def __init__(self, iam_url: str):
self.iam_url = iam_url
def verify_and_extract(self, token: str) -> Dict:
"""调用IAM验证Opaque Token"""
# RFC 7662: OAuth Token Introspection
response = requests.post(
f"{self.iam_url}/oauth/introspect",
data={"token": token},
auth=(gateway_client_id, gateway_client_secret)
)
result = response.json()
if not result.get("active"):
raise Exception("Token is not active")
return {
"user_id": result.get("sub"),
"roles": result.get("roles", []),
"permissions": result.get("permissions", []),
"tenant_id": result.get("tenant_id"),
"exp": result.get("exp")
}
3.3 用户上下文传递
网关验证完成后,需要将用户信息传递给后端服务:
方式一: HTTP Header传递(仅限可信网关边界内)
Header 传递用户上下文不是天然安全机制。只有在网关与后端之间存在明确可信边界时才适用,并且必须满足:
- 网关入口先清洗/删除所有客户端提交的
X-User-*、X-Tenant-*、X-Agent-*等内部上下文 Header,防止伪造。 - 网关到后端使用 mTLS、服务网格身份或等价网络隔离,后端只接受来自可信网关/sidecar 的流量。
- 用户上下文 Header 需要签名或封装为内部 JWT,包含
iss、aud、iat、exp、jti,后端验证签名和短过期时间。 - Header 中只传递最小必要字段,不传身份证号、手机号、完整邮箱、原始 Token 等敏感数据。
- 后端对高风险操作仍应调用策略服务或验证内部令牌,不能把明文 Header 当作最终授权事实。
# 网关添加自定义Header
GET /api/users HTTP/1.1
Host: user-service
Authorization: Bearer <original_jwt>
X-User-ID: user_123
X-User-Email: alice@example.com
X-User-Roles: admin,editor
X-User-Permissions: read:users,write:posts
X-Tenant-ID: org_789
X-Agent-Type: ai_assistant
X-Context-Issuer: api-gateway
X-Context-Expires-At: 1706455200
X-Context-Signature: base64url(hmac_or_signature)
后端服务获取用户上下文:
from flask import request
def get_current_user():
# 生产环境需先验证请求来源、上下文签名、过期时间和audience;此处只展示字段读取。
return {
"user_id": request.headers.get("X-User-ID"),
"roles": request.headers.get("X-User-Roles", "").split(","),
"permissions": request.headers.get("X-User-Permissions", "").split(","),
"tenant_id": request.headers.get("X-Tenant-ID"),
"agent_type": request.headers.get("X-Agent-Type")
}
# 在业务逻辑中使用
@app.route("/api/users")
def list_users():
user = get_current_user()
if "admin" not in user["roles"]:
return {"error": "Forbidden"}, 403
# ...业务逻辑
方式二: 重新签名JWT传递
网关验证原始JWT后,生成内部JWT传递给服务:
def create_internal_jwt(user_context: Dict) -> str:
"""创建内部服务使用的JWT"""
payload = {
"sub": user_context["user_id"],
"roles": user_context["roles"],
"iss": "api-gateway",
"aud": "internal-services",
"exp": time.time() + 300 # 5分钟短期有效
}
return jwt.encode(payload, internal_secret, algorithm="HS256")
3.4 网关配置示例
Kong网关配置
services:
- name: user-service
url: http://user-service:8080
routes:
- name: user-routes
paths:
- /api/users
plugins:
# JWT验证插件
- name: jwt
config:
key_claim_name: kid
secret_is_base64: false
uri_param_names:
- jwt
cookie_names:
- jwt
claims_to_verify:
- exp
# 权限控制插件
- name: acl
config:
whitelist:
- admin
- editor
# 请求转换插件 (添加用户上下文)
- name: request-transformer
config:
add:
headers:
- X-User-ID:$(jwt.sub)
- X-User-Roles:$(jwt.roles)
- X-Tenant-ID:$(jwt.tenant_id)
Nginx + Lua脚本
-- nginx.conf
location /api/ {
access_by_lua_block {
local jwt = require "resty.jwt"
local cjson = require "cjson"
-- 1. 提取Token
local auth_header = ngx.var.http_Authorization
local token = string.match(auth_header, "Bearer%s+(.+)")
if not token then
ngx.status = 401
ngx.say(cjson.encode({error = "Missing token"}))
return ngx.exit(401)
end
-- 2. 验证JWT
local jwt_obj = jwt:verify(public_key, token)
if not jwt_obj.verified then
ngx.status = 401
ngx.say(cjson.encode({error = "Invalid token"}))
return ngx.exit(401)
end
-- 3. 提取用户信息
local payload = jwt_obj.payload
ngx.req.set_header("X-User-ID", payload.sub)
ngx.req.set_header("X-User-Roles", table.concat(payload.roles, ","))
ngx.req.set_header("X-Tenant-ID", payload.tenant_id)
}
proxy_pass http://backend-service;
}
权限控制模型
4.1 RBAC (基于角色的访问控制)
graph LR
User[用户: Alice] --> UR[用户-角色] --> Role[角色: Admin]
Role --> RP[角色-权限] --> Perm1[权限: read:users]
Role --> RP --> Perm2[权限: write:users]
Role --> RP --> Perm3[权限: delete:users]
style User fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style Role fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
style Perm1 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Perm2 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Perm3 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
网关实现RBAC:
class RBACAuthorizer:
def __init__(self):
# 权限规则配置
self.rules = {
"/api/users": {
"GET": ["admin", "viewer"],
"POST": ["admin"],
"PUT": ["admin"],
"DELETE": ["admin"]
},
"/api/posts": {
"GET": ["admin", "editor", "viewer"],
"POST": ["admin", "editor"],
"PUT": ["admin", "editor"],
"DELETE": ["admin"]
}
}
def authorize(self, user_roles: List[str], path: str, method: str) -> bool:
"""检查用户是否有权限访问资源"""
required_roles = self.rules.get(path, {}).get(method, [])
if not required_roles:
return True # 无限制
# 检查用户是否拥有任一所需角色
return any(role in required_roles for role in user_roles)
def check(self, user_context: Dict, request_path: str, request_method: str):
user_roles = user_context.get("roles", [])
if not self.authorize(user_roles, request_path, request_method):
raise PermissionDenied(
f"User with roles {user_roles} cannot {request_method} {request_path}"
)
4.2 ABAC (基于属性的访问控制)
更灵活的权限模型,基于用户属性、资源属性、环境属性进行决策。
class ABACAuthorizer:
def authorize(self, user_context: Dict, resource: Dict, action: str, environment: Dict) -> bool:
"""基于属性的授权决策"""
# 示例策略: 只有同租户的管理员才能删除资源
if action == "delete":
return (
"admin" in user_context.get("roles", []) and
user_context.get("tenant_id") == resource.get("tenant_id")
)
# 示例策略: 工作时间才能访问财务数据
if resource.get("category") == "financial":
current_hour = environment.get("hour")
return 9 <= current_hour <= 18
# 示例策略: 只能访问自己部门的数据
if action == "read":
return user_context.get("department") == resource.get("department")
return False
网关集成ABAC(调用OPA):
import requests
class OPAAuthorizer:
def __init__(self, opa_url: str):
self.opa_url = opa_url
def authorize(self, user_context: Dict, resource_path: str, action: str) -> bool:
"""调用OPA进行授权决策"""
policy_input = {
"input": {
"user": user_context,
"resource": {"path": resource_path},
"action": action,
"environment": {
"time": datetime.now().isoformat(),
"ip": request.remote_addr
}
}
}
response = requests.post(
f"{self.opa_url}/v1/data/authz/allow",
json=policy_input
)
result = response.json()
return result.get("result", False)
OPA策略示例 (Rego语言):
package authz
# 默认拒绝
default allow = false
# 管理员可以访问所有资源
allow {
input.user.roles[_] == "admin"
}
# 用户只能访问自己租户的资源
allow {
input.user.tenant_id == input.resource.tenant_id
input.action == "read"
}
# AI Agent只能读取,不能修改
allow {
input.user.agent_type == "ai_assistant"
input.action == "read"
}
AI Agent身份管理
5.1 AI Agent的身份特点
AI Agent与普通用户的区别:
| 特性 | 普通用户 | AI Agent |
|---|---|---|
| 身份类型 | 人类用户 | 服务账号 |
| 认证方式 | 密码 + MFA / 无密码 | Client Credentials、工作负载身份、mTLS/DPoP、Token Exchange |
| 令牌有效期 | 短期 (5-60分钟) | 短期 (通常分钟级到1小时,按风险收敛) |
| 权限范围 | 基于角色 | 基于用途、scope、资源和委托边界限定 |
| 审计要求 | 记录操作人 | 记录代理用户 (on-behalf-of) |
5.2 AI Agent的令牌设计
AI Agent 不应依赖 7 天到永久有效的 bearer token。更稳妥的设计是:短期 Access Token、可撤销会话、Refresh Token Rotation(如确需刷新)、密钥/证书轮换、最小权限 scope、sender-constrained token,以及完整审计。
{
"sub": "agent_ai_assistant_123",
"agent_type": "ai_assistant",
"agent_name": "Customer Support Bot",
"on_behalf_of": "user_789", // 代表哪个用户
"delegated_by": "user_789", // 由谁授权
"roles": ["ai_agent"],
"permissions": [
"read:conversations",
"write:responses",
"read:knowledge_base"
],
"scope": "assistant:basic", // OAuth scope限定
"jti": "token-instance-id",
"constraints": {
"rate_limit": 1000, // 速率限制
"allowed_resources": ["conversations", "knowledge_base"],
"forbidden_actions": ["delete", "admin"]
},
"cnf": {
"jkt": "dpop_public_key_thumbprint"
},
"iss": "https://iam.example.com",
"aud": "api.example.com",
"exp": 1706282400, // 短期有效,例如15-60分钟,按风险配置
"iat": 1706278800
}
令牌控制要点:
- Access Token 默认短期有效;高风险工具调用可使用更短 TTL 或一次性令牌
- Refresh Token 如确需使用,应采用 rotation、重用检测、撤销列表和客户端绑定
- 优先使用 mTLS 证书绑定或 DPoP 将令牌约束到持有私钥的客户端实例
- 每个 Agent 拥有独立身份、独立密钥和独立审计主体,禁止多个 Agent 共用静态 API Key
- Token 必须包含
aud、scope、jti、exp、委托关系和资源边界,权限变更应可即时撤销或快速收敛
5.3 AI Agent的授权模式
模式一: Client Credentials Flow
适用于不代表具体用户、以自身服务身份访问资源的AI Agent。客户端凭证本身必须可轮换,令牌应短期有效;高价值场景建议叠加 mTLS/DPoP。
sequenceDiagram
participant Agent as AI Agent
participant IAM as IAM系统
participant API as API网关
Agent->>IAM: 请求令牌<br/>client_assertion / mTLS / DPoP
IAM->>IAM: 验证客户端凭证与绑定密钥
IAM->>Agent: 返回短期 Access Token
Agent->>API: 请求 + Access Token
API->>API: 验证JWT<br/>校验scope/aud/cnf
API->>Agent: 返回数据
模式二: Token Exchange (代表用户)
适用于需要代表用户操作的AI Agent。交换出的 delegated token 应比原始用户令牌权限更窄、有效期更短,并保留 on_behalf_of、act 或等价代理关系用于审计。
sequenceDiagram
participant User as 用户
participant Agent as AI Agent
participant IAM as IAM系统
participant API as API网关
User->>IAM: 登录获取用户Token
IAM->>User: 返回 user_token
User->>Agent: 授权AI Agent<br/>user_token
Agent->>IAM: Token Exchange<br/>user_token + agent_credentials
IAM->>Agent: 返回 delegated_token<br/>(包含 on_behalf_of)
Agent->>API: 请求 + delegated_token
API->>API: 验证JWT<br/>识别是Agent代表用户
API->>Agent: 返回数据
Token Exchange实现 (RFC 8693):
def token_exchange(user_token: str, agent_credentials: Dict) -> str:
"""交换令牌:用户令牌 → AI Agent代理令牌"""
response = requests.post(
"https://iam.example.com/oauth/token",
data={
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token": user_token,
"subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
"client_id": agent_credentials["client_id"],
"client_secret": agent_credentials["client_secret"],
"scope": "assistant:basic"
}
)
return response.json()["access_token"]
5.4 AI Agent权限控制策略
class AIAgentAuthorizer:
def authorize(self, user_context: Dict, action: str, resource: str) -> bool:
"""AI Agent特殊授权逻辑"""
# 1. 识别AI Agent
if user_context.get("agent_type") == "ai_assistant":
# 2. 检查约束条件
constraints = user_context.get("constraints", {})
# 禁止的操作
if action in constraints.get("forbidden_actions", []):
return False
# 限定的资源
allowed_resources = constraints.get("allowed_resources", [])
if allowed_resources and resource not in allowed_resources:
return False
# 3. 检查代理权限
if "on_behalf_of" in user_context:
# 验证代理用户是否有权限
original_user_id = user_context["on_behalf_of"]
return self.check_user_permission(original_user_id, action, resource)
# 普通用户权限检查
return self.check_user_permission(user_context["user_id"], action, resource)
典型集成场景解决方案
6.1 网关身份还原与权限控制方案
场景: 企业 IAM 系统给用户和 AI Agent 颁发身份令牌,API 网关需要基于令牌还原身份上下文,并进行权限匹配控制。
完整解决方案架构
graph TB
subgraph Client["客户端"]
User[用户]
Agent[AI Agent]
end
subgraph IAM["客户IAM系统"]
IssueToken[令牌颁发]
PublicKey[公钥分发]
end
subgraph Gateway["API网关"]
Extract[1. 提取Token]
Verify[2. 验证签名]
Parse[3. 解析Payload]
Restore[4. 还原身份]
Authorize[5. 权限检查]
end
subgraph Backend["后端服务"]
Service[业务服务]
end
User --> |JWT Token| Extract
Agent --> |JWT Token| Extract
IAM --> |颁发| User
IAM --> |颁发| Agent
IAM -.->|提供公钥| Verify
Extract --> Verify
Verify --> Parse
Parse --> Restore
Restore --> Authorize
Authorize --> |通过| Service
Authorize --> |拒绝| User
详细实现步骤
步骤1: 获取IAM公钥
import requests
import json
class IAMPublicKeyProvider:
def __init__(self, iam_jwks_url: str):
self.jwks_url = iam_jwks_url
self.keys_cache = {}
def get_public_key(self, kid: str) -> str:
"""获取指定kid的公钥"""
if kid in self.keys_cache:
return self.keys_cache[kid]
# 从IAM的JWKS端点获取公钥
response = requests.get(self.jwks_url)
jwks = response.json()
for key in jwks["keys"]:
if key["kid"] == kid:
# 转换JWK为PEM格式
public_key = self.jwk_to_pem(key)
self.keys_cache[kid] = public_key
return public_key
raise Exception(f"Public key not found for kid: {kid}")
def jwk_to_pem(self, jwk: Dict) -> str:
"""将JWK转换为PEM格式"""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import base64
# 解析JWK
n = int.from_bytes(base64.urlsafe_b64decode(jwk["n"] + "=="), "big")
e = int.from_bytes(base64.urlsafe_b64decode(jwk["e"] + "=="), "big")
# 构造公钥
public_key = rsa.RSAPublicNumbers(e, n).public_key(default_backend())
# 转换为PEM
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return pem.decode("utf-8")
步骤2: 网关验证和身份还原
import jwt
from typing import Dict, Optional
import redis
class GatewayAuthenticator:
def __init__(self,
public_key_provider: IAMPublicKeyProvider,
redis_client: redis.Redis,
iam_issuer: str,
api_audience: str):
self.public_key_provider = public_key_provider
self.redis = redis_client
self.iam_issuer = iam_issuer
self.api_audience = api_audience
def authenticate_and_restore_identity(self, authorization_header: str) -> Dict:
"""完整的认证和身份还原流程"""
# 1. 提取Token
token = self.extract_token(authorization_header)
if not token:
raise AuthenticationError("Missing or invalid Authorization header")
# 2. 解码Header获取kid
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
if not kid:
raise AuthenticationError("Missing kid in JWT header")
# 3. 获取公钥
public_key = self.public_key_provider.get_public_key(kid)
# 4. 验证JWT签名和claims
try:
payload = jwt.decode(
token,
public_key,
algorithms=["RS256"],
audience=self.api_audience,
issuer=self.iam_issuer,
options={
"verify_signature": True,
"verify_exp": True,
"verify_iat": True,
"verify_aud": True,
"verify_iss": True
}
)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token has expired")
except jwt.InvalidTokenError as e:
raise AuthenticationError(f"Invalid token: {str(e)}")
# 5. 黑名单检查(可选)
jti = payload.get("jti")
if jti and self.is_token_revoked(jti):
raise AuthenticationError("Token has been revoked")
# 6. 还原用户身份
user_identity = self.restore_identity(payload)
return user_identity
def extract_token(self, authorization_header: str) -> Optional[str]:
"""从Authorization header中提取token"""
if not authorization_header:
return None
parts = authorization_header.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
return None
return parts[1]
def is_token_revoked(self, jti: str) -> bool:
"""检查token是否被撤销"""
return self.redis.sismember("jwt:blacklist", jti)
def restore_identity(self, payload: Dict) -> Dict:
"""从JWT payload还原完整用户身份"""
# 基础身份信息
identity = {
"user_id": payload.get("sub"),
"email": payload.get("email"),
"name": payload.get("name"),
"tenant_id": payload.get("tenant_id"),
# 权限相关
"roles": payload.get("roles", []),
"permissions": payload.get("permissions", []),
"groups": payload.get("groups", []),
# Token元数据
"token_id": payload.get("jti"),
"issued_at": payload.get("iat"),
"expires_at": payload.get("exp"),
# 识别用户类型
"is_human": payload.get("agent_type") is None,
"is_agent": payload.get("agent_type") is not None,
}
# AI Agent特殊字段
if identity["is_agent"]:
identity.update({
"agent_type": payload.get("agent_type"),
"agent_name": payload.get("agent_name"),
"on_behalf_of": payload.get("on_behalf_of"), # 代理的用户
"delegated_by": payload.get("delegated_by"),
"constraints": payload.get("constraints", {})
})
return identity
步骤3: 权限匹配控制
class GatewayAuthorizer:
def __init__(self, policy_config: Dict):
self.policies = policy_config
def authorize(self, user_identity: Dict, request_path: str, request_method: str) -> bool:
"""基于用户身份进行权限检查"""
# 1. 查找匹配的策略
policy = self.match_policy(request_path, request_method)
if not policy:
return True # 无策略限制,默认允许
# 2. 检查是否是AI Agent
if user_identity.get("is_agent"):
return self.authorize_agent(user_identity, policy)
# 3. 检查普通用户权限
return self.authorize_user(user_identity, policy)
def match_policy(self, path: str, method: str) -> Optional[Dict]:
"""匹配路径的权限策略"""
for pattern, policy in self.policies.items():
if self.path_matches(path, pattern):
return policy.get(method)
return None
def authorize_user(self, user_identity: Dict, policy: Dict) -> bool:
"""普通用户权限检查"""
required_roles = policy.get("required_roles", [])
required_permissions = policy.get("required_permissions", [])
user_roles = set(user_identity.get("roles", []))
user_permissions = set(user_identity.get("permissions", []))
# 检查角色
if required_roles:
if not any(role in user_roles for role in required_roles):
return False
# 检查权限
if required_permissions:
if not all(perm in user_permissions for perm in required_permissions):
return False
return True
def authorize_agent(self, user_identity: Dict, policy: Dict) -> bool:
"""AI Agent权限检查"""
constraints = user_identity.get("constraints", {})
# 1. 检查禁止的操作
forbidden_actions = constraints.get("forbidden_actions", [])
if policy.get("action") in forbidden_actions:
return False
# 2. 如果Agent代理用户,检查被代理用户的权限
if user_identity.get("on_behalf_of"):
# 这里需要查询原始用户的权限
original_user = self.get_user_by_id(user_identity["on_behalf_of"])
return self.authorize_user(original_user, policy)
# 3. Agent自身的权限检查
agent_permissions = set(user_identity.get("permissions", []))
required_permissions = set(policy.get("required_permissions", []))
return required_permissions.issubset(agent_permissions)
步骤4: 完整的网关中间件
from flask import Flask, request, jsonify, g
app = Flask(__name__)
# 初始化组件
public_key_provider = IAMPublicKeyProvider("https://customer-iam.com/.well-known/jwks.json")
redis_client = redis.Redis(host="redis", port=6379)
authenticator = GatewayAuthenticator(
public_key_provider=public_key_provider,
redis_client=redis_client,
iam_issuer="https://customer-iam.com",
api_audience="api.yourservice.com"
)
authorizer = GatewayAuthorizer(policy_config={
"/api/users": {
"GET": {"required_roles": ["admin", "viewer"]},
"POST": {"required_roles": ["admin"]},
"DELETE": {"required_roles": ["admin"], "action": "delete"}
},
"/api/posts": {
"GET": {"required_permissions": ["read:posts"]},
"POST": {"required_permissions": ["write:posts"]},
}
})
@app.before_request
def authenticate_request():
"""请求前置:认证和鉴权"""
# 1. 认证:验证token并还原身份
try:
auth_header = request.headers.get("Authorization")
user_identity = authenticator.authenticate_and_restore_identity(auth_header)
g.user = user_identity # 存储到Flask的g对象
except AuthenticationError as e:
return jsonify({"error": str(e)}), 401
# 2. 鉴权:检查权限
if not authorizer.authorize(user_identity, request.path, request.method):
return jsonify({
"error": "Forbidden",
"message": f"User {user_identity['user_id']} does not have permission to {request.method} {request.path}"
}), 403
@app.after_request
def add_user_context_headers(response):
"""向后端服务传递用户上下文"""
if hasattr(g, "user"):
user = g.user
# 实际网关转发时应先清洗客户端传入的内部Header,再添加已签名、短时效的上下文。
headers_to_add = {
"X-User-ID": user["user_id"],
"X-User-Email": user.get("email", ""),
"X-User-Roles": ",".join(user.get("roles", [])),
"X-Tenant-ID": user.get("tenant_id", ""),
"X-Is-Agent": str(user.get("is_agent", False)),
}
if user.get("is_agent"):
headers_to_add["X-Agent-Type"] = user.get("agent_type", "")
headers_to_add["X-On-Behalf-Of"] = user.get("on_behalf_of", "")
# 签名值应由网关用KMS/HMAC/私钥生成,后端按同一协议验证。
headers_to_add["X-Context-Issuer"] = "api-gateway"
headers_to_add["X-Context-Expires-At"] = "1706455200"
headers_to_add["X-Context-Signature"] = "calculated_signature"
for key, value in headers_to_add.items():
response.headers[key] = value
return response
# 业务路由
@app.route("/api/users", methods=["GET"])
def list_users():
user = g.user
# 业务逻辑...
return jsonify({
"message": f"Hello {user['name']}",
"users": [...]
})
6.2 关键配置清单
IAM系统需要提供的信息
-
JWKS端点:
https://customer-iam.com/.well-known/jwks.json{ "keys": [ { "kty": "RSA", "kid": "key-2024-01", "use": "sig", "alg": "RS256", "n": "base64_encoded_modulus", "e": "AQAB" } ] } -
JWT格式约定:
- Header必须包含
kid字段 - Payload必须包含
sub,iss,aud,exp,iat - 自定义claims的字段名约定(roles, permissions等)
- Header必须包含
-
Token撤销机制 (可选):
- 提供Introspection端点:
POST /oauth/introspect - 或提供黑名单同步机制
- 提供Introspection端点:
网关配置项
# gateway-config.yaml
authentication:
iam:
issuer: "https://customer-iam.com"
jwks_url: "https://customer-iam.com/.well-known/jwks.json"
audience: "api.yourservice.com"
token:
blacklist_enabled: true
blacklist_redis_key: "jwt:blacklist"
cache_public_keys: true
cache_ttl: 3600 # 1小时
authorization:
mode: "rbac" # 或 "abac", "opa"
# RBAC配置
rbac:
policies:
- path: "/api/users"
methods:
GET: ["admin", "viewer"]
POST: ["admin"]
DELETE: ["admin"]
- path: "/api/posts"
methods:
GET: ["admin", "editor", "viewer"]
POST: ["admin", "editor"]
# AI Agent特殊规则
agent:
forbidden_actions: ["delete", "admin"]
enforce_delegation: true # 要求Agent必须代理用户
user_context:
headers:
user_id: "X-User-ID"
email: "X-User-Email"
roles: "X-User-Roles"
tenant_id: "X-Tenant-ID"
is_agent: "X-Is-Agent"
agent_type: "X-Agent-Type"
6.3 测试验证
测试用例1: 普通用户请求
# 1. 获取用户Token
USER_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
-d "grant_type=password" \
-d "username=alice@example.com" \
-d "password=secret" \
| jq -r '.access_token')
# 2. 请求API
curl -H "Authorization: Bearer $USER_TOKEN" \
https://api.yourservice.com/api/users
# 3. 检查后端收到的Header
# X-User-ID: user_123
# X-User-Roles: admin
# X-Is-Agent: False
测试用例2: AI Agent请求
# 1. 获取Agent Token
AGENT_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
-d "grant_type=client_credentials" \
-d "client_id=agent_ai_assistant" \
-d "client_secret=agent_secret" \
| jq -r '.access_token')
# 2. 请求API
curl -H "Authorization: Bearer $AGENT_TOKEN" \
https://api.yourservice.com/api/posts
# 3. 检查后端收到的Header
# X-User-ID: agent_ai_assistant_123
# X-Is-Agent: True
# X-Agent-Type: ai_assistant
测试用例3: Agent代理用户请求
# 1. 用户授权Agent
USER_TOKEN="user_token_here"
# 2. Agent交换令牌
DELEGATED_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
-d "grant_type=urn:ietf:params:oauth:grant-type:token-exchange" \
-d "subject_token=$USER_TOKEN" \
-d "client_id=agent_ai_assistant" \
-d "client_secret=agent_secret" \
| jq -r '.access_token')
# 3. Agent代表用户请求
curl -H "Authorization: Bearer $DELEGATED_TOKEN" \
https://api.yourservice.com/api/users
# 4. 检查后端收到的Header
# X-User-ID: agent_ai_assistant_123
# X-Is-Agent: True
# X-On-Behalf-Of: user_123 ← 代理的用户
6.4 常见问题解决
问题1: 公钥获取失败
原因: IAM的JWKS端点不可达或格式错误
解决:
# 添加重试和降级机制
from tenacity import retry, stop_after_attempt, wait_exponential
class IAMPublicKeyProvider:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def get_public_key(self, kid: str) -> str:
# ... 获取公钥逻辑
pass
def get_cached_or_default_key(self, kid: str) -> str:
"""降级方案:使用缓存的公钥"""
try:
return self.get_public_key(kid)
except Exception as e:
logger.error(f"Failed to get public key: {e}")
# 返回缓存的公钥
cached_key = self.redis.get(f"public_key:{kid}")
if cached_key:
return cached_key.decode()
raise
问题2: Token过期但用户仍在使用
原因: Token过期时间太短,用户体验不好
解决: 实现Refresh Token机制
@app.errorhandler(401)
def handle_expired_token(error):
if "Token has expired" in str(error):
return jsonify({
"error": "token_expired",
"message": "Please refresh your token",
"refresh_endpoint": "/oauth/token",
"hint": "Use refresh_token grant"
}), 401
return jsonify({"error": str(error)}), 401
# 客户端自动刷新
def request_with_auto_refresh(url, access_token, refresh_token):
response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
if response.status_code == 401 and response.json().get("error") == "token_expired":
# 自动刷新token
new_tokens = refresh_access_token(refresh_token)
# 重试请求
return requests.get(url, headers={"Authorization": f"Bearer {new_tokens['access_token']}"})
return response
问题3: AI Agent权限过大
原因: Agent使用了用户的完整权限
解决: 使用Scope限定Agent权限
def authorize_agent(self, user_identity: Dict, policy: Dict) -> bool:
# 检查Token的scope
token_scope = user_identity.get("scope", "")
scopes = set(token_scope.split())
# Agent只有基础scope,不能执行管理操作
if "admin" in policy.get("required_roles", []):
return "admin:full" in scopes
# 其他操作需要对应的scope
action = policy.get("action")
required_scope = f"{action}:{resource_type}"
return required_scope in scopes
总结
核心要点
-
IAM职责: 颁发令牌、管理身份、定义策略
-
JWT优势: 自包含、无状态、适合分布式
-
网关核心:
- 验证令牌签名
- 还原用户身份
- 执行权限检查
- 传递用户上下文
-
AI Agent特殊处理:
- 使用Client Credentials或Token Exchange
- 限定权限scope
- 记录代理关系(on-behalf-of)
最佳实践
| 场景 | 推荐方案 |
|---|---|
| 用户令牌 | JWT (RS256/ES256), 短期(5-60分钟) + 可撤销Refresh Token |
| AI Agent令牌 | 短期JWT或Opaque Token,限定scope/audience/resource,必要时mTLS/DPoP发送方约束 |
| 网关验证 | 本地验证JWT签名 + 黑名单检查 |
| 权限模型 | 简单场景用RBAC, 复杂场景用ABAC(OPA) |
| 用户上下文传递 | 网关清洗后的签名Header或内部JWT,后端验证来源和签名 |
安全建议
- 始终使用HTTPS传输Token
- 使用RS256等非对称算法签名JWT
- 设置合理的Token过期时间
- 实现Token黑名单机制
- 定期轮换签名密钥(kid)
- 记录完整的审计日志
- AI Agent使用最小权限原则
- 避免长期或永久Agent bearer token;采用短期、可撤销、可轮换、发送方约束的凭证
- 客户端传入的用户上下文Header必须在网关入口清洗,禁止后端直接信任外部Header
主流IAM实现方案深度分析
7.1 云厂商IAM方案对比
国际云厂商
AWS IAM (Amazon Identity and Access Management)
架构特点:
graph TB
subgraph AWS_IAM["AWS IAM架构"]
User[IAM User]
Role[IAM Role]
Policy[Policy Document]
STS[STS临时凭证]
User --> Policy
Role --> Policy
Role --> STS
end
subgraph Resources["AWS资源"]
S3[S3 Bucket]
EC2[EC2 Instance]
Lambda[Lambda Function]
end
User --> |访问| Resources
STS --> |临时凭证| Resources
style AWS_IAM fill:#ff9900,stroke:#232f3e,stroke-width:2px
核心机制:
- Policy结构 (JSON策略文档):
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::my-bucket/*",
"Condition": {
"IpAddress": {
"aws:SourceIp": "203.0.113.0/24"
}
}
}
]
}
特色功能:
- STS (Security Token Service): 临时凭证,支持跨账号访问
- AssumeRole: 角色扮演机制,适合服务间调用
- Resource-based Policy: 资源级策略,细粒度控制
- Conditions: 基于IP、时间、MFA等条件的策略
Token机制:
- 使用AWS Signature V4签名算法
- Access Key + Secret Key生成签名
- STS临时凭证: Access Key + Secret Key + Session Token (有效期15分钟~12小时)
适用场景:
- AWS生态内的资源访问控制
- 跨账号资源共享
- 服务间的安全调用
局限性:
- 仅限AWS资源,无法管理外部应用
- 学习曲线陡峭,策略语法复杂
- 无内置UI管理界面
Azure AD / Microsoft Entra ID
架构特点:
graph LR
subgraph Azure_AD["Azure AD"]
User[用户]
App[应用注册]
SP[Service Principal]
ManagedID[托管标识]
end
subgraph Protocols["认证协议"]
OAuth[OAuth 2.0]
OIDC[OpenID Connect]
SAML[SAML 2.0]
end
User --> OAuth
App --> OAuth
SP --> OAuth
ManagedID --> OAuth
OAuth --> Resources[Azure资源]
OIDC --> Resources
SAML --> Resources
style Azure_AD fill:#0078d4,stroke:#ffffff,stroke-width:2px
核心机制:
- 应用注册 (Application Registration):
{
"appId": "00000000-0000-0000-0000-000000000000",
"appRoles": [
{
"id": "role-id",
"displayName": "Admin",
"value": "Admin",
"allowedMemberTypes": ["User", "Application"]
}
],
"requiredResourceAccess": [
{
"resourceAppId": "00000003-0000-0000-c000-000000000000",
"resourceAccess": [
{
"id": "e1fe6dd8-ba31-4d61-89e7-88639da4683d",
"type": "Scope"
}
]
}
]
}
- Managed Identity (托管标识):
- System-assigned: 与资源生命周期绑定
- User-assigned: 可复用的独立身份
Token机制:
- 使用标准JWT (RS256签名)
- Access Token: 访问Azure资源
- ID Token: 用户身份信息
- Refresh Token: 刷新访问令牌
Token示例:
{
"aud": "https://graph.microsoft.com",
"iss": "https://sts.windows.net/{tenant-id}/",
"iat": 1706451600,
"nbf": 1706451600,
"exp": 1706455200,
"sub": "user-object-id",
"oid": "user-object-id",
"roles": ["Directory.Read.All"],
"scp": "User.Read Mail.Send",
"tid": "tenant-id"
}
特色功能:
- Conditional Access: 基于风险、设备、位置的条件访问
- Privileged Identity Management (PIM): 即时权限提升
- Identity Protection: 风险检测和自动响应
- B2B/B2C: 支持外部用户和客户身份管理
适用场景:
- 企业级身份管理
- Microsoft 365生态集成
- 混合云环境(Azure + On-Premises)
优势:
- 完整的企业身份解决方案
- 强大的条件访问和风险控制
- 与Windows、Office深度集成
Google Cloud IAM
架构特点:
graph TB
subgraph GCP_IAM["Google Cloud IAM"]
Member[成员<br/>User/SA/Group]
Role[角色<br/>Primitive/Predefined/Custom]
Policy[IAM Policy]
Member --> Policy
Role --> Policy
end
subgraph Resources["资源层级"]
Org[Organization]
Folder[Folder]
Project[Project]
Resource[Resource]
Org --> Folder
Folder --> Project
Project --> Resource
end
Policy --> Resources
style GCP_IAM fill:#4285f4,stroke:#ffffff,stroke-width:2px
核心机制:
- IAM Policy Binding:
{
"bindings": [
{
"role": "roles/storage.objectViewer",
"members": [
"user:alice@example.com",
"serviceAccount:my-sa@project.iam.gserviceaccount.com"
],
"condition": {
"title": "Expires in 2024",
"expression": "request.time < timestamp('2024-12-31T23:59:59Z')"
}
}
]
}
- Service Account:
- 应用身份,非人类用户
- 可以模拟(Impersonate)其他服务账号
- 支持短期密钥(Short-lived credentials)
Token机制:
- 使用标准JWT
- 通过Metadata Server自动获取(GCE/GKE)
- 支持Workload Identity Federation(联合身份)
特色功能:
- Hierarchical Inheritance: 策略继承(Organization → Folder → Project → Resource)
- IAM Conditions: 基于CEL(Common Expression Language)的条件策略
- Workload Identity: 将Kubernetes SA映射到GCP SA,无需密钥
- Policy Analyzer: 分析和审计权限
适用场景:
- GCP资源访问控制
- 多项目/多组织管理
- Kubernetes workload身份管理
优势:
- 简洁的权限模型(Member-Role-Resource)
- 强大的条件策略(CEL表达式)
- Workload Identity消除密钥管理
国内云厂商
阿里云RAM (Resource Access Management)
架构特点:
graph TB
subgraph Alibaba_RAM["阿里云RAM"]
User[RAM用户]
Role[RAM角色]
Policy[权限策略]
STS[STS临时Token]
User --> Policy
Role --> Policy
Role --> STS
end
subgraph Resources["阿里云资源"]
ECS[ECS]
OSS[OSS]
RDS[RDS]
end
User --> Resources
STS --> Resources
style Alibaba_RAM fill:#ff6a00,stroke:#ffffff,stroke-width:2px
核心机制:
- Policy语法 (类似AWS):
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"oss:GetObject",
"oss:PutObject"
],
"Resource": [
"acs:oss:*:*:my-bucket/*"
],
"Condition": {
"IpAddress": {
"acs:SourceIp": ["192.168.0.0/16"]
}
}
}
]
}
Token机制:
- AccessKey/SecretKey: 长期静态凭证,不建议作为应用或Agent的默认运行凭证
- STS临时Token: SecurityToken + TempAccessKey + TempSecretKey
- 有效期: 900秒~3600秒
特色功能:
- 跨云账号授权: 支持主账号授权给其他阿里云账号
- 角色SSO: 支持企业IdP集成
- 资源组: 跨产品的资源分组管理
- 操作审计: 记录所有API调用
适用场景:
- 阿里云资源访问控制
- 移动应用临时授权(STS)
- 跨账号资源共享
腾讯云CAM (Cloud Access Management)
架构特点:
graph TB
subgraph Tencent_CAM["腾讯云CAM"]
User[子用户]
Role[角色]
Policy[策略]
Group[用户组]
User --> Group
Group --> Policy
Role --> Policy
end
subgraph Resources["腾讯云资源"]
CVM[CVM]
COS[COS]
TKE[TKE]
end
User --> Resources
Role --> Resources
style Tencent_CAM fill:#006eff,stroke:#ffffff,stroke-width:2px
核心机制:
- Policy结构:
{
"version": "2.0",
"statement": [
{
"effect": "allow",
"action": [
"cos:GetObject",
"cos:PutObject"
],
"resource": [
"qcs::cos:ap-guangzhou:uid/1250000000:my-bucket/*"
],
"condition": {
"ip_equal": {
"qcs:ip": ["10.0.0.1/24"]
}
}
}
]
}
Token机制:
- 长期静态密钥: SecretId + SecretKey,应优先用临时密钥、角色或工作负载身份替代
- 临时密钥: TmpSecretId + TmpSecretKey + Token
- 签名算法: HMAC-SHA256
特色功能:
- 服务角色: 授权腾讯云服务访问其他服务
- 标签授权: 基于资源标签的权限控制
- 会话管理: 查看和撤销子用户登录会话
- 操作保护: 敏感操作二次验证
适用场景:
- 腾讯云资源权限管理
- 企业多账号管理
- 临时授权(STS)
华为云IAM
核心特点:
- 统一身份认证: 支持华为云账号、IAM用户、联邦用户
- 委托授权: 类似AWS的AssumeRole
- 虚拟MFA: 支持虚拟MFA设备
- 自定义策略: 支持细粒度的资源级授权
Token机制:
- 长期AK/SK: Access Key + Secret Key,应限制使用范围并定期轮换
- 临时Token: 通过委托获取临时凭证
- IAM Token: 基于用户名密码获取的临时token(24小时有效)
7.2 IDaaS方案对比
国际方案
Okta
架构定位: 云原生身份即服务(IDaaS)平台
graph TB
subgraph Okta["Okta平台"]
UD[Universal Directory]
SSO[Single Sign-On]
MFA[Multi-Factor Auth]
LC[Lifecycle Management]
end
subgraph Apps["应用集成"]
SaaS[SaaS应用<br/>5000+]
OnPrem[本地应用]
Custom[自定义应用]
end
subgraph Protocols["协议支持"]
SAML2[SAML 2.0]
OIDC[OIDC]
OAuth2[OAuth 2.0]
end
UD --> SSO
SSO --> Apps
Protocols --> Apps
style Okta fill:#007dc1,stroke:#ffffff,stroke-width:2px
核心能力:
-
Universal Directory:
- 集中式用户目录
- 支持LDAP、AD集成
- 自定义用户属性
-
Adaptive MFA:
- 基于风险的自适应认证
- 支持TOTP、WebAuthn、生物识别
- 无密码认证(Passwordless)
-
Lifecycle Management:
- 用户自动Provisioning/Deprovisioning
- 与HR系统集成(Workday、SAP等)
Token机制:
- 标准JWT (RS256)
- 短期Access Token (1小时)
- 长期Refresh Token (可配置)
OAuth 2.0 Flow支持:
- Authorization Code + PKCE
- Client Credentials
- Resource Owner Password (不推荐)
- Device Authorization
特色功能:
- Okta Workflows: 低代码自动化(类似Zapier)
- API Access Management: 为自己的API提供OAuth服务器
- Customer Identity (CIAM): B2C场景的用户管理
适用场景:
- 企业SSO门户
- 应用集成中心
- 客户身份管理(CIAM)
优势:
- 5000+ 预集成应用
- 强大的自动化能力
- 完善的开发者生态
定价:
- 按用户数收费($2~15/user/month)
- 高昂的企业成本
Auth0 (by Okta)
架构定位: 面向开发者的身份平台
graph LR
subgraph Auth0["Auth0"]
Rules[Rules引擎]
Actions[Actions]
Connections[Identity Connections]
MFA[MFA]
end
subgraph Integrations["身份源"]
Social[社交登录<br/>Google/FB/GitHub]
Enterprise[企业IdP<br/>AD/SAML]
DB[用户名密码]
end
Integrations --> Connections
Connections --> Rules
Rules --> Actions
style Auth0 fill:#eb5424,stroke:#ffffff,stroke-width:2px
核心能力:
-
Universal Login:
- 托管登录页面
- 自定义品牌和UI
- 跨域SSO
-
Rules & Actions:
- 可编程的认证流程
- JavaScript/Node.js编写
- 支持调用外部API
Rules示例:
function addRolesToToken(user, context, callback) {
// 从数据库查询用户角色
const roles = getUserRoles(user.email);
// 添加到Token的claims
context.idToken['https://myapp.com/roles'] = roles;
context.accessToken['https://myapp.com/roles'] = roles;
callback(null, user, context);
}
- Organizations (B2B多租户):
- 租户隔离
- 每个租户独立的身份源
- 租户级别的品牌定制
Token机制:
- 标准JWT
- 支持自定义Claims
- Token签名算法可配置(RS256/HS256)
特色功能:
- Extensibility: 高度可定制的认证流程
- Social Connections: 30+社交登录提供商
- Passwordless: 邮箱/短信验证码登录
- Breached Password Detection: 泄露密码检测
适用场景:
- SaaS应用的用户认证
- 移动应用登录
- 多租户B2B应用
优势:
- 开发者友好的API和SDK
- 灵活的可扩展性
- 快速集成(几小时完成)
定价:
- 免费版: 7000 MAU (月活用户)
- 付费版: $35~240/month (起步价)
国内方案
身份宝 (IDaaS by 阿里云)
架构定位: 企业级身份管理云服务
graph TB
subgraph IDaaS["身份宝"]
UD[统一用户目录]
SSO[单点登录]
MFA[多因素认证]
Sync[账号同步]
end
subgraph Sources["身份源"]
AD[Active Directory]
LDAP[LDAP]
DingTalk[钉钉]
WeChat[企业微信]
end
subgraph Apps["应用"]
Internal[内部应用]
SaaS[SaaS应用]
Alibaba[阿里云控制台]
end
Sources --> Sync
Sync --> UD
UD --> SSO
SSO --> Apps
style IDaaS fill:#ff6a00,stroke:#ffffff,stroke-width:2px
核心能力:
-
多源身份集成:
- AD/LDAP同步
- 钉钉、企业微信集成
- HR系统对接
-
应用门户:
- 企业应用市场
- 统一工作台
- 移动端APP
-
权限管理:
- RBAC权限模型
- 应用级授权
- 数据权限控制
特色功能:
- 国密支持: SM2/SM3/SM4算法
- 本地化部署: 支持私有化部署
- 合规认证: 等保三级、ISO 27001
适用场景:
- 国企/央企身份管理
- 需要本地化部署的场景
- 阿里云生态深度用户
玉符 (AuthingCloud)
架构定位: 开发者友好的身份云
graph LR
subgraph Authing["玉符"]
Core[认证核心]
Pipeline[Pipeline扩展]
Rules[业务规则]
end
subgraph Features["功能"]
Social[社交登录]
MFA[MFA]
RBAC[权限管理]
Webhooks[Webhooks]
end
Core --> Pipeline
Pipeline --> Rules
Features --> Core
style Authing fill:#396aff,stroke:#ffffff,stroke-width:2px
核心能力:
-
快速集成:
- 5分钟接入
- 丰富的SDK(20+语言)
- 托管登录页
-
Pipeline机制:
- 可编程的认证流程
- 类似Auth0的Rules
- 支持JavaScript/Python
Pipeline示例:
async function pipe(user, context, callback) {
// 从自己的数据库获取额外信息
const userInfo = await getUserFromDB(user.id);
// 添加到Token
user.customData = {
department: userInfo.department,
level: userInfo.level
};
return callback(null, user, context);
}
- 多租户SaaS:
- 租户隔离
- 独立用户池
- 品牌定制
Token机制:
- 标准JWT (RS256/HS256)
- 自定义Claims
- Token生命周期可配置
特色功能:
- 国内社交登录: 微信、支付宝、QQ等
- 小程序登录: 微信小程序一键登录
- 实名认证: 集成第三方实名认证服务
- 国密支持: 符合国内合规要求
适用场景:
- 互联网应用/SaaS
- 移动应用/小程序
- 需要国内社交登录的场景
优势:
- 国内网络环境优化
- 微信生态深度集成
- 性价比高
定价:
- 免费版: 7000 MAU
- 付费版: ¥3000~30000/year
竹云 (BambooCloud)
架构定位: 企业级IAM全栈方案
核心能力:
- 4A统一管控: Account(账号)、Authentication(认证)、Authorization(授权)、Audit(审计)
- 特权账号管理(PAM): 运维人员权限管控
- 堡垒机集成: 安全运维审计
特色功能:
- 面向传统企业/政府
- 强调合规和审计
- 支持私有化部署
适用场景:
- 大型企业集团
- 金融/政府行业
- 需要PAM能力的场景
7.3 开源IAM方案
Keycloak
架构定位: 开源身份和访问管理解决方案
graph TB
subgraph Keycloak["Keycloak"]
Realm[Realm<br/>租户隔离]
Client[Client<br/>应用注册]
User[User Federation]
IDP[Identity Brokering]
end
subgraph Features["功能"]
SSO[SSO]
OAuth[OAuth 2.0/OIDC]
SAML[SAML 2.0]
LDAP[LDAP/AD]
end
Realm --> Client
Realm --> User
User --> LDAP
IDP --> Features
style Keycloak fill:#4d4d4d,stroke:#00b8e3,stroke-width:2px
核心能力:
-
Realm多租户:
- 每个Realm独立的用户、角色、客户端
- 跨Realm SSO
-
User Federation:
- LDAP/AD集成
- 自定义User Storage SPI
-
Identity Brokering:
- 作为OAuth/OIDC代理
- 支持社交登录(Google、GitHub等)
Token机制:
- 标准JWT (RS256)
- Access Token / ID Token / Refresh Token
- Token mapper自定义Claims
Client配置示例:
{
"clientId": "my-app",
"enabled": true,
"clientAuthenticatorType": "client-secret",
"redirectUris": ["https://myapp.com/callback"],
"webOrigins": ["https://myapp.com"],
"protocol": "openid-connect",
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": false
}
特色功能:
- Admin Console: 完善的Web管理界面
- Account Console: 用户自服务门户
- Custom Themes: 自定义登录页面
- Event Listeners: 审计和集成
适用场景:
- 企业内部应用SSO
- 微服务身份管理
- 需要自主可控的场景
优势:
- 完全开源免费
- 功能完整(不输商业产品)
- 活跃的社区支持
劣势:
- 学习曲线陡峭
- 资源消耗较大(Java应用)
- 需要自己运维
Casdoor
架构定位: 国产化开源IAM平台
graph LR
subgraph Casdoor["Casdoor"]
Org[Organization]
App[Application]
User[User]
Provider[Provider]
end
subgraph Features["特色"]
WeChat[微信登录]
Alipay[支付宝]
DingTalk[钉钉]
LDAP[LDAP]
end
Org --> App
App --> User
Provider --> Features
style Casdoor fill:#2e7d32,stroke:#ffffff,stroke-width:2px
核心能力:
- 多组织管理: Organization概念隔离租户
- 国内生态: 微信、支付宝、钉钉等登录
- Casbin集成: 内置权限管理
Token机制:
- 标准JWT
- 支持自定义字段
特色功能:
- 全中文界面和文档
- 支持国密算法
- 轻量级(Go语言开发)
适用场景:
- 国内中小企业
- 需要快速部署的场景
- 对国密有要求的项目
7.4 方案选型决策树
graph TB
Start{需求分析}
Start --> Q1{部署方式?}
Q1 -->|云服务| Q2{预算?}
Q1 -->|私有化| Q3{规模?}
Q2 -->|充足| Okta[Okta/Auth0]
Q2 -->|有限| Authing[玉符/身份宝]
Q3 -->|大型| Keycloak[Keycloak]
Q3 -->|中小型| Casdoor[Casdoor]
Start --> Q4{场景?}
Q4 -->|云资源| Cloud{云厂商?}
Q4 -->|应用| App{类型?}
Cloud -->|AWS| AWS_IAM[AWS IAM]
Cloud -->|Azure| Azure_AD[Azure AD]
Cloud -->|阿里云| Alibaba_RAM[阿里云RAM]
Cloud -->|腾讯云| Tencent_CAM[腾讯云CAM]
App -->|B2B SaaS| Auth0
App -->|B2C| Authing
App -->|企业内部| Keycloak
style Okta fill:#007dc1,stroke:#ffffff,stroke-width:2px
style Keycloak fill:#4d4d4d,stroke:#00b8e3,stroke-width:2px
style AWS_IAM fill:#ff9900,stroke:#232f3e,stroke-width:2px
7.5 综合对比表
| 方案 | 类型 | 部署方式 | 适用场景 | Token类型 | 价格 | 优势 | 劣势 |
|---|---|---|---|---|---|---|---|
| AWS IAM | 云IAM | SaaS | AWS资源 | AWS签名 | 按使用量 | AWS生态 | 仅限AWS |
| Azure AD | 企业IdP | SaaS | 企业SSO | JWT | $6~21/user/month | 企业功能完整 | 价格较高 |
| Okta | IDaaS | SaaS | 应用SSO | JWT | $2~15/user/month | 应用集成多 | 价格昂贵 |
| Auth0 | IDaaS | SaaS | 开发者 | JWT | $35~240/month | 开发者友好 | MAU限制 |
| Keycloak | 开源 | 自建 | 企业内部 | JWT | 免费 | 功能完整 | 运维成本 |
| 阿里云RAM | 云IAM | SaaS | 阿里云资源 | STS Token | 按使用量 | 阿里云集成 | 仅限阿里云 |
| 玉符 | IDaaS | SaaS | 国内应用 | JWT | ¥3000~30000/year | 国内生态 | 功能相对简单 |
| 身份宝 | IDaaS | SaaS/私有化 | 企业级 | JWT | 定制报价 | 本地化支持 | 价格不透明 |
| Casdoor | 开源 | 自建 | 中小企业 | JWT | 免费 | 轻量级 | 社区较小 |
7.6 架构设计建议
场景一: 初创公司 (0~50人)
推荐方案: Auth0 / 玉符
架构:
用户 → Auth0托管登录 → 应用 (接收JWT)
理由:
- 快速集成(1天完成)
- 无需运维
- 成本可控(免费额度内)
场景二: 成长期公司 (50~500人)
推荐方案: Keycloak (自建) / Okta (预算充足)
架构:
用户 → Keycloak SSO → 内部应用(10+)
→ SaaS应用(SAML)
→ API网关(JWT)
理由:
- 应用数量增加,SSO需求
- 自建Keycloak节省成本
- 支持LDAP集成企业AD
场景三: 大型企业 (500+人)
推荐方案: Azure AD (Microsoft生态) / 混合架构
混合架构:
[Azure AD] ←→ [On-Premises AD]
↓
[应用网关]
↓ ↓
[内部应用] [SaaS应用]
↓
[Keycloak] (微服务内部)
理由:
- 企业级功能(PIM、条件访问)
- 与Office 365集成
- 混合云支持
- 微服务层使用轻量级Keycloak
场景四: 多云/混合云
推荐方案: Workload Identity Federation
架构:
[GKE Pod] → [Workload Identity] → [GCP Service Account]
↓
[AssumeRoleWithWebIdentity]
↓
[AWS IAM Role]
关键技术:
- Google Workload Identity
- AWS IAM OIDC Provider
- Azure Managed Identity
理由:
- 无需管理云凭证
- 支持跨云访问
- 最小权限原则
7.7 实战建议
典型场景建议
场景假设:企业 IAM 系统给用户和 AI Agent 颁发身份令牌,网关需要还原身份并进行权限控制。
推荐方案:
-
如果客户IAM是云厂商的:
- AWS: 使用STS AssumeRole
- Azure: 使用Managed Identity
- 阿里云: 使用RAM STS
-
如果客户IAM是企业自建的:
- 要求客户IAM提供JWKS端点
- 网关使用公钥验证JWT
- 实现本文档第6章的方案
-
如果需要从零建设IAM:
- 小型项目: 使用Auth0/玉符
- 中型项目: 自建Keycloak
- 大型项目: Azure AD + Keycloak混合
关键对接点:
# 与客户IAM的对接清单
1. JWKS端点: https://customer-iam.com/.well-known/jwks.json
2. Token格式: JWT (RS256)
3. Claims约定:
- sub: 用户/Agent ID
- roles: 角色数组
- permissions: 权限数组
- agent_type: AI Agent标识
4. Token有效期:
- 用户Access Token: 5-60分钟,按风险配置
- Agent Access Token: 短期有效,通常分钟级到1小时
- Refresh Token: 如需使用,必须rotation、重用检测、可撤销、可轮换
5. 撤销机制:
- 方式1: jti黑名单(Redis)
- 方式2: Introspection端点
6. 令牌绑定:
- 高价值Agent使用mTLS证书绑定或DPoP
- 后端校验aud/scope/cnf/on_behalf_of等边界
参考资源
标准规范
- RFC 7519: JWT
- RFC 7662: OAuth 2.0 Token Introspection
- RFC 8693: OAuth 2.0 Token Exchange
- RFC 8705: OAuth 2.0 Mutual-TLS Client Authentication and Certificate-Bound Access Tokens
- RFC 9449: OAuth 2.0 Demonstrating Proof of Possession (DPoP)
- RFC 9700: Best Current Practice for OAuth 2.0 Security
- OAuth 2.1 Authorization Framework (Internet-Draft)
- OpenID Connect Core
- NIST SP 800-63B: Digital Identity Guidelines
- OWASP Top 10 for LLM Applications 2025
云厂商文档
开源项目
书籍推荐
- OAuth 2.0 in Action by Justin Richer
- Solving Identity Management in Modern Applications by Yvonne Wilson
- Zero Trust Networks by Evan Gilman