身份认证与访问控制深度理论知识
身份认证与访问控制深度理论知识
本文档专注于IAM系统、令牌机制、网关认证鉴权的完整理论与实践
目录
IAM基础理论
1.1 核心概念辨析
graph TB
subgraph Identity["身份 (Identity)"]
User["用户<br/>• 人类用户<br/>• 服务账号<br/>• AI Agent"]
Attributes["身份属性<br/>• ID: user_123<br/>• Email<br/>• Roles<br/>• Groups"]
end
subgraph Authentication["认证 (Authentication)"]
Verify["验证身份<br/>回答: 你是谁?"]
Methods["认证方式<br/>• 密码<br/>• MFA<br/>• 证书<br/>• 生物识别"]
end
subgraph Authorization["授权 (Authorization)"]
CheckPerm["检查权限<br/>回答: 你能做什么?"]
Models["权限模型<br/>• RBAC<br/>• ABAC<br/>• ReBAC"]
end
Identity --> Authentication
Authentication --> Authorization
style Identity fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style Authentication fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
style Authorization fill:#e8f5e9,stroke:#43a047,stroke-width:2px
关键区别:
| 概念 | 问题 | 验证内容 | 结果 |
|---|---|---|---|
| 身份 (Identity) | 你声称是谁? | 身份标识符 (ID, Email) | 身份声明 |
| 认证 (Authentication) | 你真的是这个人吗? | 凭证 (密码, Token, 证书) | 身份确认 |
| 授权 (Authorization) | 你可以访问这个资源吗? | 权限、角色、策略 | 访问决策 |
1.2 IAM系统的核心职责
graph LR
subgraph IAM["IAM系统"]
IM[身份管理<br/>Identity Management]
AM[访问管理<br/>Access Management]
PM[策略管理<br/>Policy Management]
TM[令牌管理<br/>Token Management]
end
IM --> |用户、组、角色| AM
PM --> |定义权限规则| AM
AM --> |颁发令牌| TM
External[外部请求] --> TM
TM --> |验证令牌| Gateway[API网关]
IAM的四大支柱:
-
身份管理 (Identity Management)
- 用户生命周期管理(创建、更新、删除)
- 身份属性存储(用户信息、元数据)
- 身份联合(SSO、LDAP、SAML)
-
访问管理 (Access Management)
- 认证流程(登录、MFA)
- 会话管理(Session、Cookie)
- 令牌颁发(JWT、OAuth Token)
-
策略管理 (Policy Management)
- 权限定义(RBAC角色、ABAC策略)
- 策略存储与分发
- 策略评估引擎
-
令牌管理 (Token Management)
- 令牌生成与签名
- 令牌验证与刷新
- 令牌撤销(黑名单)
身份令牌机制
2.1 令牌类型对比
| 令牌类型 | 特点 | 验证方式 | 适用场景 | 安全性 |
|---|---|---|---|---|
| JWT (自包含令牌) | 令牌本身包含用户信息 | 本地验证签名 | 无状态API、微服务 | 中(无法主动撤销) |
| Opaque Token (引用令牌) | 随机字符串,需查询获取信息 | 调用IAM验证 | 需要集中控制、可撤销 | 高(集中管理) |
| Session ID | 服务器端存储,客户端持有ID | 查询Session存储 | 单体应用、Web应用 | 高(服务器控制) |
2.2 JWT详解
JWT结构
JWT = Header.Payload.Signature
完整示例:
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzEyMyIsIm5hbWUiOiJBbGljZSIsInJvbGVzIjpbImFkbWluIl0sImlhdCI6MTcwNjQ1MTYwMCwiZXhwIjoxNzA2NDU1MjAwfQ.signature_here
Header (头部):
{
"alg": "RS256", // 签名算法: RSA + SHA256
"typ": "JWT", // 令牌类型
"kid": "key-2024-01" // 密钥ID (用于密钥轮换)
}
Payload (负载):
{
// 标准声明 (Standard Claims)
"sub": "user_123", // Subject: 用户ID
"iss": "https://iam.example.com", // Issuer: 颁发者
"aud": "api.example.com", // Audience: 目标受众
"exp": 1706455200, // Expiration: 过期时间 (Unix时间戳)
"iat": 1706451600, // Issued At: 颁发时间
"nbf": 1706451600, // Not Before: 生效时间
"jti": "jwt-uuid-456", // JWT ID: 唯一标识
// 自定义声明 (Custom Claims)
"name": "Alice",
"email": "alice@example.com",
"roles": ["admin", "editor"],
"permissions": ["read:users", "write:posts"],
"tenant_id": "org_789",
"agent_type": "ai_assistant" // AI Agent标识
}
Signature (签名):
// 签名算法
RSASHA256(
base64UrlEncode(header) + "." + base64UrlEncode(payload),
privateKey // 使用私钥签名
)
JWT签名算法对比
| 算法 | 类型 | 密钥 | 安全性 | 性能 | 适用场景 |
|---|---|---|---|---|---|
| HS256 | 对称 | 共享密钥 | 中 | 快 | 单体应用、内部服务 |
| RS256 | 非对称 | 公钥/私钥 | 高 | 慢 | 分布式系统、多方验证 |
| ES256 | 非对称 | 椭圆曲线 | 高 | 中 | 移动应用、IoT |
安全建议:
- 微服务网关: RS256 (网关只需公钥验证,IAM持有私钥签名)
- 内部服务间: HS256 (性能更好,密钥可共享)
- 移动端/前端: ES256 (签名短,性能平衡)
2.3 Opaque Token机制
sequenceDiagram
participant Client as 客户端
participant Gateway as API网关
participant IAM as IAM系统
participant Redis as Token存储
participant Service as 后端服务
Note over Client,IAM: 1. 令牌颁发阶段
Client->>IAM: 登录请求 (username + password)
IAM->>Redis: 存储令牌信息<br/>token_abc123 → {user_id, roles, ...}
IAM->>Client: 返回 Opaque Token: abc123xyz
Note over Client,Service: 2. 令牌验证阶段
Client->>Gateway: 请求 + Token: abc123xyz
Gateway->>IAM: 验证令牌 (Introspection Endpoint)
IAM->>Redis: 查询 token_abc123
Redis->>IAM: 返回用户信息
IAM->>Gateway: 返回验证结果 + 用户信息
Gateway->>Service: 转发请求 (附加用户上下文)
Service->>Gateway: 响应
Gateway->>Client: 响应
Opaque Token的优势:
- 可主动撤销: 删除Redis中的令牌即可立即失效
- 安全性高: 令牌本身不包含任何信息,泄露后无法直接读取
- 集中控制: IAM系统完全控制令牌生命周期
劣势:
- 性能开销: 每次请求都需要调用IAM验证
- 可用性依赖: IAM/Redis故障会影响所有服务
- 网络延迟: 增加额外的网络调用
2.4 JWT vs Opaque Token 选型
graph TB
Start{令牌选型}
Start --> Q1{需要主动撤销?}
Q1 -->|是| Opaque[Opaque Token]
Q1 -->|否| Q2{高并发场景?}
Q2 -->|是| JWT[JWT]
Q2 -->|否| Q3{多方验证?}
Q3 -->|是| JWT
Q3 -->|否| Q4{对性能敏感?}
Q4 -->|是| JWT
Q4 -->|否| Opaque
style JWT fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Opaque fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
推荐方案:
| 场景 | 推荐 | 理由 |
|---|---|---|
| 高并发API | JWT | 无需每次查询IAM,减少延迟 |
| 金融/安全敏感 | Opaque | 可主动撤销,集中控制 |
| 微服务网关 | JWT | 网关本地验证,降低IAM压力 |
| 内部管理系统 | Opaque | 需要实时权限更新 |
| AI Agent | JWT | 长期有效,减少认证次数 |
混合方案 (推荐):
用户登录 → IAM颁发 JWT (短期,1小时) + Refresh Token (Opaque, 7天)
- JWT: 日常API访问,网关本地验证
- Refresh Token: 刷新JWT,通过IAM验证
- 撤销: 将JWT的jti加入黑名单 (Redis)
网关认证鉴权架构
3.1 完整认证鉴权流程
sequenceDiagram
participant Client as 客户端
participant Gateway as API网关
participant IAM as IAM系统
participant AuthZ as 权限服务
participant Service as 后端服务
Note over Client,Service: 阶段1: 认证 (Authentication)
Client->>Gateway: 请求 + JWT Token
Note over Gateway: 1.1 Token格式验证
alt Token格式错误
Gateway->>Client: 401 Unauthorized
end
Note over Gateway: 1.2 签名验证
Gateway->>Gateway: 使用IAM公钥验证签名
alt 签名验证失败
Gateway->>Client: 401 Invalid Token
end
Note over Gateway: 1.3 过期时间检查
alt Token已过期
Gateway->>Client: 401 Token Expired
end
Note over Gateway: 1.4 黑名单检查
Gateway->>IAM: 检查jti是否在黑名单
alt 在黑名单中
Gateway->>Client: 401 Token Revoked
end
Note over Client,Service: 阶段2: 身份还原
Note over Gateway: 2.1 解析JWT Payload
Gateway->>Gateway: 提取用户信息:<br/>• sub (用户ID)<br/>• roles<br/>• permissions<br/>• tenant_id
Note over Gateway: 2.2 构建用户上下文
Gateway->>Gateway: 创建 UserContext 对象
Note over Client,Service: 阶段3: 授权 (Authorization)
Note over Gateway: 3.1 路径匹配
Gateway->>Gateway: 请求路径: POST /api/users
Note over Gateway: 3.2 权限检查
alt 简单RBAC (网关实现)
Gateway->>Gateway: 检查 roles 是否包含 "admin"
else 复杂策略 (调用权限服务)
Gateway->>AuthZ: 请求授权决策<br/>{user, resource, action}
AuthZ->>Gateway: 返回 Allow/Deny
end
alt 权限不足
Gateway->>Client: 403 Forbidden
end
Note over Client,Service: 阶段4: 请求转发
Gateway->>Service: 转发请求 + 用户上下文<br/>Headers:<br/>• X-User-ID: user_123<br/>• X-User-Roles: admin,editor<br/>• X-Tenant-ID: org_789
Note over Service: 服务信任网关<br/>无需再次认证
Service->>Gateway: 响应
Gateway->>Client: 响应
3.2 网关身份还原实现
方案一: 从JWT中提取(推荐)
# 网关实现示例
import jwt
from typing import Dict, List
class JWTAuthenticator:
def __init__(self, public_key: str):
self.public_key = public_key
def verify_and_extract(self, token: str) -> Dict:
"""验证JWT并提取用户信息"""
try:
# 1. 验证签名和过期时间
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
audience="api.example.com",
issuer="https://iam.example.com"
)
# 2. 提取用户信息
user_context = {
"user_id": payload.get("sub"),
"email": payload.get("email"),
"name": payload.get("name"),
"roles": payload.get("roles", []),
"permissions": payload.get("permissions", []),
"tenant_id": payload.get("tenant_id"),
"agent_type": payload.get("agent_type"), # AI Agent标识
"exp": payload.get("exp")
}
# 3. 黑名单检查(可选)
jti = payload.get("jti")
if jti and self.is_blacklisted(jti):
raise Exception("Token has been revoked")
return user_context
except jwt.ExpiredSignatureError:
raise Exception("Token expired")
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {str(e)}")
def is_blacklisted(self, jti: str) -> bool:
"""检查JWT ID是否在黑名单中"""
# 查询Redis黑名单
return redis_client.sismember("jwt:blacklist", jti)
方案二: 调用IAM Introspection端点
class OpaqueTokenAuthenticator:
def __init__(self, iam_url: str):
self.iam_url = iam_url
def verify_and_extract(self, token: str) -> Dict:
"""调用IAM验证Opaque Token"""
# RFC 7662: OAuth Token Introspection
response = requests.post(
f"{self.iam_url}/oauth/introspect",
data={"token": token},
auth=(gateway_client_id, gateway_client_secret)
)
result = response.json()
if not result.get("active"):
raise Exception("Token is not active")
return {
"user_id": result.get("sub"),
"roles": result.get("roles", []),
"permissions": result.get("permissions", []),
"tenant_id": result.get("tenant_id"),
"exp": result.get("exp")
}
3.3 用户上下文传递
网关验证完成后,需要将用户信息传递给后端服务:
方式一: HTTP Header传递(推荐)
# 网关添加自定义Header
GET /api/users HTTP/1.1
Host: user-service
Authorization: Bearer <original_jwt>
X-User-ID: user_123
X-User-Email: alice@example.com
X-User-Roles: admin,editor
X-User-Permissions: read:users,write:posts
X-Tenant-ID: org_789
X-Agent-Type: ai_assistant
后端服务获取用户上下文:
from flask import request
def get_current_user():
return {
"user_id": request.headers.get("X-User-ID"),
"roles": request.headers.get("X-User-Roles", "").split(","),
"permissions": request.headers.get("X-User-Permissions", "").split(","),
"tenant_id": request.headers.get("X-Tenant-ID"),
"agent_type": request.headers.get("X-Agent-Type")
}
# 在业务逻辑中使用
@app.route("/api/users")
def list_users():
user = get_current_user()
if "admin" not in user["roles"]:
return {"error": "Forbidden"}, 403
# ...业务逻辑
方式二: 重新签名JWT传递
网关验证原始JWT后,生成内部JWT传递给服务:
def create_internal_jwt(user_context: Dict) -> str:
"""创建内部服务使用的JWT"""
payload = {
"sub": user_context["user_id"],
"roles": user_context["roles"],
"iss": "api-gateway",
"aud": "internal-services",
"exp": time.time() + 300 # 5分钟短期有效
}
return jwt.encode(payload, internal_secret, algorithm="HS256")
3.4 网关配置示例
Kong网关配置
services:
- name: user-service
url: http://user-service:8080
routes:
- name: user-routes
paths:
- /api/users
plugins:
# JWT验证插件
- name: jwt
config:
key_claim_name: kid
secret_is_base64: false
uri_param_names:
- jwt
cookie_names:
- jwt
claims_to_verify:
- exp
# 权限控制插件
- name: acl
config:
whitelist:
- admin
- editor
# 请求转换插件 (添加用户上下文)
- name: request-transformer
config:
add:
headers:
- X-User-ID:$(jwt.sub)
- X-User-Roles:$(jwt.roles)
- X-Tenant-ID:$(jwt.tenant_id)
Nginx + Lua脚本
-- nginx.conf
location /api/ {
access_by_lua_block {
local jwt = require "resty.jwt"
local cjson = require "cjson"
-- 1. 提取Token
local auth_header = ngx.var.http_Authorization
local token = string.match(auth_header, "Bearer%s+(.+)")
if not token then
ngx.status = 401
ngx.say(cjson.encode({error = "Missing token"}))
return ngx.exit(401)
end
-- 2. 验证JWT
local jwt_obj = jwt:verify(public_key, token)
if not jwt_obj.verified then
ngx.status = 401
ngx.say(cjson.encode({error = "Invalid token"}))
return ngx.exit(401)
end
-- 3. 提取用户信息
local payload = jwt_obj.payload
ngx.req.set_header("X-User-ID", payload.sub)
ngx.req.set_header("X-User-Roles", table.concat(payload.roles, ","))
ngx.req.set_header("X-Tenant-ID", payload.tenant_id)
}
proxy_pass http://backend-service;
}
权限控制模型
4.1 RBAC (基于角色的访问控制)
graph LR
User[用户: Alice] --> UR[用户-角色] --> Role[角色: Admin]
Role --> RP[角色-权限] --> Perm1[权限: read:users]
Role --> RP --> Perm2[权限: write:users]
Role --> RP --> Perm3[权限: delete:users]
style User fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
style Role fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
style Perm1 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Perm2 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
style Perm3 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
网关实现RBAC:
class RBACAuthorizer:
def __init__(self):
# 权限规则配置
self.rules = {
"/api/users": {
"GET": ["admin", "viewer"],
"POST": ["admin"],
"PUT": ["admin"],
"DELETE": ["admin"]
},
"/api/posts": {
"GET": ["admin", "editor", "viewer"],
"POST": ["admin", "editor"],
"PUT": ["admin", "editor"],
"DELETE": ["admin"]
}
}
def authorize(self, user_roles: List[str], path: str, method: str) -> bool:
"""检查用户是否有权限访问资源"""
required_roles = self.rules.get(path, {}).get(method, [])
if not required_roles:
return True # 无限制
# 检查用户是否拥有任一所需角色
return any(role in required_roles for role in user_roles)
def check(self, user_context: Dict, request_path: str, request_method: str):
user_roles = user_context.get("roles", [])
if not self.authorize(user_roles, request_path, request_method):
raise PermissionDenied(
f"User with roles {user_roles} cannot {request_method} {request_path}"
)
4.2 ABAC (基于属性的访问控制)
更灵活的权限模型,基于用户属性、资源属性、环境属性进行决策。
class ABACAuthorizer:
def authorize(self, user_context: Dict, resource: Dict, action: str, environment: Dict) -> bool:
"""基于属性的授权决策"""
# 示例策略: 只有同租户的管理员才能删除资源
if action == "delete":
return (
"admin" in user_context.get("roles", []) and
user_context.get("tenant_id") == resource.get("tenant_id")
)
# 示例策略: 工作时间才能访问财务数据
if resource.get("category") == "financial":
current_hour = environment.get("hour")
return 9 <= current_hour <= 18
# 示例策略: 只能访问自己部门的数据
if action == "read":
return user_context.get("department") == resource.get("department")
return False
网关集成ABAC(调用OPA):
import requests
class OPAAuthorizer:
def __init__(self, opa_url: str):
self.opa_url = opa_url
def authorize(self, user_context: Dict, resource_path: str, action: str) -> bool:
"""调用OPA进行授权决策"""
policy_input = {
"input": {
"user": user_context,
"resource": {"path": resource_path},
"action": action,
"environment": {
"time": datetime.now().isoformat(),
"ip": request.remote_addr
}
}
}
response = requests.post(
f"{self.opa_url}/v1/data/authz/allow",
json=policy_input
)
result = response.json()
return result.get("result", False)
OPA策略示例 (Rego语言):
package authz
# 默认拒绝
default allow = false
# 管理员可以访问所有资源
allow {
input.user.roles[_] == "admin"
}
# 用户只能访问自己租户的资源
allow {
input.user.tenant_id == input.resource.tenant_id
input.action == "read"
}
# AI Agent只能读取,不能修改
allow {
input.user.agent_type == "ai_assistant"
input.action == "read"
}
AI Agent身份管理
5.1 AI Agent的身份特点
AI Agent与普通用户的区别:
| 特性 | 普通用户 | AI Agent |
|---|---|---|
| 身份类型 | 人类用户 | 服务账号 |
| 认证方式 | 密码 + MFA | API Key / Client Credentials |
| 令牌有效期 | 短期 (1小时) | 长期 (7天~永久) |
| 权限范围 | 基于角色 | 基于用途限定 |
| 审计要求 | 记录操作人 | 记录代理用户 (on-behalf-of) |
5.2 AI Agent的令牌设计
{
"sub": "agent_ai_assistant_123",
"agent_type": "ai_assistant",
"agent_name": "Customer Support Bot",
"on_behalf_of": "user_789", // 代表哪个用户
"delegated_by": "user_789", // 由谁授权
"roles": ["ai_agent"],
"permissions": [
"read:conversations",
"write:responses",
"read:knowledge_base"
],
"scope": "assistant:basic", // OAuth scope限定
"constraints": {
"rate_limit": 1000, // 速率限制
"allowed_resources": ["conversations", "knowledge_base"],
"forbidden_actions": ["delete", "admin"]
},
"iss": "https://iam.example.com",
"aud": "api.example.com",
"exp": 1706883600, // 7天有效期
"iat": 1706278800
}
5.3 AI Agent的授权模式
模式一: Client Credentials Flow
适用于完全自主的AI Agent (无需用户授权)
sequenceDiagram
participant Agent as AI Agent
participant IAM as IAM系统
participant API as API网关
Agent->>IAM: 请求令牌<br/>client_id + client_secret
IAM->>IAM: 验证客户端凭证
IAM->>Agent: 返回 Access Token (JWT)
Agent->>API: 请求 + Access Token
API->>API: 验证JWT<br/>提取 agent_type
API->>Agent: 返回数据
模式二: Token Exchange (代表用户)
适用于需要代表用户操作的AI Agent
sequenceDiagram
participant User as 用户
participant Agent as AI Agent
participant IAM as IAM系统
participant API as API网关
User->>IAM: 登录获取用户Token
IAM->>User: 返回 user_token
User->>Agent: 授权AI Agent<br/>user_token
Agent->>IAM: Token Exchange<br/>user_token + agent_credentials
IAM->>Agent: 返回 delegated_token<br/>(包含 on_behalf_of)
Agent->>API: 请求 + delegated_token
API->>API: 验证JWT<br/>识别是Agent代表用户
API->>Agent: 返回数据
Token Exchange实现 (RFC 8693):
def token_exchange(user_token: str, agent_credentials: Dict) -> str:
"""交换令牌:用户令牌 → AI Agent代理令牌"""
response = requests.post(
"https://iam.example.com/oauth/token",
data={
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token": user_token,
"subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
"client_id": agent_credentials["client_id"],
"client_secret": agent_credentials["client_secret"],
"scope": "assistant:basic"
}
)
return response.json()["access_token"]
5.4 AI Agent权限控制策略
class AIAgentAuthorizer:
def authorize(self, user_context: Dict, action: str, resource: str) -> bool:
"""AI Agent特殊授权逻辑"""
# 1. 识别AI Agent
if user_context.get("agent_type") == "ai_assistant":
# 2. 检查约束条件
constraints = user_context.get("constraints", {})
# 禁止的操作
if action in constraints.get("forbidden_actions", []):
return False
# 限定的资源
allowed_resources = constraints.get("allowed_resources", [])
if allowed_resources and resource not in allowed_resources:
return False
# 3. 检查代理权限
if "on_behalf_of" in user_context:
# 验证代理用户是否有权限
original_user_id = user_context["on_behalf_of"]
return self.check_user_permission(original_user_id, action, resource)
# 普通用户权限检查
return self.check_user_permission(user_context["user_id"], action, resource)
实战场景解决方案
6.1 您的问题解决方案
问题: 客户的IAM系统给用户和AI Agent颁发了身份令牌,但是我在网关上如何基于身份令牌还原用户身份原始信息,再进行权限匹配控制?
完整解决方案架构
graph TB
subgraph Client["客户端"]
User[用户]
Agent[AI Agent]
end
subgraph IAM["客户IAM系统"]
IssueToken[令牌颁发]
PublicKey[公钥分发]
end
subgraph Gateway["API网关"]
Extract[1. 提取Token]
Verify[2. 验证签名]
Parse[3. 解析Payload]
Restore[4. 还原身份]
Authorize[5. 权限检查]
end
subgraph Backend["后端服务"]
Service[业务服务]
end
User --> |JWT Token| Extract
Agent --> |JWT Token| Extract
IAM --> |颁发| User
IAM --> |颁发| Agent
IAM -.->|提供公钥| Verify
Extract --> Verify
Verify --> Parse
Parse --> Restore
Restore --> Authorize
Authorize --> |通过| Service
Authorize --> |拒绝| User
详细实现步骤
步骤1: 获取IAM公钥
import requests
import json
class IAMPublicKeyProvider:
def __init__(self, iam_jwks_url: str):
self.jwks_url = iam_jwks_url
self.keys_cache = {}
def get_public_key(self, kid: str) -> str:
"""获取指定kid的公钥"""
if kid in self.keys_cache:
return self.keys_cache[kid]
# 从IAM的JWKS端点获取公钥
response = requests.get(self.jwks_url)
jwks = response.json()
for key in jwks["keys"]:
if key["kid"] == kid:
# 转换JWK为PEM格式
public_key = self.jwk_to_pem(key)
self.keys_cache[kid] = public_key
return public_key
raise Exception(f"Public key not found for kid: {kid}")
def jwk_to_pem(self, jwk: Dict) -> str:
"""将JWK转换为PEM格式"""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import base64
# 解析JWK
n = int.from_bytes(base64.urlsafe_b64decode(jwk["n"] + "=="), "big")
e = int.from_bytes(base64.urlsafe_b64decode(jwk["e"] + "=="), "big")
# 构造公钥
public_key = rsa.RSAPublicNumbers(e, n).public_key(default_backend())
# 转换为PEM
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return pem.decode("utf-8")
步骤2: 网关验证和身份还原
import jwt
from typing import Dict, Optional
import redis
class GatewayAuthenticator:
def __init__(self,
public_key_provider: IAMPublicKeyProvider,
redis_client: redis.Redis,
iam_issuer: str,
api_audience: str):
self.public_key_provider = public_key_provider
self.redis = redis_client
self.iam_issuer = iam_issuer
self.api_audience = api_audience
def authenticate_and_restore_identity(self, authorization_header: str) -> Dict:
"""完整的认证和身份还原流程"""
# 1. 提取Token
token = self.extract_token(authorization_header)
if not token:
raise AuthenticationError("Missing or invalid Authorization header")
# 2. 解码Header获取kid
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
if not kid:
raise AuthenticationError("Missing kid in JWT header")
# 3. 获取公钥
public_key = self.public_key_provider.get_public_key(kid)
# 4. 验证JWT签名和claims
try:
payload = jwt.decode(
token,
public_key,
algorithms=["RS256"],
audience=self.api_audience,
issuer=self.iam_issuer,
options={
"verify_signature": True,
"verify_exp": True,
"verify_iat": True,
"verify_aud": True,
"verify_iss": True
}
)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token has expired")
except jwt.InvalidTokenError as e:
raise AuthenticationError(f"Invalid token: {str(e)}")
# 5. 黑名单检查(可选)
jti = payload.get("jti")
if jti and self.is_token_revoked(jti):
raise AuthenticationError("Token has been revoked")
# 6. 还原用户身份
user_identity = self.restore_identity(payload)
return user_identity
def extract_token(self, authorization_header: str) -> Optional[str]:
"""从Authorization header中提取token"""
if not authorization_header:
return None
parts = authorization_header.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
return None
return parts[1]
def is_token_revoked(self, jti: str) -> bool:
"""检查token是否被撤销"""
return self.redis.sismember("jwt:blacklist", jti)
def restore_identity(self, payload: Dict) -> Dict:
"""从JWT payload还原完整用户身份"""
# 基础身份信息
identity = {
"user_id": payload.get("sub"),
"email": payload.get("email"),
"name": payload.get("name"),
"tenant_id": payload.get("tenant_id"),
# 权限相关
"roles": payload.get("roles", []),
"permissions": payload.get("permissions", []),
"groups": payload.get("groups", []),
# Token元数据
"token_id": payload.get("jti"),
"issued_at": payload.get("iat"),
"expires_at": payload.get("exp"),
# 识别用户类型
"is_human": payload.get("agent_type") is None,
"is_agent": payload.get("agent_type") is not None,
}
# AI Agent特殊字段
if identity["is_agent"]:
identity.update({
"agent_type": payload.get("agent_type"),
"agent_name": payload.get("agent_name"),
"on_behalf_of": payload.get("on_behalf_of"), # 代理的用户
"delegated_by": payload.get("delegated_by"),
"constraints": payload.get("constraints", {})
})
return identity
步骤3: 权限匹配控制
class GatewayAuthorizer:
def __init__(self, policy_config: Dict):
self.policies = policy_config
def authorize(self, user_identity: Dict, request_path: str, request_method: str) -> bool:
"""基于用户身份进行权限检查"""
# 1. 查找匹配的策略
policy = self.match_policy(request_path, request_method)
if not policy:
return True # 无策略限制,默认允许
# 2. 检查是否是AI Agent
if user_identity.get("is_agent"):
return self.authorize_agent(user_identity, policy)
# 3. 检查普通用户权限
return self.authorize_user(user_identity, policy)
def match_policy(self, path: str, method: str) -> Optional[Dict]:
"""匹配路径的权限策略"""
for pattern, policy in self.policies.items():
if self.path_matches(path, pattern):
return policy.get(method)
return None
def authorize_user(self, user_identity: Dict, policy: Dict) -> bool:
"""普通用户权限检查"""
required_roles = policy.get("required_roles", [])
required_permissions = policy.get("required_permissions", [])
user_roles = set(user_identity.get("roles", []))
user_permissions = set(user_identity.get("permissions", []))
# 检查角色
if required_roles:
if not any(role in user_roles for role in required_roles):
return False
# 检查权限
if required_permissions:
if not all(perm in user_permissions for perm in required_permissions):
return False
return True
def authorize_agent(self, user_identity: Dict, policy: Dict) -> bool:
"""AI Agent权限检查"""
constraints = user_identity.get("constraints", {})
# 1. 检查禁止的操作
forbidden_actions = constraints.get("forbidden_actions", [])
if policy.get("action") in forbidden_actions:
return False
# 2. 如果Agent代理用户,检查被代理用户的权限
if user_identity.get("on_behalf_of"):
# 这里需要查询原始用户的权限
original_user = self.get_user_by_id(user_identity["on_behalf_of"])
return self.authorize_user(original_user, policy)
# 3. Agent自身的权限检查
agent_permissions = set(user_identity.get("permissions", []))
required_permissions = set(policy.get("required_permissions", []))
return required_permissions.issubset(agent_permissions)
步骤4: 完整的网关中间件
from flask import Flask, request, jsonify, g
app = Flask(__name__)
# 初始化组件
public_key_provider = IAMPublicKeyProvider("https://customer-iam.com/.well-known/jwks.json")
redis_client = redis.Redis(host="redis", port=6379)
authenticator = GatewayAuthenticator(
public_key_provider=public_key_provider,
redis_client=redis_client,
iam_issuer="https://customer-iam.com",
api_audience="api.yourservice.com"
)
authorizer = GatewayAuthorizer(policy_config={
"/api/users": {
"GET": {"required_roles": ["admin", "viewer"]},
"POST": {"required_roles": ["admin"]},
"DELETE": {"required_roles": ["admin"], "action": "delete"}
},
"/api/posts": {
"GET": {"required_permissions": ["read:posts"]},
"POST": {"required_permissions": ["write:posts"]},
}
})
@app.before_request
def authenticate_request():
"""请求前置:认证和鉴权"""
# 1. 认证:验证token并还原身份
try:
auth_header = request.headers.get("Authorization")
user_identity = authenticator.authenticate_and_restore_identity(auth_header)
g.user = user_identity # 存储到Flask的g对象
except AuthenticationError as e:
return jsonify({"error": str(e)}), 401
# 2. 鉴权:检查权限
if not authorizer.authorize(user_identity, request.path, request.method):
return jsonify({
"error": "Forbidden",
"message": f"User {user_identity['user_id']} does not have permission to {request.method} {request.path}"
}), 403
@app.after_request
def add_user_context_headers(response):
"""向后端服务传递用户上下文"""
if hasattr(g, "user"):
user = g.user
# 添加自定义header
headers_to_add = {
"X-User-ID": user["user_id"],
"X-User-Email": user.get("email", ""),
"X-User-Roles": ",".join(user.get("roles", [])),
"X-Tenant-ID": user.get("tenant_id", ""),
"X-Is-Agent": str(user.get("is_agent", False)),
}
if user.get("is_agent"):
headers_to_add["X-Agent-Type"] = user.get("agent_type", "")
headers_to_add["X-On-Behalf-Of"] = user.get("on_behalf_of", "")
for key, value in headers_to_add.items():
response.headers[key] = value
return response
# 业务路由
@app.route("/api/users", methods=["GET"])
def list_users():
user = g.user
# 业务逻辑...
return jsonify({
"message": f"Hello {user['name']}",
"users": [...]
})
6.2 关键配置清单
IAM系统需要提供的信息
-
JWKS端点:
https://customer-iam.com/.well-known/jwks.json{ "keys": [ { "kty": "RSA", "kid": "key-2024-01", "use": "sig", "alg": "RS256", "n": "base64_encoded_modulus", "e": "AQAB" } ] } -
JWT格式约定:
- Header必须包含
kid字段 - Payload必须包含
sub,iss,aud,exp,iat - 自定义claims的字段名约定(roles, permissions等)
- Header必须包含
-
Token撤销机制 (可选):
- 提供Introspection端点:
POST /oauth/introspect - 或提供黑名单同步机制
- 提供Introspection端点:
网关配置项
# gateway-config.yaml
authentication:
iam:
issuer: "https://customer-iam.com"
jwks_url: "https://customer-iam.com/.well-known/jwks.json"
audience: "api.yourservice.com"
token:
blacklist_enabled: true
blacklist_redis_key: "jwt:blacklist"
cache_public_keys: true
cache_ttl: 3600 # 1小时
authorization:
mode: "rbac" # 或 "abac", "opa"
# RBAC配置
rbac:
policies:
- path: "/api/users"
methods:
GET: ["admin", "viewer"]
POST: ["admin"]
DELETE: ["admin"]
- path: "/api/posts"
methods:
GET: ["admin", "editor", "viewer"]
POST: ["admin", "editor"]
# AI Agent特殊规则
agent:
forbidden_actions: ["delete", "admin"]
enforce_delegation: true # 要求Agent必须代理用户
user_context:
headers:
user_id: "X-User-ID"
email: "X-User-Email"
roles: "X-User-Roles"
tenant_id: "X-Tenant-ID"
is_agent: "X-Is-Agent"
agent_type: "X-Agent-Type"
6.3 测试验证
测试用例1: 普通用户请求
# 1. 获取用户Token
USER_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
-d "grant_type=password" \
-d "username=alice@example.com" \
-d "password=secret" \
| jq -r '.access_token')
# 2. 请求API
curl -H "Authorization: Bearer $USER_TOKEN" \
https://api.yourservice.com/api/users
# 3. 检查后端收到的Header
# X-User-ID: user_123
# X-User-Roles: admin
# X-Is-Agent: False
测试用例2: AI Agent请求
# 1. 获取Agent Token
AGENT_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
-d "grant_type=client_credentials" \
-d "client_id=agent_ai_assistant" \
-d "client_secret=agent_secret" \
| jq -r '.access_token')
# 2. 请求API
curl -H "Authorization: Bearer $AGENT_TOKEN" \
https://api.yourservice.com/api/posts
# 3. 检查后端收到的Header
# X-User-ID: agent_ai_assistant_123
# X-Is-Agent: True
# X-Agent-Type: ai_assistant
测试用例3: Agent代理用户请求
# 1. 用户授权Agent
USER_TOKEN="user_token_here"
# 2. Agent交换令牌
DELEGATED_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
-d "grant_type=urn:ietf:params:oauth:grant-type:token-exchange" \
-d "subject_token=$USER_TOKEN" \
-d "client_id=agent_ai_assistant" \
-d "client_secret=agent_secret" \
| jq -r '.access_token')
# 3. Agent代表用户请求
curl -H "Authorization: Bearer $DELEGATED_TOKEN" \
https://api.yourservice.com/api/users
# 4. 检查后端收到的Header
# X-User-ID: agent_ai_assistant_123
# X-Is-Agent: True
# X-On-Behalf-Of: user_123 ← 代理的用户
6.4 常见问题解决
问题1: 公钥获取失败
原因: IAM的JWKS端点不可达或格式错误
解决:
# 添加重试和降级机制
from tenacity import retry, stop_after_attempt, wait_exponential
class IAMPublicKeyProvider:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def get_public_key(self, kid: str) -> str:
# ... 获取公钥逻辑
pass
def get_cached_or_default_key(self, kid: str) -> str:
"""降级方案:使用缓存的公钥"""
try:
return self.get_public_key(kid)
except Exception as e:
logger.error(f"Failed to get public key: {e}")
# 返回缓存的公钥
cached_key = self.redis.get(f"public_key:{kid}")
if cached_key:
return cached_key.decode()
raise
问题2: Token过期但用户仍在使用
原因: Token过期时间太短,用户体验不好
解决: 实现Refresh Token机制
@app.errorhandler(401)
def handle_expired_token(error):
if "Token has expired" in str(error):
return jsonify({
"error": "token_expired",
"message": "Please refresh your token",
"refresh_endpoint": "/oauth/token",
"hint": "Use refresh_token grant"
}), 401
return jsonify({"error": str(error)}), 401
# 客户端自动刷新
def request_with_auto_refresh(url, access_token, refresh_token):
response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
if response.status_code == 401 and response.json().get("error") == "token_expired":
# 自动刷新token
new_tokens = refresh_access_token(refresh_token)
# 重试请求
return requests.get(url, headers={"Authorization": f"Bearer {new_tokens['access_token']}"})
return response
问题3: AI Agent权限过大
原因: Agent使用了用户的完整权限
解决: 使用Scope限定Agent权限
def authorize_agent(self, user_identity: Dict, policy: Dict) -> bool:
# 检查Token的scope
token_scope = user_identity.get("scope", "")
scopes = set(token_scope.split())
# Agent只有基础scope,不能执行管理操作
if "admin" in policy.get("required_roles", []):
return "admin:full" in scopes
# 其他操作需要对应的scope
action = policy.get("action")
required_scope = f"{action}:{resource_type}"
return required_scope in scopes
总结
核心要点
-
IAM职责: 颁发令牌、管理身份、定义策略
-
JWT优势: 自包含、无状态、适合分布式
-
网关核心:
- 验证令牌签名
- 还原用户身份
- 执行权限检查
- 传递用户上下文
-
AI Agent特殊处理:
- 使用Client Credentials或Token Exchange
- 限定权限scope
- 记录代理关系(on-behalf-of)
最佳实践
| 场景 | 推荐方案 |
|---|---|
| 用户令牌 | JWT (RS256), 短期(1小时) + Refresh Token |
| AI Agent令牌 | JWT (RS256), 长期(7天), 限定scope |
| 网关验证 | 本地验证JWT签名 + 黑名单检查 |
| 权限模型 | 简单场景用RBAC, 复杂场景用ABAC(OPA) |
| 用户上下文传递 | 自定义HTTP Header |
安全建议
- 始终使用HTTPS传输Token
- 使用RS256等非对称算法签名JWT
- 设置合理的Token过期时间
- 实现Token黑名单机制
- 定期轮换签名密钥(kid)
- 记录完整的审计日志
- AI Agent使用最小权限原则
主流IAM实现方案深度分析
7.1 云厂商IAM方案对比
国际云厂商
AWS IAM (Amazon Identity and Access Management)
架构特点:
graph TB
subgraph AWS_IAM["AWS IAM架构"]
User[IAM User]
Role[IAM Role]
Policy[Policy Document]
STS[STS临时凭证]
User --> Policy
Role --> Policy
Role --> STS
end
subgraph Resources["AWS资源"]
S3[S3 Bucket]
EC2[EC2 Instance]
Lambda[Lambda Function]
end
User --> |访问| Resources
STS --> |临时凭证| Resources
style AWS_IAM fill:#ff9900,stroke:#232f3e,stroke-width:2px
核心机制:
- Policy结构 (JSON策略文档):
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::my-bucket/*",
"Condition": {
"IpAddress": {
"aws:SourceIp": "203.0.113.0/24"
}
}
}
]
}
特色功能:
- STS (Security Token Service): 临时凭证,支持跨账号访问
- AssumeRole: 角色扮演机制,适合服务间调用
- Resource-based Policy: 资源级策略,细粒度控制
- Conditions: 基于IP、时间、MFA等条件的策略
Token机制:
- 使用AWS Signature V4签名算法
- Access Key + Secret Key生成签名
- STS临时凭证: Access Key + Secret Key + Session Token (有效期15分钟~12小时)
适用场景:
- AWS生态内的资源访问控制
- 跨账号资源共享
- 服务间的安全调用
局限性:
- 仅限AWS资源,无法管理外部应用
- 学习曲线陡峭,策略语法复杂
- 无内置UI管理界面
Azure AD / Microsoft Entra ID
架构特点:
graph LR
subgraph Azure_AD["Azure AD"]
User[用户]
App[应用注册]
SP[Service Principal]
ManagedID[托管标识]
end
subgraph Protocols["认证协议"]
OAuth[OAuth 2.0]
OIDC[OpenID Connect]
SAML[SAML 2.0]
end
User --> OAuth
App --> OAuth
SP --> OAuth
ManagedID --> OAuth
OAuth --> Resources[Azure资源]
OIDC --> Resources
SAML --> Resources
style Azure_AD fill:#0078d4,stroke:#ffffff,stroke-width:2px
核心机制:
- 应用注册 (Application Registration):
{
"appId": "00000000-0000-0000-0000-000000000000",
"appRoles": [
{
"id": "role-id",
"displayName": "Admin",
"value": "Admin",
"allowedMemberTypes": ["User", "Application"]
}
],
"requiredResourceAccess": [
{
"resourceAppId": "00000003-0000-0000-c000-000000000000",
"resourceAccess": [
{
"id": "e1fe6dd8-ba31-4d61-89e7-88639da4683d",
"type": "Scope"
}
]
}
]
}
- Managed Identity (托管标识):
- System-assigned: 与资源生命周期绑定
- User-assigned: 可复用的独立身份
Token机制:
- 使用标准JWT (RS256签名)
- Access Token: 访问Azure资源
- ID Token: 用户身份信息
- Refresh Token: 刷新访问令牌
Token示例:
{
"aud": "https://graph.microsoft.com",
"iss": "https://sts.windows.net/{tenant-id}/",
"iat": 1706451600,
"nbf": 1706451600,
"exp": 1706455200,
"sub": "user-object-id",
"oid": "user-object-id",
"roles": ["Directory.Read.All"],
"scp": "User.Read Mail.Send",
"tid": "tenant-id"
}
特色功能:
- Conditional Access: 基于风险、设备、位置的条件访问
- Privileged Identity Management (PIM): 即时权限提升
- Identity Protection: 风险检测和自动响应
- B2B/B2C: 支持外部用户和客户身份管理
适用场景:
- 企业级身份管理
- Microsoft 365生态集成
- 混合云环境(Azure + On-Premises)
优势:
- 完整的企业身份解决方案
- 强大的条件访问和风险控制
- 与Windows、Office深度集成
Google Cloud IAM
架构特点:
graph TB
subgraph GCP_IAM["Google Cloud IAM"]
Member[成员<br/>User/SA/Group]
Role[角色<br/>Primitive/Predefined/Custom]
Policy[IAM Policy]
Member --> Policy
Role --> Policy
end
subgraph Resources["资源层级"]
Org[Organization]
Folder[Folder]
Project[Project]
Resource[Resource]
Org --> Folder
Folder --> Project
Project --> Resource
end
Policy --> Resources
style GCP_IAM fill:#4285f4,stroke:#ffffff,stroke-width:2px
核心机制:
- IAM Policy Binding:
{
"bindings": [
{
"role": "roles/storage.objectViewer",
"members": [
"user:alice@example.com",
"serviceAccount:my-sa@project.iam.gserviceaccount.com"
],
"condition": {
"title": "Expires in 2024",
"expression": "request.time < timestamp('2024-12-31T23:59:59Z')"
}
}
]
}
- Service Account:
- 应用身份,非人类用户
- 可以模拟(Impersonate)其他服务账号
- 支持短期密钥(Short-lived credentials)
Token机制:
- 使用标准JWT
- 通过Metadata Server自动获取(GCE/GKE)
- 支持Workload Identity Federation(联合身份)
特色功能:
- Hierarchical Inheritance: 策略继承(Organization → Folder → Project → Resource)
- IAM Conditions: 基于CEL(Common Expression Language)的条件策略
- Workload Identity: 将Kubernetes SA映射到GCP SA,无需密钥
- Policy Analyzer: 分析和审计权限
适用场景:
- GCP资源访问控制
- 多项目/多组织管理
- Kubernetes workload身份管理
优势:
- 简洁的权限模型(Member-Role-Resource)
- 强大的条件策略(CEL表达式)
- Workload Identity消除密钥管理
国内云厂商
阿里云RAM (Resource Access Management)
架构特点:
graph TB
subgraph Alibaba_RAM["阿里云RAM"]
User[RAM用户]
Role[RAM角色]
Policy[权限策略]
STS[STS临时Token]
User --> Policy
Role --> Policy
Role --> STS
end
subgraph Resources["阿里云资源"]
ECS[ECS]
OSS[OSS]
RDS[RDS]
end
User --> Resources
STS --> Resources
style Alibaba_RAM fill:#ff6a00,stroke:#ffffff,stroke-width:2px
核心机制:
- Policy语法 (类似AWS):
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"oss:GetObject",
"oss:PutObject"
],
"Resource": [
"acs:oss:*:*:my-bucket/*"
],
"Condition": {
"IpAddress": {
"acs:SourceIp": ["192.168.0.0/16"]
}
}
}
]
}
Token机制:
- AccessKey/SecretKey: 永久凭证
- STS临时Token: SecurityToken + TempAccessKey + TempSecretKey
- 有效期: 900秒~3600秒
特色功能:
- 跨云账号授权: 支持主账号授权给其他阿里云账号
- 角色SSO: 支持企业IdP集成
- 资源组: 跨产品的资源分组管理
- 操作审计: 记录所有API调用
适用场景:
- 阿里云资源访问控制
- 移动应用临时授权(STS)
- 跨账号资源共享
腾讯云CAM (Cloud Access Management)
架构特点:
graph TB
subgraph Tencent_CAM["腾讯云CAM"]
User[子用户]
Role[角色]
Policy[策略]
Group[用户组]
User --> Group
Group --> Policy
Role --> Policy
end
subgraph Resources["腾讯云资源"]
CVM[CVM]
COS[COS]
TKE[TKE]
end
User --> Resources
Role --> Resources
style Tencent_CAM fill:#006eff,stroke:#ffffff,stroke-width:2px
核心机制:
- Policy结构:
{
"version": "2.0",
"statement": [
{
"effect": "allow",
"action": [
"cos:GetObject",
"cos:PutObject"
],
"resource": [
"qcs::cos:ap-guangzhou:uid/1250000000:my-bucket/*"
],
"condition": {
"ip_equal": {
"qcs:ip": ["10.0.0.1/24"]
}
}
}
]
}
Token机制:
- 永久密钥: SecretId + SecretKey
- 临时密钥: TmpSecretId + TmpSecretKey + Token
- 签名算法: HMAC-SHA256
特色功能:
- 服务角色: 授权腾讯云服务访问其他服务
- 标签授权: 基于资源标签的权限控制
- 会话管理: 查看和撤销子用户登录会话
- 操作保护: 敏感操作二次验证
适用场景:
- 腾讯云资源权限管理
- 企业多账号管理
- 临时授权(STS)
华为云IAM
核心特点:
- 统一身份认证: 支持华为云账号、IAM用户、联邦用户
- 委托授权: 类似AWS的AssumeRole
- 虚拟MFA: 支持虚拟MFA设备
- 自定义策略: 支持细粒度的资源级授权
Token机制:
- 永久AK/SK: Access Key + Secret Key
- 临时Token: 通过委托获取临时凭证
- IAM Token: 基于用户名密码获取的临时token(24小时有效)
7.2 IDaaS方案对比
国际方案
Okta
架构定位: 云原生身份即服务(IDaaS)平台
graph TB
subgraph Okta["Okta平台"]
UD[Universal Directory]
SSO[Single Sign-On]
MFA[Multi-Factor Auth]
LC[Lifecycle Management]
end
subgraph Apps["应用集成"]
SaaS[SaaS应用<br/>5000+]
OnPrem[本地应用]
Custom[自定义应用]
end
subgraph Protocols["协议支持"]
SAML2[SAML 2.0]
OIDC[OIDC]
OAuth2[OAuth 2.0]
end
UD --> SSO
SSO --> Apps
Protocols --> Apps
style Okta fill:#007dc1,stroke:#ffffff,stroke-width:2px
核心能力:
-
Universal Directory:
- 集中式用户目录
- 支持LDAP、AD集成
- 自定义用户属性
-
Adaptive MFA:
- 基于风险的自适应认证
- 支持TOTP、WebAuthn、生物识别
- 无密码认证(Passwordless)
-
Lifecycle Management:
- 用户自动Provisioning/Deprovisioning
- 与HR系统集成(Workday、SAP等)
Token机制:
- 标准JWT (RS256)
- 短期Access Token (1小时)
- 长期Refresh Token (可配置)
OAuth 2.0 Flow支持:
- Authorization Code + PKCE
- Client Credentials
- Resource Owner Password (不推荐)
- Device Authorization
特色功能:
- Okta Workflows: 低代码自动化(类似Zapier)
- API Access Management: 为自己的API提供OAuth服务器
- Customer Identity (CIAM): B2C场景的用户管理
适用场景:
- 企业SSO门户
- 应用集成中心
- 客户身份管理(CIAM)
优势:
- 5000+ 预集成应用
- 强大的自动化能力
- 完善的开发者生态
定价:
- 按用户数收费($2~15/user/month)
- 高昂的企业成本
Auth0 (by Okta)
架构定位: 面向开发者的身份平台
graph LR
subgraph Auth0["Auth0"]
Rules[Rules引擎]
Actions[Actions]
Connections[Identity Connections]
MFA[MFA]
end
subgraph Integrations["身份源"]
Social[社交登录<br/>Google/FB/GitHub]
Enterprise[企业IdP<br/>AD/SAML]
DB[用户名密码]
end
Integrations --> Connections
Connections --> Rules
Rules --> Actions
style Auth0 fill:#eb5424,stroke:#ffffff,stroke-width:2px
核心能力:
-
Universal Login:
- 托管登录页面
- 自定义品牌和UI
- 跨域SSO
-
Rules & Actions:
- 可编程的认证流程
- JavaScript/Node.js编写
- 支持调用外部API
Rules示例:
function addRolesToToken(user, context, callback) {
// 从数据库查询用户角色
const roles = getUserRoles(user.email);
// 添加到Token的claims
context.idToken['https://myapp.com/roles'] = roles;
context.accessToken['https://myapp.com/roles'] = roles;
callback(null, user, context);
}
- Organizations (B2B多租户):
- 租户隔离
- 每个租户独立的身份源
- 租户级别的品牌定制
Token机制:
- 标准JWT
- 支持自定义Claims
- Token签名算法可配置(RS256/HS256)
特色功能:
- Extensibility: 高度可定制的认证流程
- Social Connections: 30+社交登录提供商
- Passwordless: 邮箱/短信验证码登录
- Breached Password Detection: 泄露密码检测
适用场景:
- SaaS应用的用户认证
- 移动应用登录
- 多租户B2B应用
优势:
- 开发者友好的API和SDK
- 灵活的可扩展性
- 快速集成(几小时完成)
定价:
- 免费版: 7000 MAU (月活用户)
- 付费版: $35~240/month (起步价)
国内方案
身份宝 (IDaaS by 阿里云)
架构定位: 企业级身份管理云服务
graph TB
subgraph IDaaS["身份宝"]
UD[统一用户目录]
SSO[单点登录]
MFA[多因素认证]
Sync[账号同步]
end
subgraph Sources["身份源"]
AD[Active Directory]
LDAP[LDAP]
DingTalk[钉钉]
WeChat[企业微信]
end
subgraph Apps["应用"]
Internal[内部应用]
SaaS[SaaS应用]
Alibaba[阿里云控制台]
end
Sources --> Sync
Sync --> UD
UD --> SSO
SSO --> Apps
style IDaaS fill:#ff6a00,stroke:#ffffff,stroke-width:2px
核心能力:
-
多源身份集成:
- AD/LDAP同步
- 钉钉、企业微信集成
- HR系统对接
-
应用门户:
- 企业应用市场
- 统一工作台
- 移动端APP
-
权限管理:
- RBAC权限模型
- 应用级授权
- 数据权限控制
特色功能:
- 国密支持: SM2/SM3/SM4算法
- 本地化部署: 支持私有化部署
- 合规认证: 等保三级、ISO 27001
适用场景:
- 国企/央企身份管理
- 需要本地化部署的场景
- 阿里云生态深度用户
玉符 (AuthingCloud)
架构定位: 开发者友好的身份云
graph LR
subgraph Authing["玉符"]
Core[认证核心]
Pipeline[Pipeline扩展]
Rules[业务规则]
end
subgraph Features["功能"]
Social[社交登录]
MFA[MFA]
RBAC[权限管理]
Webhooks[Webhooks]
end
Core --> Pipeline
Pipeline --> Rules
Features --> Core
style Authing fill:#396aff,stroke:#ffffff,stroke-width:2px
核心能力:
-
快速集成:
- 5分钟接入
- 丰富的SDK(20+语言)
- 托管登录页
-
Pipeline机制:
- 可编程的认证流程
- 类似Auth0的Rules
- 支持JavaScript/Python
Pipeline示例:
async function pipe(user, context, callback) {
// 从自己的数据库获取额外信息
const userInfo = await getUserFromDB(user.id);
// 添加到Token
user.customData = {
department: userInfo.department,
level: userInfo.level
};
return callback(null, user, context);
}
- 多租户SaaS:
- 租户隔离
- 独立用户池
- 品牌定制
Token机制:
- 标准JWT (RS256/HS256)
- 自定义Claims
- Token生命周期可配置
特色功能:
- 国内社交登录: 微信、支付宝、QQ等
- 小程序登录: 微信小程序一键登录
- 实名认证: 集成第三方实名认证服务
- 国密支持: 符合国内合规要求
适用场景:
- 互联网应用/SaaS
- 移动应用/小程序
- 需要国内社交登录的场景
优势:
- 国内网络环境优化
- 微信生态深度集成
- 性价比高
定价:
- 免费版: 7000 MAU
- 付费版: ¥3000~30000/year
竹云 (BambooCloud)
架构定位: 企业级IAM全栈方案
核心能力:
- 4A统一管控: Account(账号)、Authentication(认证)、Authorization(授权)、Audit(审计)
- 特权账号管理(PAM): 运维人员权限管控
- 堡垒机集成: 安全运维审计
特色功能:
- 面向传统企业/政府
- 强调合规和审计
- 支持私有化部署
适用场景:
- 大型企业集团
- 金融/政府行业
- 需要PAM能力的场景
7.3 开源IAM方案
Keycloak
架构定位: 开源身份和访问管理解决方案
graph TB
subgraph Keycloak["Keycloak"]
Realm[Realm<br/>租户隔离]
Client[Client<br/>应用注册]
User[User Federation]
IDP[Identity Brokering]
end
subgraph Features["功能"]
SSO[SSO]
OAuth[OAuth 2.0/OIDC]
SAML[SAML 2.0]
LDAP[LDAP/AD]
end
Realm --> Client
Realm --> User
User --> LDAP
IDP --> Features
style Keycloak fill:#4d4d4d,stroke:#00b8e3,stroke-width:2px
核心能力:
-
Realm多租户:
- 每个Realm独立的用户、角色、客户端
- 跨Realm SSO
-
User Federation:
- LDAP/AD集成
- 自定义User Storage SPI
-
Identity Brokering:
- 作为OAuth/OIDC代理
- 支持社交登录(Google、GitHub等)
Token机制:
- 标准JWT (RS256)
- Access Token / ID Token / Refresh Token
- Token mapper自定义Claims
Client配置示例:
{
"clientId": "my-app",
"enabled": true,
"clientAuthenticatorType": "client-secret",
"redirectUris": ["https://myapp.com/callback"],
"webOrigins": ["https://myapp.com"],
"protocol": "openid-connect",
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": false
}
特色功能:
- Admin Console: 完善的Web管理界面
- Account Console: 用户自服务门户
- Custom Themes: 自定义登录页面
- Event Listeners: 审计和集成
适用场景:
- 企业内部应用SSO
- 微服务身份管理
- 需要自主可控的场景
优势:
- 完全开源免费
- 功能完整(不输商业产品)
- 活跃的社区支持
劣势:
- 学习曲线陡峭
- 资源消耗较大(Java应用)
- 需要自己运维
Casdoor
架构定位: 国产化开源IAM平台
graph LR
subgraph Casdoor["Casdoor"]
Org[Organization]
App[Application]
User[User]
Provider[Provider]
end
subgraph Features["特色"]
WeChat[微信登录]
Alipay[支付宝]
DingTalk[钉钉]
LDAP[LDAP]
end
Org --> App
App --> User
Provider --> Features
style Casdoor fill:#2e7d32,stroke:#ffffff,stroke-width:2px
核心能力:
- 多组织管理: Organization概念隔离租户
- 国内生态: 微信、支付宝、钉钉等登录
- Casbin集成: 内置权限管理
Token机制:
- 标准JWT
- 支持自定义字段
特色功能:
- 全中文界面和文档
- 支持国密算法
- 轻量级(Go语言开发)
适用场景:
- 国内中小企业
- 需要快速部署的场景
- 对国密有要求的项目
7.4 方案选型决策树
graph TB
Start{需求分析}
Start --> Q1{部署方式?}
Q1 -->|云服务| Q2{预算?}
Q1 -->|私有化| Q3{规模?}
Q2 -->|充足| Okta[Okta/Auth0]
Q2 -->|有限| Authing[玉符/身份宝]
Q3 -->|大型| Keycloak[Keycloak]
Q3 -->|中小型| Casdoor[Casdoor]
Start --> Q4{场景?}
Q4 -->|云资源| Cloud{云厂商?}
Q4 -->|应用| App{类型?}
Cloud -->|AWS| AWS_IAM[AWS IAM]
Cloud -->|Azure| Azure_AD[Azure AD]
Cloud -->|阿里云| Alibaba_RAM[阿里云RAM]
Cloud -->|腾讯云| Tencent_CAM[腾讯云CAM]
App -->|B2B SaaS| Auth0
App -->|B2C| Authing
App -->|企业内部| Keycloak
style Okta fill:#007dc1,stroke:#ffffff,stroke-width:2px
style Keycloak fill:#4d4d4d,stroke:#00b8e3,stroke-width:2px
style AWS_IAM fill:#ff9900,stroke:#232f3e,stroke-width:2px
7.5 综合对比表
| 方案 | 类型 | 部署方式 | 适用场景 | Token类型 | 价格 | 优势 | 劣势 |
|---|---|---|---|---|---|---|---|
| AWS IAM | 云IAM | SaaS | AWS资源 | AWS签名 | 按使用量 | AWS生态 | 仅限AWS |
| Azure AD | 企业IdP | SaaS | 企业SSO | JWT | $6~21/user/month | 企业功能完整 | 价格较高 |
| Okta | IDaaS | SaaS | 应用SSO | JWT | $2~15/user/month | 应用集成多 | 价格昂贵 |
| Auth0 | IDaaS | SaaS | 开发者 | JWT | $35~240/month | 开发者友好 | MAU限制 |
| Keycloak | 开源 | 自建 | 企业内部 | JWT | 免费 | 功能完整 | 运维成本 |
| 阿里云RAM | 云IAM | SaaS | 阿里云资源 | STS Token | 按使用量 | 阿里云集成 | 仅限阿里云 |
| 玉符 | IDaaS | SaaS | 国内应用 | JWT | ¥3000~30000/year | 国内生态 | 功能相对简单 |
| 身份宝 | IDaaS | SaaS/私有化 | 企业级 | JWT | 定制报价 | 本地化支持 | 价格不透明 |
| Casdoor | 开源 | 自建 | 中小企业 | JWT | 免费 | 轻量级 | 社区较小 |
7.6 架构设计建议
场景一: 初创公司 (0~50人)
推荐方案: Auth0 / 玉符
架构:
用户 → Auth0托管登录 → 应用 (接收JWT)
理由:
- 快速集成(1天完成)
- 无需运维
- 成本可控(免费额度内)
场景二: 成长期公司 (50~500人)
推荐方案: Keycloak (自建) / Okta (预算充足)
架构:
用户 → Keycloak SSO → 内部应用(10+)
→ SaaS应用(SAML)
→ API网关(JWT)
理由:
- 应用数量增加,SSO需求
- 自建Keycloak节省成本
- 支持LDAP集成企业AD
场景三: 大型企业 (500+人)
推荐方案: Azure AD (Microsoft生态) / 混合架构
混合架构:
[Azure AD] ←→ [On-Premises AD]
↓
[应用网关]
↓ ↓
[内部应用] [SaaS应用]
↓
[Keycloak] (微服务内部)
理由:
- 企业级功能(PIM、条件访问)
- 与Office 365集成
- 混合云支持
- 微服务层使用轻量级Keycloak
场景四: 多云/混合云
推荐方案: Workload Identity Federation
架构:
[GKE Pod] → [Workload Identity] → [GCP Service Account]
↓
[AssumeRoleWithWebIdentity]
↓
[AWS IAM Role]
关键技术:
- Google Workload Identity
- AWS IAM OIDC Provider
- Azure Managed Identity
理由:
- 无需管理云凭证
- 支持跨云访问
- 最小权限原则
7.7 实战建议
对于您的问题场景
基于您的需求:
客户的IAM系统给用户和AI Agent颁发身份令牌,网关需要还原身份并进行权限控制
推荐方案:
-
如果客户IAM是云厂商的:
- AWS: 使用STS AssumeRole
- Azure: 使用Managed Identity
- 阿里云: 使用RAM STS
-
如果客户IAM是企业自建的:
- 要求客户IAM提供JWKS端点
- 网关使用公钥验证JWT
- 实现本文档第6章的方案
-
如果需要从零建设IAM:
- 小型项目: 使用Auth0/玉符
- 中型项目: 自建Keycloak
- 大型项目: Azure AD + Keycloak混合
关键对接点:
# 与客户IAM的对接清单
1. JWKS端点: https://customer-iam.com/.well-known/jwks.json
2. Token格式: JWT (RS256)
3. Claims约定:
- sub: 用户/Agent ID
- roles: 角色数组
- permissions: 权限数组
- agent_type: AI Agent标识
4. Token有效期:
- 用户Token: 1小时
- Agent Token: 7天
5. 撤销机制:
- 方式1: jti黑名单(Redis)
- 方式2: Introspection端点
参考资源
标准规范
- RFC 7519: JWT
- RFC 8693: Token Exchange
- RFC 7662: Token Introspection
- OAuth 2.1
- OpenID Connect Core
- NIST SP 800-63B: Digital Identity Guidelines
云厂商文档
开源项目
书籍推荐
- OAuth 2.0 in Action by Justin Richer
- Solving Identity Management in Modern Applications by Yvonne Wilson
- Zero Trust Networks by Evan Gilman