身份认证与访问控制深度理论知识

#安全

身份认证与访问控制深度理论知识

本文档专注于IAM系统、令牌机制、网关认证鉴权的完整理论与实践

目录

  1. IAM基础理论
  2. 身份令牌机制
  3. 网关认证鉴权架构
  4. 权限控制模型
  5. AI Agent身份管理
  6. 实战场景解决方案

IAM基础理论

1.1 核心概念辨析

graph TB
    subgraph Identity["身份 (Identity)"]
        User["用户<br/>• 人类用户<br/>• 服务账号<br/>• AI Agent"]
        Attributes["身份属性<br/>• ID: user_123<br/>• Email<br/>• Roles<br/>• Groups"]
    end

    subgraph Authentication["认证 (Authentication)"]
        Verify["验证身份<br/>回答: 你是谁?"]
        Methods["认证方式<br/>• 密码<br/>• MFA<br/>• 证书<br/>• 生物识别"]
    end

    subgraph Authorization["授权 (Authorization)"]
        CheckPerm["检查权限<br/>回答: 你能做什么?"]
        Models["权限模型<br/>• RBAC<br/>• ABAC<br/>• ReBAC"]
    end

    Identity --> Authentication
    Authentication --> Authorization

    style Identity fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style Authentication fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    style Authorization fill:#e8f5e9,stroke:#43a047,stroke-width:2px

关键区别:

概念问题验证内容结果
身份 (Identity)你声称是谁?身份标识符 (ID, Email)身份声明
认证 (Authentication)你真的是这个人吗?凭证 (密码, Token, 证书)身份确认
授权 (Authorization)你可以访问这个资源吗?权限、角色、策略访问决策

1.2 IAM系统的核心职责

graph LR
    subgraph IAM["IAM系统"]
        IM[身份管理<br/>Identity Management]
        AM[访问管理<br/>Access Management]
        PM[策略管理<br/>Policy Management]
        TM[令牌管理<br/>Token Management]
    end

    IM --> |用户、组、角色| AM
    PM --> |定义权限规则| AM
    AM --> |颁发令牌| TM

    External[外部请求] --> TM
    TM --> |验证令牌| Gateway[API网关]

IAM的四大支柱:

  1. 身份管理 (Identity Management)

    • 用户生命周期管理(创建、更新、删除)
    • 身份属性存储(用户信息、元数据)
    • 身份联合(SSO、LDAP、SAML)
  2. 访问管理 (Access Management)

    • 认证流程(登录、MFA)
    • 会话管理(Session、Cookie)
    • 令牌颁发(JWT、OAuth Token)
  3. 策略管理 (Policy Management)

    • 权限定义(RBAC角色、ABAC策略)
    • 策略存储与分发
    • 策略评估引擎
  4. 令牌管理 (Token Management)

    • 令牌生成与签名
    • 令牌验证与刷新
    • 令牌撤销(黑名单)

身份令牌机制

2.1 令牌类型对比

令牌类型特点验证方式适用场景安全性
JWT (自包含令牌)令牌本身包含用户信息本地验证签名无状态API、微服务中(无法主动撤销)
Opaque Token (引用令牌)随机字符串,需查询获取信息调用IAM验证需要集中控制、可撤销高(集中管理)
Session ID服务器端存储,客户端持有ID查询Session存储单体应用、Web应用高(服务器控制)

2.2 JWT详解

JWT结构

JWT = Header.Payload.Signature

完整示例:

eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzEyMyIsIm5hbWUiOiJBbGljZSIsInJvbGVzIjpbImFkbWluIl0sImlhdCI6MTcwNjQ1MTYwMCwiZXhwIjoxNzA2NDU1MjAwfQ.signature_here

Header (头部):

{
  "alg": "RS256",        // 签名算法: RSA + SHA256
  "typ": "JWT",          // 令牌类型
  "kid": "key-2024-01"   // 密钥ID (用于密钥轮换)
}

Payload (负载):

{
  // 标准声明 (Standard Claims)
  "sub": "user_123",           // Subject: 用户ID
  "iss": "https://iam.example.com",  // Issuer: 颁发者
  "aud": "api.example.com",    // Audience: 目标受众
  "exp": 1706455200,           // Expiration: 过期时间 (Unix时间戳)
  "iat": 1706451600,           // Issued At: 颁发时间
  "nbf": 1706451600,           // Not Before: 生效时间
  "jti": "jwt-uuid-456",       // JWT ID: 唯一标识

  // 自定义声明 (Custom Claims)
  "name": "Alice",
  "email": "alice@example.com",
  "roles": ["admin", "editor"],
  "permissions": ["read:users", "write:posts"],
  "tenant_id": "org_789",
  "agent_type": "ai_assistant"  // AI Agent标识
}

Signature (签名):

// 签名算法
RSASHA256(
  base64UrlEncode(header) + "." + base64UrlEncode(payload),
  privateKey  // 使用私钥签名
)

JWT签名算法对比

算法类型密钥安全性性能适用场景
HS256对称共享密钥单体应用、内部服务
RS256非对称公钥/私钥分布式系统、多方验证
ES256非对称椭圆曲线移动应用、IoT

安全建议:

  • 微服务网关: RS256 (网关只需公钥验证,IAM持有私钥签名)
  • 内部服务间: HS256 (性能更好,密钥可共享)
  • 移动端/前端: ES256 (签名短,性能平衡)

2.3 Opaque Token机制

sequenceDiagram
    participant Client as 客户端
    participant Gateway as API网关
    participant IAM as IAM系统
    participant Redis as Token存储
    participant Service as 后端服务

    Note over Client,IAM: 1. 令牌颁发阶段
    Client->>IAM: 登录请求 (username + password)
    IAM->>Redis: 存储令牌信息<br/>token_abc123 → {user_id, roles, ...}
    IAM->>Client: 返回 Opaque Token: abc123xyz

    Note over Client,Service: 2. 令牌验证阶段
    Client->>Gateway: 请求 + Token: abc123xyz
    Gateway->>IAM: 验证令牌 (Introspection Endpoint)
    IAM->>Redis: 查询 token_abc123
    Redis->>IAM: 返回用户信息
    IAM->>Gateway: 返回验证结果 + 用户信息
    Gateway->>Service: 转发请求 (附加用户上下文)
    Service->>Gateway: 响应
    Gateway->>Client: 响应

Opaque Token的优势:

  1. 可主动撤销: 删除Redis中的令牌即可立即失效
  2. 安全性高: 令牌本身不包含任何信息,泄露后无法直接读取
  3. 集中控制: IAM系统完全控制令牌生命周期

劣势:

  1. 性能开销: 每次请求都需要调用IAM验证
  2. 可用性依赖: IAM/Redis故障会影响所有服务
  3. 网络延迟: 增加额外的网络调用

2.4 JWT vs Opaque Token 选型

graph TB
    Start{令牌选型}

    Start --> Q1{需要主动撤销?}
    Q1 -->|是| Opaque[Opaque Token]
    Q1 -->|否| Q2{高并发场景?}

    Q2 -->|是| JWT[JWT]
    Q2 -->|否| Q3{多方验证?}

    Q3 -->|是| JWT
    Q3 -->|否| Q4{对性能敏感?}

    Q4 -->|是| JWT
    Q4 -->|否| Opaque

    style JWT fill:#e8f5e9,stroke:#43a047,stroke-width:2px
    style Opaque fill:#fff3e0,stroke:#ef6c00,stroke-width:2px

推荐方案:

场景推荐理由
高并发APIJWT无需每次查询IAM,减少延迟
金融/安全敏感Opaque可主动撤销,集中控制
微服务网关JWT网关本地验证,降低IAM压力
内部管理系统Opaque需要实时权限更新
AI AgentJWT长期有效,减少认证次数

混合方案 (推荐):

用户登录 → IAM颁发 JWT (短期,1小时) + Refresh Token (Opaque, 7天)
- JWT: 日常API访问,网关本地验证
- Refresh Token: 刷新JWT,通过IAM验证
- 撤销: 将JWT的jti加入黑名单 (Redis)

网关认证鉴权架构

3.1 完整认证鉴权流程

sequenceDiagram
    participant Client as 客户端
    participant Gateway as API网关
    participant IAM as IAM系统
    participant AuthZ as 权限服务
    participant Service as 后端服务

    Note over Client,Service: 阶段1: 认证 (Authentication)
    Client->>Gateway: 请求 + JWT Token

    Note over Gateway: 1.1 Token格式验证
    alt Token格式错误
        Gateway->>Client: 401 Unauthorized
    end

    Note over Gateway: 1.2 签名验证
    Gateway->>Gateway: 使用IAM公钥验证签名
    alt 签名验证失败
        Gateway->>Client: 401 Invalid Token
    end

    Note over Gateway: 1.3 过期时间检查
    alt Token已过期
        Gateway->>Client: 401 Token Expired
    end

    Note over Gateway: 1.4 黑名单检查
    Gateway->>IAM: 检查jti是否在黑名单
    alt 在黑名单中
        Gateway->>Client: 401 Token Revoked
    end

    Note over Client,Service: 阶段2: 身份还原
    Note over Gateway: 2.1 解析JWT Payload
    Gateway->>Gateway: 提取用户信息:<br/>• sub (用户ID)<br/>• roles<br/>• permissions<br/>• tenant_id

    Note over Gateway: 2.2 构建用户上下文
    Gateway->>Gateway: 创建 UserContext 对象

    Note over Client,Service: 阶段3: 授权 (Authorization)
    Note over Gateway: 3.1 路径匹配
    Gateway->>Gateway: 请求路径: POST /api/users

    Note over Gateway: 3.2 权限检查
    alt 简单RBAC (网关实现)
        Gateway->>Gateway: 检查 roles 是否包含 "admin"
    else 复杂策略 (调用权限服务)
        Gateway->>AuthZ: 请求授权决策<br/>{user, resource, action}
        AuthZ->>Gateway: 返回 Allow/Deny
    end

    alt 权限不足
        Gateway->>Client: 403 Forbidden
    end

    Note over Client,Service: 阶段4: 请求转发
    Gateway->>Service: 转发请求 + 用户上下文<br/>Headers:<br/>• X-User-ID: user_123<br/>• X-User-Roles: admin,editor<br/>• X-Tenant-ID: org_789

    Note over Service: 服务信任网关<br/>无需再次认证
    Service->>Gateway: 响应
    Gateway->>Client: 响应

3.2 网关身份还原实现

方案一: 从JWT中提取(推荐)

# 网关实现示例
import jwt
from typing import Dict, List

class JWTAuthenticator:
    def __init__(self, public_key: str):
        self.public_key = public_key

    def verify_and_extract(self, token: str) -> Dict:
        """验证JWT并提取用户信息"""
        try:
            # 1. 验证签名和过期时间
            payload = jwt.decode(
                token,
                self.public_key,
                algorithms=["RS256"],
                audience="api.example.com",
                issuer="https://iam.example.com"
            )

            # 2. 提取用户信息
            user_context = {
                "user_id": payload.get("sub"),
                "email": payload.get("email"),
                "name": payload.get("name"),
                "roles": payload.get("roles", []),
                "permissions": payload.get("permissions", []),
                "tenant_id": payload.get("tenant_id"),
                "agent_type": payload.get("agent_type"),  # AI Agent标识
                "exp": payload.get("exp")
            }

            # 3. 黑名单检查(可选)
            jti = payload.get("jti")
            if jti and self.is_blacklisted(jti):
                raise Exception("Token has been revoked")

            return user_context

        except jwt.ExpiredSignatureError:
            raise Exception("Token expired")
        except jwt.InvalidTokenError as e:
            raise Exception(f"Invalid token: {str(e)}")

    def is_blacklisted(self, jti: str) -> bool:
        """检查JWT ID是否在黑名单中"""
        # 查询Redis黑名单
        return redis_client.sismember("jwt:blacklist", jti)

方案二: 调用IAM Introspection端点

class OpaqueTokenAuthenticator:
    def __init__(self, iam_url: str):
        self.iam_url = iam_url

    def verify_and_extract(self, token: str) -> Dict:
        """调用IAM验证Opaque Token"""
        # RFC 7662: OAuth Token Introspection
        response = requests.post(
            f"{self.iam_url}/oauth/introspect",
            data={"token": token},
            auth=(gateway_client_id, gateway_client_secret)
        )

        result = response.json()

        if not result.get("active"):
            raise Exception("Token is not active")

        return {
            "user_id": result.get("sub"),
            "roles": result.get("roles", []),
            "permissions": result.get("permissions", []),
            "tenant_id": result.get("tenant_id"),
            "exp": result.get("exp")
        }

3.3 用户上下文传递

网关验证完成后,需要将用户信息传递给后端服务:

方式一: HTTP Header传递(推荐)

# 网关添加自定义Header
GET /api/users HTTP/1.1
Host: user-service
Authorization: Bearer <original_jwt>
X-User-ID: user_123
X-User-Email: alice@example.com
X-User-Roles: admin,editor
X-User-Permissions: read:users,write:posts
X-Tenant-ID: org_789
X-Agent-Type: ai_assistant

后端服务获取用户上下文:

from flask import request

def get_current_user():
    return {
        "user_id": request.headers.get("X-User-ID"),
        "roles": request.headers.get("X-User-Roles", "").split(","),
        "permissions": request.headers.get("X-User-Permissions", "").split(","),
        "tenant_id": request.headers.get("X-Tenant-ID"),
        "agent_type": request.headers.get("X-Agent-Type")
    }

# 在业务逻辑中使用
@app.route("/api/users")
def list_users():
    user = get_current_user()
    if "admin" not in user["roles"]:
        return {"error": "Forbidden"}, 403
    # ...业务逻辑

方式二: 重新签名JWT传递

网关验证原始JWT后,生成内部JWT传递给服务:

def create_internal_jwt(user_context: Dict) -> str:
    """创建内部服务使用的JWT"""
    payload = {
        "sub": user_context["user_id"],
        "roles": user_context["roles"],
        "iss": "api-gateway",
        "aud": "internal-services",
        "exp": time.time() + 300  # 5分钟短期有效
    }
    return jwt.encode(payload, internal_secret, algorithm="HS256")

3.4 网关配置示例

Kong网关配置

services:
  - name: user-service
    url: http://user-service:8080
    routes:
      - name: user-routes
        paths:
          - /api/users
    plugins:
      # JWT验证插件
      - name: jwt
        config:
          key_claim_name: kid
          secret_is_base64: false
          uri_param_names:
            - jwt
          cookie_names:
            - jwt
          claims_to_verify:
            - exp

      # 权限控制插件
      - name: acl
        config:
          whitelist:
            - admin
            - editor

      # 请求转换插件 (添加用户上下文)
      - name: request-transformer
        config:
          add:
            headers:
              - X-User-ID:$(jwt.sub)
              - X-User-Roles:$(jwt.roles)
              - X-Tenant-ID:$(jwt.tenant_id)

Nginx + Lua脚本

-- nginx.conf
location /api/ {
    access_by_lua_block {
        local jwt = require "resty.jwt"
        local cjson = require "cjson"

        -- 1. 提取Token
        local auth_header = ngx.var.http_Authorization
        local token = string.match(auth_header, "Bearer%s+(.+)")

        if not token then
            ngx.status = 401
            ngx.say(cjson.encode({error = "Missing token"}))
            return ngx.exit(401)
        end

        -- 2. 验证JWT
        local jwt_obj = jwt:verify(public_key, token)

        if not jwt_obj.verified then
            ngx.status = 401
            ngx.say(cjson.encode({error = "Invalid token"}))
            return ngx.exit(401)
        end

        -- 3. 提取用户信息
        local payload = jwt_obj.payload
        ngx.req.set_header("X-User-ID", payload.sub)
        ngx.req.set_header("X-User-Roles", table.concat(payload.roles, ","))
        ngx.req.set_header("X-Tenant-ID", payload.tenant_id)
    }

    proxy_pass http://backend-service;
}

权限控制模型

4.1 RBAC (基于角色的访问控制)

graph LR
    User[用户: Alice] --> UR[用户-角色] --> Role[角色: Admin]
    Role --> RP[角色-权限] --> Perm1[权限: read:users]
    Role --> RP --> Perm2[权限: write:users]
    Role --> RP --> Perm3[权限: delete:users]

    style User fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style Role fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    style Perm1 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
    style Perm2 fill:#e8f5e9,stroke:#43a047,stroke-width:2px
    style Perm3 fill:#e8f5e9,stroke:#43a047,stroke-width:2px

网关实现RBAC:

class RBACAuthorizer:
    def __init__(self):
        # 权限规则配置
        self.rules = {
            "/api/users": {
                "GET": ["admin", "viewer"],
                "POST": ["admin"],
                "PUT": ["admin"],
                "DELETE": ["admin"]
            },
            "/api/posts": {
                "GET": ["admin", "editor", "viewer"],
                "POST": ["admin", "editor"],
                "PUT": ["admin", "editor"],
                "DELETE": ["admin"]
            }
        }

    def authorize(self, user_roles: List[str], path: str, method: str) -> bool:
        """检查用户是否有权限访问资源"""
        required_roles = self.rules.get(path, {}).get(method, [])

        if not required_roles:
            return True  # 无限制

        # 检查用户是否拥有任一所需角色
        return any(role in required_roles for role in user_roles)

    def check(self, user_context: Dict, request_path: str, request_method: str):
        user_roles = user_context.get("roles", [])

        if not self.authorize(user_roles, request_path, request_method):
            raise PermissionDenied(
                f"User with roles {user_roles} cannot {request_method} {request_path}"
            )

4.2 ABAC (基于属性的访问控制)

更灵活的权限模型,基于用户属性、资源属性、环境属性进行决策。

class ABACAuthorizer:
    def authorize(self, user_context: Dict, resource: Dict, action: str, environment: Dict) -> bool:
        """基于属性的授权决策"""

        # 示例策略: 只有同租户的管理员才能删除资源
        if action == "delete":
            return (
                "admin" in user_context.get("roles", []) and
                user_context.get("tenant_id") == resource.get("tenant_id")
            )

        # 示例策略: 工作时间才能访问财务数据
        if resource.get("category") == "financial":
            current_hour = environment.get("hour")
            return 9 <= current_hour <= 18

        # 示例策略: 只能访问自己部门的数据
        if action == "read":
            return user_context.get("department") == resource.get("department")

        return False

网关集成ABAC(调用OPA):

import requests

class OPAAuthorizer:
    def __init__(self, opa_url: str):
        self.opa_url = opa_url

    def authorize(self, user_context: Dict, resource_path: str, action: str) -> bool:
        """调用OPA进行授权决策"""
        policy_input = {
            "input": {
                "user": user_context,
                "resource": {"path": resource_path},
                "action": action,
                "environment": {
                    "time": datetime.now().isoformat(),
                    "ip": request.remote_addr
                }
            }
        }

        response = requests.post(
            f"{self.opa_url}/v1/data/authz/allow",
            json=policy_input
        )

        result = response.json()
        return result.get("result", False)

OPA策略示例 (Rego语言):

package authz

# 默认拒绝
default allow = false

# 管理员可以访问所有资源
allow {
    input.user.roles[_] == "admin"
}

# 用户只能访问自己租户的资源
allow {
    input.user.tenant_id == input.resource.tenant_id
    input.action == "read"
}

# AI Agent只能读取,不能修改
allow {
    input.user.agent_type == "ai_assistant"
    input.action == "read"
}

AI Agent身份管理

5.1 AI Agent的身份特点

AI Agent与普通用户的区别:

特性普通用户AI Agent
身份类型人类用户服务账号
认证方式密码 + MFAAPI Key / Client Credentials
令牌有效期短期 (1小时)长期 (7天~永久)
权限范围基于角色基于用途限定
审计要求记录操作人记录代理用户 (on-behalf-of)

5.2 AI Agent的令牌设计

{
  "sub": "agent_ai_assistant_123",
  "agent_type": "ai_assistant",
  "agent_name": "Customer Support Bot",

  "on_behalf_of": "user_789",  // 代表哪个用户
  "delegated_by": "user_789",  // 由谁授权

  "roles": ["ai_agent"],
  "permissions": [
    "read:conversations",
    "write:responses",
    "read:knowledge_base"
  ],

  "scope": "assistant:basic",  // OAuth scope限定

  "constraints": {
    "rate_limit": 1000,         // 速率限制
    "allowed_resources": ["conversations", "knowledge_base"],
    "forbidden_actions": ["delete", "admin"]
  },

  "iss": "https://iam.example.com",
  "aud": "api.example.com",
  "exp": 1706883600,  // 7天有效期
  "iat": 1706278800
}

5.3 AI Agent的授权模式

模式一: Client Credentials Flow

适用于完全自主的AI Agent (无需用户授权)

sequenceDiagram
    participant Agent as AI Agent
    participant IAM as IAM系统
    participant API as API网关

    Agent->>IAM: 请求令牌<br/>client_id + client_secret
    IAM->>IAM: 验证客户端凭证
    IAM->>Agent: 返回 Access Token (JWT)

    Agent->>API: 请求 + Access Token
    API->>API: 验证JWT<br/>提取 agent_type
    API->>Agent: 返回数据

模式二: Token Exchange (代表用户)

适用于需要代表用户操作的AI Agent

sequenceDiagram
    participant User as 用户
    participant Agent as AI Agent
    participant IAM as IAM系统
    participant API as API网关

    User->>IAM: 登录获取用户Token
    IAM->>User: 返回 user_token

    User->>Agent: 授权AI Agent<br/>user_token
    Agent->>IAM: Token Exchange<br/>user_token + agent_credentials
    IAM->>Agent: 返回 delegated_token<br/>(包含 on_behalf_of)

    Agent->>API: 请求 + delegated_token
    API->>API: 验证JWT<br/>识别是Agent代表用户
    API->>Agent: 返回数据

Token Exchange实现 (RFC 8693):

def token_exchange(user_token: str, agent_credentials: Dict) -> str:
    """交换令牌:用户令牌 → AI Agent代理令牌"""
    response = requests.post(
        "https://iam.example.com/oauth/token",
        data={
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "subject_token": user_token,
            "subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "client_id": agent_credentials["client_id"],
            "client_secret": agent_credentials["client_secret"],
            "scope": "assistant:basic"
        }
    )
    return response.json()["access_token"]

5.4 AI Agent权限控制策略

class AIAgentAuthorizer:
    def authorize(self, user_context: Dict, action: str, resource: str) -> bool:
        """AI Agent特殊授权逻辑"""

        # 1. 识别AI Agent
        if user_context.get("agent_type") == "ai_assistant":
            # 2. 检查约束条件
            constraints = user_context.get("constraints", {})

            # 禁止的操作
            if action in constraints.get("forbidden_actions", []):
                return False

            # 限定的资源
            allowed_resources = constraints.get("allowed_resources", [])
            if allowed_resources and resource not in allowed_resources:
                return False

            # 3. 检查代理权限
            if "on_behalf_of" in user_context:
                # 验证代理用户是否有权限
                original_user_id = user_context["on_behalf_of"]
                return self.check_user_permission(original_user_id, action, resource)

        # 普通用户权限检查
        return self.check_user_permission(user_context["user_id"], action, resource)

实战场景解决方案

6.1 您的问题解决方案

问题: 客户的IAM系统给用户和AI Agent颁发了身份令牌,但是我在网关上如何基于身份令牌还原用户身份原始信息,再进行权限匹配控制?

完整解决方案架构

graph TB
    subgraph Client["客户端"]
        User[用户]
        Agent[AI Agent]
    end

    subgraph IAM["客户IAM系统"]
        IssueToken[令牌颁发]
        PublicKey[公钥分发]
    end

    subgraph Gateway["API网关"]
        Extract[1. 提取Token]
        Verify[2. 验证签名]
        Parse[3. 解析Payload]
        Restore[4. 还原身份]
        Authorize[5. 权限检查]
    end

    subgraph Backend["后端服务"]
        Service[业务服务]
    end

    User --> |JWT Token| Extract
    Agent --> |JWT Token| Extract

    IAM --> |颁发| User
    IAM --> |颁发| Agent
    IAM -.->|提供公钥| Verify

    Extract --> Verify
    Verify --> Parse
    Parse --> Restore
    Restore --> Authorize
    Authorize --> |通过| Service
    Authorize --> |拒绝| User

详细实现步骤

步骤1: 获取IAM公钥

import requests
import json

class IAMPublicKeyProvider:
    def __init__(self, iam_jwks_url: str):
        self.jwks_url = iam_jwks_url
        self.keys_cache = {}

    def get_public_key(self, kid: str) -> str:
        """获取指定kid的公钥"""
        if kid in self.keys_cache:
            return self.keys_cache[kid]

        # 从IAM的JWKS端点获取公钥
        response = requests.get(self.jwks_url)
        jwks = response.json()

        for key in jwks["keys"]:
            if key["kid"] == kid:
                # 转换JWK为PEM格式
                public_key = self.jwk_to_pem(key)
                self.keys_cache[kid] = public_key
                return public_key

        raise Exception(f"Public key not found for kid: {kid}")

    def jwk_to_pem(self, jwk: Dict) -> str:
        """将JWK转换为PEM格式"""
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.hazmat.backends import default_backend
        import base64

        # 解析JWK
        n = int.from_bytes(base64.urlsafe_b64decode(jwk["n"] + "=="), "big")
        e = int.from_bytes(base64.urlsafe_b64decode(jwk["e"] + "=="), "big")

        # 构造公钥
        public_key = rsa.RSAPublicNumbers(e, n).public_key(default_backend())

        # 转换为PEM
        pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        return pem.decode("utf-8")

步骤2: 网关验证和身份还原

import jwt
from typing import Dict, Optional
import redis

class GatewayAuthenticator:
    def __init__(self,
                 public_key_provider: IAMPublicKeyProvider,
                 redis_client: redis.Redis,
                 iam_issuer: str,
                 api_audience: str):
        self.public_key_provider = public_key_provider
        self.redis = redis_client
        self.iam_issuer = iam_issuer
        self.api_audience = api_audience

    def authenticate_and_restore_identity(self, authorization_header: str) -> Dict:
        """完整的认证和身份还原流程"""

        # 1. 提取Token
        token = self.extract_token(authorization_header)
        if not token:
            raise AuthenticationError("Missing or invalid Authorization header")

        # 2. 解码Header获取kid
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get("kid")
        if not kid:
            raise AuthenticationError("Missing kid in JWT header")

        # 3. 获取公钥
        public_key = self.public_key_provider.get_public_key(kid)

        # 4. 验证JWT签名和claims
        try:
            payload = jwt.decode(
                token,
                public_key,
                algorithms=["RS256"],
                audience=self.api_audience,
                issuer=self.iam_issuer,
                options={
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_iat": True,
                    "verify_aud": True,
                    "verify_iss": True
                }
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Token has expired")
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(f"Invalid token: {str(e)}")

        # 5. 黑名单检查(可选)
        jti = payload.get("jti")
        if jti and self.is_token_revoked(jti):
            raise AuthenticationError("Token has been revoked")

        # 6. 还原用户身份
        user_identity = self.restore_identity(payload)

        return user_identity

    def extract_token(self, authorization_header: str) -> Optional[str]:
        """从Authorization header中提取token"""
        if not authorization_header:
            return None

        parts = authorization_header.split()
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return None

        return parts[1]

    def is_token_revoked(self, jti: str) -> bool:
        """检查token是否被撤销"""
        return self.redis.sismember("jwt:blacklist", jti)

    def restore_identity(self, payload: Dict) -> Dict:
        """从JWT payload还原完整用户身份"""

        # 基础身份信息
        identity = {
            "user_id": payload.get("sub"),
            "email": payload.get("email"),
            "name": payload.get("name"),
            "tenant_id": payload.get("tenant_id"),

            # 权限相关
            "roles": payload.get("roles", []),
            "permissions": payload.get("permissions", []),
            "groups": payload.get("groups", []),

            # Token元数据
            "token_id": payload.get("jti"),
            "issued_at": payload.get("iat"),
            "expires_at": payload.get("exp"),

            # 识别用户类型
            "is_human": payload.get("agent_type") is None,
            "is_agent": payload.get("agent_type") is not None,
        }

        # AI Agent特殊字段
        if identity["is_agent"]:
            identity.update({
                "agent_type": payload.get("agent_type"),
                "agent_name": payload.get("agent_name"),
                "on_behalf_of": payload.get("on_behalf_of"),  # 代理的用户
                "delegated_by": payload.get("delegated_by"),
                "constraints": payload.get("constraints", {})
            })

        return identity

步骤3: 权限匹配控制

class GatewayAuthorizer:
    def __init__(self, policy_config: Dict):
        self.policies = policy_config

    def authorize(self, user_identity: Dict, request_path: str, request_method: str) -> bool:
        """基于用户身份进行权限检查"""

        # 1. 查找匹配的策略
        policy = self.match_policy(request_path, request_method)
        if not policy:
            return True  # 无策略限制,默认允许

        # 2. 检查是否是AI Agent
        if user_identity.get("is_agent"):
            return self.authorize_agent(user_identity, policy)

        # 3. 检查普通用户权限
        return self.authorize_user(user_identity, policy)

    def match_policy(self, path: str, method: str) -> Optional[Dict]:
        """匹配路径的权限策略"""
        for pattern, policy in self.policies.items():
            if self.path_matches(path, pattern):
                return policy.get(method)
        return None

    def authorize_user(self, user_identity: Dict, policy: Dict) -> bool:
        """普通用户权限检查"""
        required_roles = policy.get("required_roles", [])
        required_permissions = policy.get("required_permissions", [])

        user_roles = set(user_identity.get("roles", []))
        user_permissions = set(user_identity.get("permissions", []))

        # 检查角色
        if required_roles:
            if not any(role in user_roles for role in required_roles):
                return False

        # 检查权限
        if required_permissions:
            if not all(perm in user_permissions for perm in required_permissions):
                return False

        return True

    def authorize_agent(self, user_identity: Dict, policy: Dict) -> bool:
        """AI Agent权限检查"""
        constraints = user_identity.get("constraints", {})

        # 1. 检查禁止的操作
        forbidden_actions = constraints.get("forbidden_actions", [])
        if policy.get("action") in forbidden_actions:
            return False

        # 2. 如果Agent代理用户,检查被代理用户的权限
        if user_identity.get("on_behalf_of"):
            # 这里需要查询原始用户的权限
            original_user = self.get_user_by_id(user_identity["on_behalf_of"])
            return self.authorize_user(original_user, policy)

        # 3. Agent自身的权限检查
        agent_permissions = set(user_identity.get("permissions", []))
        required_permissions = set(policy.get("required_permissions", []))

        return required_permissions.issubset(agent_permissions)

步骤4: 完整的网关中间件

from flask import Flask, request, jsonify, g

app = Flask(__name__)

# 初始化组件
public_key_provider = IAMPublicKeyProvider("https://customer-iam.com/.well-known/jwks.json")
redis_client = redis.Redis(host="redis", port=6379)
authenticator = GatewayAuthenticator(
    public_key_provider=public_key_provider,
    redis_client=redis_client,
    iam_issuer="https://customer-iam.com",
    api_audience="api.yourservice.com"
)
authorizer = GatewayAuthorizer(policy_config={
    "/api/users": {
        "GET": {"required_roles": ["admin", "viewer"]},
        "POST": {"required_roles": ["admin"]},
        "DELETE": {"required_roles": ["admin"], "action": "delete"}
    },
    "/api/posts": {
        "GET": {"required_permissions": ["read:posts"]},
        "POST": {"required_permissions": ["write:posts"]},
    }
})

@app.before_request
def authenticate_request():
    """请求前置:认证和鉴权"""

    # 1. 认证:验证token并还原身份
    try:
        auth_header = request.headers.get("Authorization")
        user_identity = authenticator.authenticate_and_restore_identity(auth_header)
        g.user = user_identity  # 存储到Flask的g对象
    except AuthenticationError as e:
        return jsonify({"error": str(e)}), 401

    # 2. 鉴权:检查权限
    if not authorizer.authorize(user_identity, request.path, request.method):
        return jsonify({
            "error": "Forbidden",
            "message": f"User {user_identity['user_id']} does not have permission to {request.method} {request.path}"
        }), 403

@app.after_request
def add_user_context_headers(response):
    """向后端服务传递用户上下文"""
    if hasattr(g, "user"):
        user = g.user
        # 添加自定义header
        headers_to_add = {
            "X-User-ID": user["user_id"],
            "X-User-Email": user.get("email", ""),
            "X-User-Roles": ",".join(user.get("roles", [])),
            "X-Tenant-ID": user.get("tenant_id", ""),
            "X-Is-Agent": str(user.get("is_agent", False)),
        }

        if user.get("is_agent"):
            headers_to_add["X-Agent-Type"] = user.get("agent_type", "")
            headers_to_add["X-On-Behalf-Of"] = user.get("on_behalf_of", "")

        for key, value in headers_to_add.items():
            response.headers[key] = value

    return response

# 业务路由
@app.route("/api/users", methods=["GET"])
def list_users():
    user = g.user
    # 业务逻辑...
    return jsonify({
        "message": f"Hello {user['name']}",
        "users": [...]
    })

6.2 关键配置清单

IAM系统需要提供的信息

  1. JWKS端点: https://customer-iam.com/.well-known/jwks.json

    {
      "keys": [
        {
          "kty": "RSA",
          "kid": "key-2024-01",
          "use": "sig",
          "alg": "RS256",
          "n": "base64_encoded_modulus",
          "e": "AQAB"
        }
      ]
    }
  2. JWT格式约定:

    • Header必须包含kid字段
    • Payload必须包含sub, iss, aud, exp, iat
    • 自定义claims的字段名约定(roles, permissions等)
  3. Token撤销机制 (可选):

    • 提供Introspection端点: POST /oauth/introspect
    • 或提供黑名单同步机制

网关配置项

# gateway-config.yaml
authentication:
  iam:
    issuer: "https://customer-iam.com"
    jwks_url: "https://customer-iam.com/.well-known/jwks.json"
    audience: "api.yourservice.com"

  token:
    blacklist_enabled: true
    blacklist_redis_key: "jwt:blacklist"
    cache_public_keys: true
    cache_ttl: 3600  # 1小时

authorization:
  mode: "rbac"  # 或 "abac", "opa"

  # RBAC配置
  rbac:
    policies:
      - path: "/api/users"
        methods:
          GET: ["admin", "viewer"]
          POST: ["admin"]
          DELETE: ["admin"]

      - path: "/api/posts"
        methods:
          GET: ["admin", "editor", "viewer"]
          POST: ["admin", "editor"]

  # AI Agent特殊规则
  agent:
    forbidden_actions: ["delete", "admin"]
    enforce_delegation: true  # 要求Agent必须代理用户

user_context:
  headers:
    user_id: "X-User-ID"
    email: "X-User-Email"
    roles: "X-User-Roles"
    tenant_id: "X-Tenant-ID"
    is_agent: "X-Is-Agent"
    agent_type: "X-Agent-Type"

6.3 测试验证

测试用例1: 普通用户请求

# 1. 获取用户Token
USER_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
  -d "grant_type=password" \
  -d "username=alice@example.com" \
  -d "password=secret" \
  | jq -r '.access_token')

# 2. 请求API
curl -H "Authorization: Bearer $USER_TOKEN" \
  https://api.yourservice.com/api/users

# 3. 检查后端收到的Header
# X-User-ID: user_123
# X-User-Roles: admin
# X-Is-Agent: False

测试用例2: AI Agent请求

# 1. 获取Agent Token
AGENT_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
  -d "grant_type=client_credentials" \
  -d "client_id=agent_ai_assistant" \
  -d "client_secret=agent_secret" \
  | jq -r '.access_token')

# 2. 请求API
curl -H "Authorization: Bearer $AGENT_TOKEN" \
  https://api.yourservice.com/api/posts

# 3. 检查后端收到的Header
# X-User-ID: agent_ai_assistant_123
# X-Is-Agent: True
# X-Agent-Type: ai_assistant

测试用例3: Agent代理用户请求

# 1. 用户授权Agent
USER_TOKEN="user_token_here"

# 2. Agent交换令牌
DELEGATED_TOKEN=$(curl -X POST https://customer-iam.com/oauth/token \
  -d "grant_type=urn:ietf:params:oauth:grant-type:token-exchange" \
  -d "subject_token=$USER_TOKEN" \
  -d "client_id=agent_ai_assistant" \
  -d "client_secret=agent_secret" \
  | jq -r '.access_token')

# 3. Agent代表用户请求
curl -H "Authorization: Bearer $DELEGATED_TOKEN" \
  https://api.yourservice.com/api/users

# 4. 检查后端收到的Header
# X-User-ID: agent_ai_assistant_123
# X-Is-Agent: True
# X-On-Behalf-Of: user_123  ← 代理的用户

6.4 常见问题解决

问题1: 公钥获取失败

原因: IAM的JWKS端点不可达或格式错误

解决:

# 添加重试和降级机制
from tenacity import retry, stop_after_attempt, wait_exponential

class IAMPublicKeyProvider:
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def get_public_key(self, kid: str) -> str:
        # ... 获取公钥逻辑
        pass

    def get_cached_or_default_key(self, kid: str) -> str:
        """降级方案:使用缓存的公钥"""
        try:
            return self.get_public_key(kid)
        except Exception as e:
            logger.error(f"Failed to get public key: {e}")
            # 返回缓存的公钥
            cached_key = self.redis.get(f"public_key:{kid}")
            if cached_key:
                return cached_key.decode()
            raise

问题2: Token过期但用户仍在使用

原因: Token过期时间太短,用户体验不好

解决: 实现Refresh Token机制

@app.errorhandler(401)
def handle_expired_token(error):
    if "Token has expired" in str(error):
        return jsonify({
            "error": "token_expired",
            "message": "Please refresh your token",
            "refresh_endpoint": "/oauth/token",
            "hint": "Use refresh_token grant"
        }), 401
    return jsonify({"error": str(error)}), 401

# 客户端自动刷新
def request_with_auto_refresh(url, access_token, refresh_token):
    response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})

    if response.status_code == 401 and response.json().get("error") == "token_expired":
        # 自动刷新token
        new_tokens = refresh_access_token(refresh_token)
        # 重试请求
        return requests.get(url, headers={"Authorization": f"Bearer {new_tokens['access_token']}"})

    return response

问题3: AI Agent权限过大

原因: Agent使用了用户的完整权限

解决: 使用Scope限定Agent权限

def authorize_agent(self, user_identity: Dict, policy: Dict) -> bool:
    # 检查Token的scope
    token_scope = user_identity.get("scope", "")
    scopes = set(token_scope.split())

    # Agent只有基础scope,不能执行管理操作
    if "admin" in policy.get("required_roles", []):
        return "admin:full" in scopes

    # 其他操作需要对应的scope
    action = policy.get("action")
    required_scope = f"{action}:{resource_type}"

    return required_scope in scopes

总结

核心要点

  1. IAM职责: 颁发令牌、管理身份、定义策略

  2. JWT优势: 自包含、无状态、适合分布式

  3. 网关核心:

    • 验证令牌签名
    • 还原用户身份
    • 执行权限检查
    • 传递用户上下文
  4. AI Agent特殊处理:

    • 使用Client Credentials或Token Exchange
    • 限定权限scope
    • 记录代理关系(on-behalf-of)

最佳实践

场景推荐方案
用户令牌JWT (RS256), 短期(1小时) + Refresh Token
AI Agent令牌JWT (RS256), 长期(7天), 限定scope
网关验证本地验证JWT签名 + 黑名单检查
权限模型简单场景用RBAC, 复杂场景用ABAC(OPA)
用户上下文传递自定义HTTP Header

安全建议

  1. 始终使用HTTPS传输Token
  2. 使用RS256等非对称算法签名JWT
  3. 设置合理的Token过期时间
  4. 实现Token黑名单机制
  5. 定期轮换签名密钥(kid)
  6. 记录完整的审计日志
  7. AI Agent使用最小权限原则

主流IAM实现方案深度分析

7.1 云厂商IAM方案对比

国际云厂商

AWS IAM (Amazon Identity and Access Management)

架构特点:

graph TB
    subgraph AWS_IAM["AWS IAM架构"]
        User[IAM User]
        Role[IAM Role]
        Policy[Policy Document]
        STS[STS临时凭证]

        User --> Policy
        Role --> Policy
        Role --> STS
    end

    subgraph Resources["AWS资源"]
        S3[S3 Bucket]
        EC2[EC2 Instance]
        Lambda[Lambda Function]
    end

    User --> |访问| Resources
    STS --> |临时凭证| Resources

    style AWS_IAM fill:#ff9900,stroke:#232f3e,stroke-width:2px

核心机制:

  1. Policy结构 (JSON策略文档):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::my-bucket/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": "203.0.113.0/24"
        }
      }
    }
  ]
}

特色功能:

  • STS (Security Token Service): 临时凭证,支持跨账号访问
  • AssumeRole: 角色扮演机制,适合服务间调用
  • Resource-based Policy: 资源级策略,细粒度控制
  • Conditions: 基于IP、时间、MFA等条件的策略

Token机制:

  • 使用AWS Signature V4签名算法
  • Access Key + Secret Key生成签名
  • STS临时凭证: Access Key + Secret Key + Session Token (有效期15分钟~12小时)

适用场景:

  • AWS生态内的资源访问控制
  • 跨账号资源共享
  • 服务间的安全调用

局限性:

  • 仅限AWS资源,无法管理外部应用
  • 学习曲线陡峭,策略语法复杂
  • 无内置UI管理界面

Azure AD / Microsoft Entra ID

架构特点:

graph LR
    subgraph Azure_AD["Azure AD"]
        User[用户]
        App[应用注册]
        SP[Service Principal]
        ManagedID[托管标识]
    end

    subgraph Protocols["认证协议"]
        OAuth[OAuth 2.0]
        OIDC[OpenID Connect]
        SAML[SAML 2.0]
    end

    User --> OAuth
    App --> OAuth
    SP --> OAuth
    ManagedID --> OAuth

    OAuth --> Resources[Azure资源]
    OIDC --> Resources
    SAML --> Resources

    style Azure_AD fill:#0078d4,stroke:#ffffff,stroke-width:2px

核心机制:

  1. 应用注册 (Application Registration):
{
  "appId": "00000000-0000-0000-0000-000000000000",
  "appRoles": [
    {
      "id": "role-id",
      "displayName": "Admin",
      "value": "Admin",
      "allowedMemberTypes": ["User", "Application"]
    }
  ],
  "requiredResourceAccess": [
    {
      "resourceAppId": "00000003-0000-0000-c000-000000000000",
      "resourceAccess": [
        {
          "id": "e1fe6dd8-ba31-4d61-89e7-88639da4683d",
          "type": "Scope"
        }
      ]
    }
  ]
}
  1. Managed Identity (托管标识):
  • System-assigned: 与资源生命周期绑定
  • User-assigned: 可复用的独立身份

Token机制:

  • 使用标准JWT (RS256签名)
  • Access Token: 访问Azure资源
  • ID Token: 用户身份信息
  • Refresh Token: 刷新访问令牌

Token示例:

{
  "aud": "https://graph.microsoft.com",
  "iss": "https://sts.windows.net/{tenant-id}/",
  "iat": 1706451600,
  "nbf": 1706451600,
  "exp": 1706455200,
  "sub": "user-object-id",
  "oid": "user-object-id",
  "roles": ["Directory.Read.All"],
  "scp": "User.Read Mail.Send",
  "tid": "tenant-id"
}

特色功能:

  • Conditional Access: 基于风险、设备、位置的条件访问
  • Privileged Identity Management (PIM): 即时权限提升
  • Identity Protection: 风险检测和自动响应
  • B2B/B2C: 支持外部用户和客户身份管理

适用场景:

  • 企业级身份管理
  • Microsoft 365生态集成
  • 混合云环境(Azure + On-Premises)

优势:

  • 完整的企业身份解决方案
  • 强大的条件访问和风险控制
  • 与Windows、Office深度集成

Google Cloud IAM

架构特点:

graph TB
    subgraph GCP_IAM["Google Cloud IAM"]
        Member[成员<br/>User/SA/Group]
        Role[角色<br/>Primitive/Predefined/Custom]
        Policy[IAM Policy]

        Member --> Policy
        Role --> Policy
    end

    subgraph Resources["资源层级"]
        Org[Organization]
        Folder[Folder]
        Project[Project]
        Resource[Resource]

        Org --> Folder
        Folder --> Project
        Project --> Resource
    end

    Policy --> Resources

    style GCP_IAM fill:#4285f4,stroke:#ffffff,stroke-width:2px

核心机制:

  1. IAM Policy Binding:
{
  "bindings": [
    {
      "role": "roles/storage.objectViewer",
      "members": [
        "user:alice@example.com",
        "serviceAccount:my-sa@project.iam.gserviceaccount.com"
      ],
      "condition": {
        "title": "Expires in 2024",
        "expression": "request.time < timestamp('2024-12-31T23:59:59Z')"
      }
    }
  ]
}
  1. Service Account:
  • 应用身份,非人类用户
  • 可以模拟(Impersonate)其他服务账号
  • 支持短期密钥(Short-lived credentials)

Token机制:

  • 使用标准JWT
  • 通过Metadata Server自动获取(GCE/GKE)
  • 支持Workload Identity Federation(联合身份)

特色功能:

  • Hierarchical Inheritance: 策略继承(Organization → Folder → Project → Resource)
  • IAM Conditions: 基于CEL(Common Expression Language)的条件策略
  • Workload Identity: 将Kubernetes SA映射到GCP SA,无需密钥
  • Policy Analyzer: 分析和审计权限

适用场景:

  • GCP资源访问控制
  • 多项目/多组织管理
  • Kubernetes workload身份管理

优势:

  • 简洁的权限模型(Member-Role-Resource)
  • 强大的条件策略(CEL表达式)
  • Workload Identity消除密钥管理

国内云厂商

阿里云RAM (Resource Access Management)

架构特点:

graph TB
    subgraph Alibaba_RAM["阿里云RAM"]
        User[RAM用户]
        Role[RAM角色]
        Policy[权限策略]
        STS[STS临时Token]

        User --> Policy
        Role --> Policy
        Role --> STS
    end

    subgraph Resources["阿里云资源"]
        ECS[ECS]
        OSS[OSS]
        RDS[RDS]
    end

    User --> Resources
    STS --> Resources

    style Alibaba_RAM fill:#ff6a00,stroke:#ffffff,stroke-width:2px

核心机制:

  1. Policy语法 (类似AWS):
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "oss:GetObject",
        "oss:PutObject"
      ],
      "Resource": [
        "acs:oss:*:*:my-bucket/*"
      ],
      "Condition": {
        "IpAddress": {
          "acs:SourceIp": ["192.168.0.0/16"]
        }
      }
    }
  ]
}

Token机制:

  • AccessKey/SecretKey: 永久凭证
  • STS临时Token: SecurityToken + TempAccessKey + TempSecretKey
  • 有效期: 900秒~3600秒

特色功能:

  • 跨云账号授权: 支持主账号授权给其他阿里云账号
  • 角色SSO: 支持企业IdP集成
  • 资源组: 跨产品的资源分组管理
  • 操作审计: 记录所有API调用

适用场景:

  • 阿里云资源访问控制
  • 移动应用临时授权(STS)
  • 跨账号资源共享

腾讯云CAM (Cloud Access Management)

架构特点:

graph TB
    subgraph Tencent_CAM["腾讯云CAM"]
        User[子用户]
        Role[角色]
        Policy[策略]
        Group[用户组]

        User --> Group
        Group --> Policy
        Role --> Policy
    end

    subgraph Resources["腾讯云资源"]
        CVM[CVM]
        COS[COS]
        TKE[TKE]
    end

    User --> Resources
    Role --> Resources

    style Tencent_CAM fill:#006eff,stroke:#ffffff,stroke-width:2px

核心机制:

  1. Policy结构:
{
  "version": "2.0",
  "statement": [
    {
      "effect": "allow",
      "action": [
        "cos:GetObject",
        "cos:PutObject"
      ],
      "resource": [
        "qcs::cos:ap-guangzhou:uid/1250000000:my-bucket/*"
      ],
      "condition": {
        "ip_equal": {
          "qcs:ip": ["10.0.0.1/24"]
        }
      }
    }
  ]
}

Token机制:

  • 永久密钥: SecretId + SecretKey
  • 临时密钥: TmpSecretId + TmpSecretKey + Token
  • 签名算法: HMAC-SHA256

特色功能:

  • 服务角色: 授权腾讯云服务访问其他服务
  • 标签授权: 基于资源标签的权限控制
  • 会话管理: 查看和撤销子用户登录会话
  • 操作保护: 敏感操作二次验证

适用场景:

  • 腾讯云资源权限管理
  • 企业多账号管理
  • 临时授权(STS)

华为云IAM

核心特点:

  • 统一身份认证: 支持华为云账号、IAM用户、联邦用户
  • 委托授权: 类似AWS的AssumeRole
  • 虚拟MFA: 支持虚拟MFA设备
  • 自定义策略: 支持细粒度的资源级授权

Token机制:

  • 永久AK/SK: Access Key + Secret Key
  • 临时Token: 通过委托获取临时凭证
  • IAM Token: 基于用户名密码获取的临时token(24小时有效)

7.2 IDaaS方案对比

国际方案

Okta

架构定位: 云原生身份即服务(IDaaS)平台

graph TB
    subgraph Okta["Okta平台"]
        UD[Universal Directory]
        SSO[Single Sign-On]
        MFA[Multi-Factor Auth]
        LC[Lifecycle Management]
    end

    subgraph Apps["应用集成"]
        SaaS[SaaS应用<br/>5000+]
        OnPrem[本地应用]
        Custom[自定义应用]
    end

    subgraph Protocols["协议支持"]
        SAML2[SAML 2.0]
        OIDC[OIDC]
        OAuth2[OAuth 2.0]
    end

    UD --> SSO
    SSO --> Apps
    Protocols --> Apps

    style Okta fill:#007dc1,stroke:#ffffff,stroke-width:2px

核心能力:

  1. Universal Directory:

    • 集中式用户目录
    • 支持LDAP、AD集成
    • 自定义用户属性
  2. Adaptive MFA:

    • 基于风险的自适应认证
    • 支持TOTP、WebAuthn、生物识别
    • 无密码认证(Passwordless)
  3. Lifecycle Management:

    • 用户自动Provisioning/Deprovisioning
    • 与HR系统集成(Workday、SAP等)

Token机制:

  • 标准JWT (RS256)
  • 短期Access Token (1小时)
  • 长期Refresh Token (可配置)

OAuth 2.0 Flow支持:

  • Authorization Code + PKCE
  • Client Credentials
  • Resource Owner Password (不推荐)
  • Device Authorization

特色功能:

  • Okta Workflows: 低代码自动化(类似Zapier)
  • API Access Management: 为自己的API提供OAuth服务器
  • Customer Identity (CIAM): B2C场景的用户管理

适用场景:

  • 企业SSO门户
  • 应用集成中心
  • 客户身份管理(CIAM)

优势:

  • 5000+ 预集成应用
  • 强大的自动化能力
  • 完善的开发者生态

定价:

  • 按用户数收费($2~15/user/month)
  • 高昂的企业成本

Auth0 (by Okta)

架构定位: 面向开发者的身份平台

graph LR
    subgraph Auth0["Auth0"]
        Rules[Rules引擎]
        Actions[Actions]
        Connections[Identity Connections]
        MFA[MFA]
    end

    subgraph Integrations["身份源"]
        Social[社交登录<br/>Google/FB/GitHub]
        Enterprise[企业IdP<br/>AD/SAML]
        DB[用户名密码]
    end

    Integrations --> Connections
    Connections --> Rules
    Rules --> Actions

    style Auth0 fill:#eb5424,stroke:#ffffff,stroke-width:2px

核心能力:

  1. Universal Login:

    • 托管登录页面
    • 自定义品牌和UI
    • 跨域SSO
  2. Rules & Actions:

    • 可编程的认证流程
    • JavaScript/Node.js编写
    • 支持调用外部API

Rules示例:

function addRolesToToken(user, context, callback) {
  // 从数据库查询用户角色
  const roles = getUserRoles(user.email);

  // 添加到Token的claims
  context.idToken['https://myapp.com/roles'] = roles;
  context.accessToken['https://myapp.com/roles'] = roles;

  callback(null, user, context);
}
  1. Organizations (B2B多租户):
    • 租户隔离
    • 每个租户独立的身份源
    • 租户级别的品牌定制

Token机制:

  • 标准JWT
  • 支持自定义Claims
  • Token签名算法可配置(RS256/HS256)

特色功能:

  • Extensibility: 高度可定制的认证流程
  • Social Connections: 30+社交登录提供商
  • Passwordless: 邮箱/短信验证码登录
  • Breached Password Detection: 泄露密码检测

适用场景:

  • SaaS应用的用户认证
  • 移动应用登录
  • 多租户B2B应用

优势:

  • 开发者友好的API和SDK
  • 灵活的可扩展性
  • 快速集成(几小时完成)

定价:

  • 免费版: 7000 MAU (月活用户)
  • 付费版: $35~240/month (起步价)

国内方案

身份宝 (IDaaS by 阿里云)

架构定位: 企业级身份管理云服务

graph TB
    subgraph IDaaS["身份宝"]
        UD[统一用户目录]
        SSO[单点登录]
        MFA[多因素认证]
        Sync[账号同步]
    end

    subgraph Sources["身份源"]
        AD[Active Directory]
        LDAP[LDAP]
        DingTalk[钉钉]
        WeChat[企业微信]
    end

    subgraph Apps["应用"]
        Internal[内部应用]
        SaaS[SaaS应用]
        Alibaba[阿里云控制台]
    end

    Sources --> Sync
    Sync --> UD
    UD --> SSO
    SSO --> Apps

    style IDaaS fill:#ff6a00,stroke:#ffffff,stroke-width:2px

核心能力:

  1. 多源身份集成:

    • AD/LDAP同步
    • 钉钉、企业微信集成
    • HR系统对接
  2. 应用门户:

    • 企业应用市场
    • 统一工作台
    • 移动端APP
  3. 权限管理:

    • RBAC权限模型
    • 应用级授权
    • 数据权限控制

特色功能:

  • 国密支持: SM2/SM3/SM4算法
  • 本地化部署: 支持私有化部署
  • 合规认证: 等保三级、ISO 27001

适用场景:

  • 国企/央企身份管理
  • 需要本地化部署的场景
  • 阿里云生态深度用户

玉符 (AuthingCloud)

架构定位: 开发者友好的身份云

graph LR
    subgraph Authing["玉符"]
        Core[认证核心]
        Pipeline[Pipeline扩展]
        Rules[业务规则]
    end

    subgraph Features["功能"]
        Social[社交登录]
        MFA[MFA]
        RBAC[权限管理]
        Webhooks[Webhooks]
    end

    Core --> Pipeline
    Pipeline --> Rules
    Features --> Core

    style Authing fill:#396aff,stroke:#ffffff,stroke-width:2px

核心能力:

  1. 快速集成:

    • 5分钟接入
    • 丰富的SDK(20+语言)
    • 托管登录页
  2. Pipeline机制:

    • 可编程的认证流程
    • 类似Auth0的Rules
    • 支持JavaScript/Python

Pipeline示例:

async function pipe(user, context, callback) {
  // 从自己的数据库获取额外信息
  const userInfo = await getUserFromDB(user.id);

  // 添加到Token
  user.customData = {
    department: userInfo.department,
    level: userInfo.level
  };

  return callback(null, user, context);
}
  1. 多租户SaaS:
    • 租户隔离
    • 独立用户池
    • 品牌定制

Token机制:

  • 标准JWT (RS256/HS256)
  • 自定义Claims
  • Token生命周期可配置

特色功能:

  • 国内社交登录: 微信、支付宝、QQ等
  • 小程序登录: 微信小程序一键登录
  • 实名认证: 集成第三方实名认证服务
  • 国密支持: 符合国内合规要求

适用场景:

  • 互联网应用/SaaS
  • 移动应用/小程序
  • 需要国内社交登录的场景

优势:

  • 国内网络环境优化
  • 微信生态深度集成
  • 性价比高

定价:

  • 免费版: 7000 MAU
  • 付费版: ¥3000~30000/year

竹云 (BambooCloud)

架构定位: 企业级IAM全栈方案

核心能力:

  • 4A统一管控: Account(账号)、Authentication(认证)、Authorization(授权)、Audit(审计)
  • 特权账号管理(PAM): 运维人员权限管控
  • 堡垒机集成: 安全运维审计

特色功能:

  • 面向传统企业/政府
  • 强调合规和审计
  • 支持私有化部署

适用场景:

  • 大型企业集团
  • 金融/政府行业
  • 需要PAM能力的场景

7.3 开源IAM方案

Keycloak

架构定位: 开源身份和访问管理解决方案

graph TB
    subgraph Keycloak["Keycloak"]
        Realm[Realm<br/>租户隔离]
        Client[Client<br/>应用注册]
        User[User Federation]
        IDP[Identity Brokering]
    end

    subgraph Features["功能"]
        SSO[SSO]
        OAuth[OAuth 2.0/OIDC]
        SAML[SAML 2.0]
        LDAP[LDAP/AD]
    end

    Realm --> Client
    Realm --> User
    User --> LDAP
    IDP --> Features

    style Keycloak fill:#4d4d4d,stroke:#00b8e3,stroke-width:2px

核心能力:

  1. Realm多租户:

    • 每个Realm独立的用户、角色、客户端
    • 跨Realm SSO
  2. User Federation:

    • LDAP/AD集成
    • 自定义User Storage SPI
  3. Identity Brokering:

    • 作为OAuth/OIDC代理
    • 支持社交登录(Google、GitHub等)

Token机制:

  • 标准JWT (RS256)
  • Access Token / ID Token / Refresh Token
  • Token mapper自定义Claims

Client配置示例:

{
  "clientId": "my-app",
  "enabled": true,
  "clientAuthenticatorType": "client-secret",
  "redirectUris": ["https://myapp.com/callback"],
  "webOrigins": ["https://myapp.com"],
  "protocol": "openid-connect",
  "publicClient": false,
  "standardFlowEnabled": true,
  "directAccessGrantsEnabled": false
}

特色功能:

  • Admin Console: 完善的Web管理界面
  • Account Console: 用户自服务门户
  • Custom Themes: 自定义登录页面
  • Event Listeners: 审计和集成

适用场景:

  • 企业内部应用SSO
  • 微服务身份管理
  • 需要自主可控的场景

优势:

  • 完全开源免费
  • 功能完整(不输商业产品)
  • 活跃的社区支持

劣势:

  • 学习曲线陡峭
  • 资源消耗较大(Java应用)
  • 需要自己运维

Casdoor

架构定位: 国产化开源IAM平台

graph LR
    subgraph Casdoor["Casdoor"]
        Org[Organization]
        App[Application]
        User[User]
        Provider[Provider]
    end

    subgraph Features["特色"]
        WeChat[微信登录]
        Alipay[支付宝]
        DingTalk[钉钉]
        LDAP[LDAP]
    end

    Org --> App
    App --> User
    Provider --> Features

    style Casdoor fill:#2e7d32,stroke:#ffffff,stroke-width:2px

核心能力:

  • 多组织管理: Organization概念隔离租户
  • 国内生态: 微信、支付宝、钉钉等登录
  • Casbin集成: 内置权限管理

Token机制:

  • 标准JWT
  • 支持自定义字段

特色功能:

  • 全中文界面和文档
  • 支持国密算法
  • 轻量级(Go语言开发)

适用场景:

  • 国内中小企业
  • 需要快速部署的场景
  • 对国密有要求的项目

7.4 方案选型决策树

graph TB
    Start{需求分析}

    Start --> Q1{部署方式?}
    Q1 -->|云服务| Q2{预算?}
    Q1 -->|私有化| Q3{规模?}

    Q2 -->|充足| Okta[Okta/Auth0]
    Q2 -->|有限| Authing[玉符/身份宝]

    Q3 -->|大型| Keycloak[Keycloak]
    Q3 -->|中小型| Casdoor[Casdoor]

    Start --> Q4{场景?}
    Q4 -->|云资源| Cloud{云厂商?}
    Q4 -->|应用| App{类型?}

    Cloud -->|AWS| AWS_IAM[AWS IAM]
    Cloud -->|Azure| Azure_AD[Azure AD]
    Cloud -->|阿里云| Alibaba_RAM[阿里云RAM]
    Cloud -->|腾讯云| Tencent_CAM[腾讯云CAM]

    App -->|B2B SaaS| Auth0
    App -->|B2C| Authing
    App -->|企业内部| Keycloak

    style Okta fill:#007dc1,stroke:#ffffff,stroke-width:2px
    style Keycloak fill:#4d4d4d,stroke:#00b8e3,stroke-width:2px
    style AWS_IAM fill:#ff9900,stroke:#232f3e,stroke-width:2px

7.5 综合对比表

方案类型部署方式适用场景Token类型价格优势劣势
AWS IAM云IAMSaaSAWS资源AWS签名按使用量AWS生态仅限AWS
Azure AD企业IdPSaaS企业SSOJWT$6~21/user/month企业功能完整价格较高
OktaIDaaSSaaS应用SSOJWT$2~15/user/month应用集成多价格昂贵
Auth0IDaaSSaaS开发者JWT$35~240/month开发者友好MAU限制
Keycloak开源自建企业内部JWT免费功能完整运维成本
阿里云RAM云IAMSaaS阿里云资源STS Token按使用量阿里云集成仅限阿里云
玉符IDaaSSaaS国内应用JWT¥3000~30000/year国内生态功能相对简单
身份宝IDaaSSaaS/私有化企业级JWT定制报价本地化支持价格不透明
Casdoor开源自建中小企业JWT免费轻量级社区较小

7.6 架构设计建议

场景一: 初创公司 (0~50人)

推荐方案: Auth0 / 玉符

架构:

用户 → Auth0托管登录 → 应用 (接收JWT)

理由:

  • 快速集成(1天完成)
  • 无需运维
  • 成本可控(免费额度内)

场景二: 成长期公司 (50~500人)

推荐方案: Keycloak (自建) / Okta (预算充足)

架构:

用户 → Keycloak SSO → 内部应用(10+)
                    → SaaS应用(SAML)
                    → API网关(JWT)

理由:

  • 应用数量增加,SSO需求
  • 自建Keycloak节省成本
  • 支持LDAP集成企业AD

场景三: 大型企业 (500+人)

推荐方案: Azure AD (Microsoft生态) / 混合架构

混合架构:

        [Azure AD] ←→ [On-Premises AD]

      [应用网关]
       ↓         ↓
   [内部应用] [SaaS应用]

   [Keycloak] (微服务内部)

理由:

  • 企业级功能(PIM、条件访问)
  • 与Office 365集成
  • 混合云支持
  • 微服务层使用轻量级Keycloak

场景四: 多云/混合云

推荐方案: Workload Identity Federation

架构:

[GKE Pod] → [Workload Identity] → [GCP Service Account]

                                [AssumeRoleWithWebIdentity]

                                  [AWS IAM Role]

关键技术:

  • Google Workload Identity
  • AWS IAM OIDC Provider
  • Azure Managed Identity

理由:

  • 无需管理云凭证
  • 支持跨云访问
  • 最小权限原则

7.7 实战建议

对于您的问题场景

基于您的需求:

客户的IAM系统给用户和AI Agent颁发身份令牌,网关需要还原身份并进行权限控制

推荐方案:

  1. 如果客户IAM是云厂商的:

    • AWS: 使用STS AssumeRole
    • Azure: 使用Managed Identity
    • 阿里云: 使用RAM STS
  2. 如果客户IAM是企业自建的:

    • 要求客户IAM提供JWKS端点
    • 网关使用公钥验证JWT
    • 实现本文档第6章的方案
  3. 如果需要从零建设IAM:

    • 小型项目: 使用Auth0/玉符
    • 中型项目: 自建Keycloak
    • 大型项目: Azure AD + Keycloak混合

关键对接点:

# 与客户IAM的对接清单
1. JWKS端点: https://customer-iam.com/.well-known/jwks.json
2. Token格式: JWT (RS256)
3. Claims约定:
   - sub: 用户/Agent ID
   - roles: 角色数组
   - permissions: 权限数组
   - agent_type: AI Agent标识
4. Token有效期:
   - 用户Token: 1小时
   - Agent Token: 7天
5. 撤销机制:
   - 方式1: jti黑名单(Redis)
   - 方式2: Introspection端点

参考资源

标准规范

云厂商文档

开源项目

书籍推荐

  • OAuth 2.0 in Action by Justin Richer
  • Solving Identity Management in Modern Applications by Yvonne Wilson
  • Zero Trust Networks by Evan Gilman