分布式系统架构深度指南

#云原生架构

分布式系统架构深度指南


第一部分:微服务架构设计模式

1.1 微服务架构基础

什么是微服务?

微服务是一种架构风格,将单一应用程序开发为一组小型服务,每个服务运行在自己的进程中,使用轻量级机制(通常是HTTP RESTful API)进行通信。

核心特征:

  • 独立部署: 每个服务可以独立部署,不影响其他服务
  • 业务能力: 围绕业务能力组织服务
  • 去中心化: 数据管理去中心化,每个服务管理自己的数据
  • 容错设计: 服务必须能够容忍其他服务的失败

单体架构 vs 微服务架构

单体架构:
┌─────────────────────────────────┐
│     Monolithic Application      │
│  ┌───────┐ ┌────────┐ ┌──────┐ │
│  │  UI   │ │Business│ │ Data │ │
│  │ Layer │ │ Logic  │ │Access│ │
│  └───────┘ └────────┘ └──────┘ │
└─────────────────────────────────┘

    ┌────▼────┐
    │Database │
    └─────────┘

微服务架构:
┌─────────┐    ┌─────────┐    ┌─────────┐
│Service A│    │Service B│    │Service C│
│ ┌─────┐ │    │ ┌─────┐ │    │ ┌─────┐ │
│ │Logic│ │    │ │Logic│ │    │ │Logic│ │
│ └──┬──┘ │    │ └──┬──┘ │    │ └──┬──┘ │
└────┼────┘    └────┼────┘    └────┼────┘
     │              │              │
  ┌──▼──┐        ┌──▼──┐        ┌──▼──┐
  │DB-A │        │DB-B │        │DB-C │
  └─────┘        └─────┘        └─────┘

1.2 服务拆分原则

领域驱动设计 (DDD) 拆分

限界上下文 (Bounded Context):

  • 每个微服务代表一个限界上下文
  • 上下文内有自己的领域模型和通用语言

示例:电商系统拆分

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ 用户服务      │     │ 订单服务      │     │ 库存服务      │
├──────────────┤     ├──────────────┤     ├──────────────┤
│- 用户注册    │     │- 创建订单    │     │- 库存查询    │
│- 用户认证    │     │- 订单查询    │     │- 库存扣减    │
│- 用户信息    │     │- 订单状态    │     │- 库存归还    │
└──────────────┘     └──────────────┘     └──────────────┘
        │                    │                    │
    ┌───▼───┐            ┌───▼───┐            ┌───▼───┐
    │User DB│            │Order  │            │Stock  │
    └───────┘            │  DB   │            │  DB   │
                         └───────┘            └───────┘

拆分原则:

  1. 单一职责原则: 一个服务只负责一个业务领域
  2. 高内聚低耦合: 相关功能聚集,减少服务间依赖
  3. 数据独立性: 每个服务拥有独立的数据库
  4. 业务能力: 按业务能力而非技术层次拆分

1.3 核心设计模式

Pattern 1: API Gateway 模式

问题: 客户端需要调用多个微服务,导致复杂性增加

解决方案: 提供单一入口点,路由请求到对应的微服务

客户端请求流程:

客户端


┌─────────────────┐
│  API Gateway    │
│  ┌───────────┐  │
│  │ 认证/授权 │  │
│  ├───────────┤  │
│  │ 路由      │  │
│  ├───────────┤  │
│  │ 限流      │  │
│  ├───────────┤  │
│  │ 聚合      │  │
│  └───────────┘  │
└────┬─────┬─────┬┘
     │     │     │
     ▼     ▼     ▼
  服务A  服务B  服务C

实现示例 (Node.js + Express):

const express = require('express');
const { createProxyMiddleware } = require('http-proxy-middleware');

const app = express();

// 认证中间件
const authenticate = (req, res, next) => {
  const token = req.headers.authorization;
  if (!token) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  // 验证 token
  next();
};

// 路由到用户服务
app.use('/api/users', authenticate, createProxyMiddleware({
  target: 'http://user-service:3001',
  changeOrigin: true
}));

// 路由到订单服务
app.use('/api/orders', authenticate, createProxyMiddleware({
  target: 'http://order-service:3002',
  changeOrigin: true
}));

// 聚合多个服务的数据
app.get('/api/user-dashboard/:userId', authenticate, async (req, res) => {
  const userId = req.params.userId;

  try {
    // 并行调用多个服务
    const [userInfo, orders, recommendations] = await Promise.all([
      fetch(`http://user-service:3001/users/${userId}`),
      fetch(`http://order-service:3002/orders?userId=${userId}`),
      fetch(`http://recommendation-service:3003/recommendations/${userId}`)
    ]);

    res.json({
      user: await userInfo.json(),
      recentOrders: await orders.json(),
      recommendations: await recommendations.json()
    });
  } catch (error) {
    res.status(500).json({ error: 'Service unavailable' });
  }
});

app.listen(3000);

优势:

  • 简化客户端调用
  • 减少网络往返次数
  • 集中处理横切关注点(认证、日志、限流)

劣势:

  • 可能成为单点故障
  • 增加延迟
  • 需要额外维护

Pattern 2: 断路器模式 (Circuit Breaker)

问题: 服务故障可能导致级联失败和资源耗尽

原理: 类似电路断路器,当错误率超过阈值时自动切断请求

状态转换:

初始状态: CLOSED (正常)

           │ 错误率 > 阈值

        OPEN (断开)

           │ 超时后

      HALF_OPEN (半开)

           ├─► 成功 ─► CLOSED

           └─► 失败 ─► OPEN

实现示例 (JavaScript):

class CircuitBreaker {
  constructor(request, options = {}) {
    this.request = request;
    this.state = 'CLOSED';
    this.failureCount = 0;
    this.successCount = 0;
    this.nextAttempt = Date.now();

    this.failureThreshold = options.failureThreshold || 5;
    this.successThreshold = options.successThreshold || 2;
    this.timeout = options.timeout || 60000; // 60秒
  }

  async call(...args) {
    if (this.state === 'OPEN') {
      if (Date.now() < this.nextAttempt) {
        throw new Error('Circuit breaker is OPEN');
      }
      // 进入半开状态
      this.state = 'HALF_OPEN';
    }

    try {
      const response = await this.request(...args);
      return this.onSuccess(response);
    } catch (error) {
      return this.onFailure(error);
    }
  }

  onSuccess(response) {
    this.failureCount = 0;

    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = 'CLOSED';
        this.successCount = 0;
      }
    }

    return response;
  }

  onFailure(error) {
    this.failureCount++;

    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      this.nextAttempt = Date.now() + this.timeout;
    }

    throw error;
  }

  getState() {
    return this.state;
  }
}

// 使用示例
const fetchUserData = async (userId) => {
  const response = await fetch(`http://user-service/users/${userId}`);
  if (!response.ok) throw new Error('Service error');
  return response.json();
};

const breaker = new CircuitBreaker(fetchUserData, {
  failureThreshold: 3,
  timeout: 30000
});

// 调用服务
try {
  const userData = await breaker.call('user123');
  console.log(userData);
} catch (error) {
  console.log('服务不可用,返回降级数据');
  // 返回缓存或默认数据
}

Pattern 3: 服务发现

问题: 微服务实例动态变化,如何找到服务地址?

解决方案: 使用服务注册中心

架构:

┌────────────────────────────────────┐
│     服务注册中心 (Consul/Eureka)    │
│                                    │
│  服务A: 192.168.1.10:8001         │
│  服务A: 192.168.1.11:8001         │
│  服务B: 192.168.1.20:8002         │
└──────▲──────────────┬──────────────┘
       │              │
    注册/心跳      发现/查询
       │              │
       │              ▼
┌──────┴─────┐   ┌────────┐
│  服务实例   │   │ 客户端 │
│            │   │        │
│  启动时注册 │   │ 查询服务│
│  定期心跳   │   │ 负载均衡│
└────────────┘   └────────┘

服务注册示例 (Consul):

const Consul = require('consul');
const express = require('express');

const consul = new Consul();
const app = express();
const PORT = process.env.PORT || 3001;
const SERVICE_NAME = 'user-service';
const SERVICE_ID = `${SERVICE_NAME}-${PORT}`;

// 服务健康检查端点
app.get('/health', (req, res) => {
  res.json({ status: 'healthy' });
});

// 注册服务到 Consul
const registerService = () => {
  const details = {
    name: SERVICE_NAME,
    id: SERVICE_ID,
    address: 'localhost',
    port: PORT,
    check: {
      http: `http://localhost:${PORT}/health`,
      interval: '10s',
      timeout: '5s'
    }
  };

  consul.agent.service.register(details, (err) => {
    if (err) {
      console.error('注册服务失败:', err);
    } else {
      console.log('服务注册成功');
    }
  });
};

// 注销服务
const deregisterService = () => {
  consul.agent.service.deregister(SERVICE_ID, (err) => {
    console.log('服务已注销');
    process.exit();
  });
};

// 启动服务
app.listen(PORT, () => {
  console.log(`服务运行在端口 ${PORT}`);
  registerService();
});

// 优雅关闭
process.on('SIGINT', deregisterService);
process.on('SIGTERM', deregisterService);

服务发现客户端示例:

const Consul = require('consul');
const consul = new Consul();

// 从 Consul 获取服务实例
const getServiceInstance = async (serviceName) => {
  return new Promise((resolve, reject) => {
    consul.health.service({
      service: serviceName,
      passing: true  // 只返回健康的实例
    }, (err, result) => {
      if (err) return reject(err);

      if (result.length === 0) {
        return reject(new Error('No healthy instances found'));
      }

      // 简单的随机负载均衡
      const instance = result[Math.floor(Math.random() * result.length)];
      resolve({
        address: instance.Service.Address,
        port: instance.Service.Port
      });
    });
  });
};

// 调用服务
const callUserService = async (userId) => {
  const instance = await getServiceInstance('user-service');
  const url = `http://${instance.address}:${instance.port}/users/${userId}`;
  const response = await fetch(url);
  return response.json();
};

第二部分:服务网格 (Service Mesh)

2.1 什么是服务网格?

服务网格是一个专用的基础设施层,用于处理服务间通信。它将网络功能(如负载均衡、服务发现、故障恢复、指标收集等)从应用代码中抽离出来。

架构对比:

传统微服务:
┌─────────────────┐     ┌─────────────────┐
│   Service A     │     │   Service B     │
│  ┌───────────┐  │     │  ┌───────────┐  │
│  │ 业务逻辑  │  │     │  │ 业务逻辑  │  │
│  ├───────────┤  │     │  ├───────────┤  │
│  │ 网络库    │  │────▶│  │ 网络库    │  │
│  │(重试/限流)│  │     │  │(重试/限流)│  │
│  └───────────┘  │     │  └───────────┘  │
└─────────────────┘     └─────────────────┘

服务网格:
┌─────────────────┐     ┌─────────────────┐
│   Service A     │     │   Service B     │
│  ┌───────────┐  │     │  ┌───────────┐  │
│  │ 业务逻辑  │  │     │  │ 业务逻辑  │  │
│  └─────┬─────┘  │     │  └─────▲─────┘  │
└────────┼────────┘     └────────┼────────┘
         │                       │
         ▼                       │
    ┌────────┐              ┌────────┐
    │Sidecar │──────────────▶│Sidecar │
    │ Proxy  │              │ Proxy  │
    └────────┘              └────────┘
         │                       │
         └───────────┬───────────┘

              ┌──────────────┐
              │ Control Plane│
              │   (Istio)    │
              └──────────────┘

2.2 Istio 架构

核心组件:

┌──────────────────────────────────────────┐
│           Control Plane (istiod)         │
│  ┌──────────┐  ┌────────┐  ┌──────────┐ │
│  │  Pilot   │  │Citadel │  │ Galley   │ │
│  │(流量管理)│  │(安全)  │  │(配置)    │ │
│  └──────────┘  └────────┘  └──────────┘ │
└─────────────────┬────────────────────────┘
                  │ 配置下发
        ┌─────────┼─────────┐
        │         │         │
        ▼         ▼         ▼
    ┌───────┐ ┌───────┐ ┌───────┐
    │Envoy  │ │Envoy  │ │Envoy  │  Data Plane
    │Proxy  │ │Proxy  │ │Proxy  │
    └───┬───┘ └───┬───┘ └───┬───┘
        │         │         │
        ▼         ▼         ▼
    ┌───────┐ ┌───────┐ ┌───────┐
    │App A  │ │App B  │ │App C  │
    └───────┘ └───────┘ └───────┘

组件职责:

  • Pilot: 服务发现、流量管理配置
  • Citadel: 证书管理、mTLS
  • Galley: 配置验证和分发
  • Envoy Proxy: 数据平面代理

2.3 流量管理

虚拟服务 (Virtual Service)

定义路由规则,控制流量如何路由到服务。

示例:金丝雀发布 (Canary Release)

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: reviews-route
spec:
  hosts:
  - reviews  # 目标服务
  http:
  - match:
    - headers:
        user-type:
          exact: premium  # VIP 用户
    route:
    - destination:
        host: reviews
        subset: v2  # 路由到 v2 版本
      weight: 100
  - route:  # 普通用户
    - destination:
        host: reviews
        subset: v1
      weight: 90  # 90% 流量到 v1
    - destination:
        host: reviews
        subset: v2
      weight: 10  # 10% 流量到 v2 (金丝雀)

目标规则 (Destination Rule)

定义服务的子集和负载均衡策略。

apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: reviews-destination
spec:
  host: reviews
  trafficPolicy:
    loadBalancer:
      simple: LEAST_REQUEST  # 负载均衡策略
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 2
    outlierDetection:  # 异常检测
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
    trafficPolicy:
      loadBalancer:
        simple: ROUND_ROBIN

重试和超时

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: order-service
spec:
  hosts:
  - order-service
  http:
  - route:
    - destination:
        host: order-service
    timeout: 3s  # 请求超时
    retries:
      attempts: 3  # 重试次数
      perTryTimeout: 1s  # 每次重试超时
      retryOn: 5xx,reset,connect-failure  # 重试条件

2.4 可观测性

分布式追踪

Jaeger 集成示例:

请求链路追踪:

客户端请求

   ▼ Trace ID: abc123
┌──────────────┐
│ API Gateway  │ Span 1: gateway (200ms)
└──────┬───────┘

       ├─► ┌──────────────┐
       │   │ User Service │ Span 2: user-lookup (50ms)
       │   └──────────────┘

       └─► ┌──────────────┐
           │Order Service │ Span 3: order-create (100ms)
           └──────┬───────┘

                  └─► ┌──────────────┐
                      │Payment Svc   │ Span 4: payment (80ms)
                      └──────────────┘

总耗时: 230ms (有并行调用)

自动追踪配置: Istio 自动注入追踪头并收集 span 数据,应用只需传播 headers。

// Express 中间件传播追踪头
const propagateTracingHeaders = (req, res, next) => {
  const tracingHeaders = [
    'x-request-id',
    'x-b3-traceid',
    'x-b3-spanid',
    'x-b3-parentspanid',
    'x-b3-sampled',
    'x-b3-flags'
  ];

  req.tracingHeaders = {};
  tracingHeaders.forEach(header => {
    if (req.headers[header]) {
      req.tracingHeaders[header] = req.headers[header];
    }
  });

  next();
};

// 调用下游服务时传播 headers
app.get('/api/orders', propagateTracingHeaders, async (req, res) => {
  const response = await fetch('http://payment-service/pay', {
    headers: req.tracingHeaders
  });
  // ...
});

Metrics 收集

Istio 自动收集四大黄金信号:

  1. 延迟 (Latency): 请求响应时间
  2. 流量 (Traffic): 请求速率 (QPS)
  3. 错误 (Errors): 错误率
  4. 饱和度 (Saturation): 资源使用率

Prometheus 查询示例:

# 服务请求速率
rate(istio_requests_total{destination_service="order-service"}[1m])

# P99 延迟
histogram_quantile(0.99,
  rate(istio_request_duration_milliseconds_bucket[1m])
)

# 错误率
rate(istio_requests_total{response_code=~"5.."}[1m])
/
rate(istio_requests_total[1m])

第三部分:分布式事务与一致性

3.1 分布式事务的挑战

ACID 在分布式系统中的问题:

  • Atomicity: 跨多个服务的原子性难以保证
  • Consistency: 不同服务有独立数据库
  • Isolation: 分布式锁成本高
  • Durability: 需要复杂的协调

示例场景:电商下单

下单流程涉及多个服务:
1. 订单服务:创建订单
2. 库存服务:扣减库存
3. 支付服务:扣款
4. 积分服务:增加积分

要求:要么全部成功,要么全部失败

3.2 两阶段提交 (2PC)

协议流程:

协调者 (Coordinator)          参与者 (Participants)
      │                   订单服务  库存服务  支付服务
      │                      │        │        │
      ├──── Prepare ─────────┼────────┼────────┤
      │                      │        │        │
      │                   [准备]   [准备]   [准备]
      │                      │        │        │
      │◄──── Vote-Yes ───────┤        │        │
      │◄──── Vote-Yes ────────────────┤        │
      │◄──── Vote-Yes ─────────────────────────┤
      │                      │        │        │
      ├──── Commit ──────────┼────────┼────────┤
      │                      │        │        │
      │                   [提交]   [提交]   [提交]
      │                      │        │        │
      │◄──── ACK ────────────┤        │        │
      │◄──── ACK ─────────────────────┤        │
      │◄──── ACK ──────────────────────────────┤

实现示例 (简化版):

class TwoPhaseCommitCoordinator {
  constructor(participants) {
    this.participants = participants; // [{id, prepare, commit, abort}]
  }

  async execute(transaction) {
    const votes = [];

    // Phase 1: Prepare
    console.log('Phase 1: Prepare');
    for (const participant of this.participants) {
      try {
        const vote = await participant.prepare(transaction);
        votes.push({ participant, vote });

        if (vote !== 'YES') {
          console.log(`${participant.id} voted NO`);
          await this.abortAll(votes);
          return { status: 'ABORTED', reason: 'Participant voted NO' };
        }
      } catch (error) {
        console.log(`${participant.id} prepare failed:`, error.message);
        await this.abortAll(votes);
        return { status: 'ABORTED', reason: error.message };
      }
    }

    // Phase 2: Commit
    console.log('Phase 2: Commit');
    const commitResults = [];
    for (const { participant } of votes) {
      try {
        await participant.commit(transaction);
        commitResults.push({ participant: participant.id, status: 'COMMITTED' });
      } catch (error) {
        // 这里出现问题就麻烦了 - 需要重试或人工介入
        console.error(`${participant.id} commit failed:`, error.message);
        commitResults.push({ participant: participant.id, status: 'FAILED' });
      }
    }

    return { status: 'COMMITTED', results: commitResults };
  }

  async abortAll(votes) {
    for (const { participant } of votes) {
      try {
        await participant.abort();
      } catch (error) {
        console.error(`${participant.id} abort failed:`, error.message);
      }
    }
  }
}

// 参与者实现示例 - 库存服务
class InventoryServiceParticipant {
  constructor() {
    this.id = 'inventory-service';
    this.prepared = new Map(); // 存储准备状态
  }

  async prepare(transaction) {
    const { orderId, items } = transaction;

    // 检查库存
    for (const item of items) {
      const stock = await this.checkStock(item.productId);
      if (stock < item.quantity) {
        return 'NO'; // 库存不足,投票拒绝
      }
    }

    // 锁定库存 (但不真正扣减)
    await this.lockInventory(orderId, items);
    this.prepared.set(orderId, items);

    return 'YES';
  }

  async commit(transaction) {
    const { orderId } = transaction;
    const items = this.prepared.get(orderId);

    if (!items) {
      throw new Error('Transaction not prepared');
    }

    // 真正扣减库存
    await this.deductInventory(items);
    this.prepared.delete(orderId);
  }

  async abort(transaction) {
    const { orderId } = transaction;
    const items = this.prepared.get(orderId);

    if (items) {
      // 释放锁定的库存
      await this.releaseInventory(orderId);
      this.prepared.delete(orderId);
    }
  }

  // ... 实际的数据库操作方法
}

2PC 的问题:

  1. 阻塞问题: 参与者在 prepare 后会阻塞等待协调者的决定
  2. 单点故障: 协调者故障导致参与者无限期阻塞
  3. 性能问题: 两个阶段的网络往返,延迟高
  4. 数据一致性: Commit 阶段失败可能导致不一致

3.3 Saga 模式

Saga 将长事务拆分为多个本地事务,每个本地事务有对应的补偿操作。

核心思想:

  • 每个服务执行本地事务并发布事件
  • 如果某个步骤失败,执行补偿事务回滚之前的操作

编排式 Saga (Choreography)

流程示例:

成功流程:
订单服务          库存服务         支付服务         积分服务
   │                │               │               │
   ├─创建订单────────┤               │               │
   │                │               │               │
   │         ◄──订单已创建事件       │               │
   │                │               │               │
   │                ├─扣减库存───────┤               │
   │                │               │               │
   │         ◄──库存已扣减事件       │               │
   │                │               │               │
   │                │               ├─处理支付───────┤
   │                │               │               │
   │         ◄──支付已完成事件       │               │
   │                │               │               │
   │                │               │         ┌─增加积分
   │                │               │         │
   │         ◄──积分已增加事件───────────────────┘

失败流程 (支付失败):
订单服务          库存服务         支付服务
   │                │               │
   ├─创建订单────────┤               │
   │         ◄──订单已创建           │
   │                ├─扣减库存───────┤
   │         ◄──库存已扣减           │
   │                │               ├─处理支付
   │                │               │  (失败)
   │         ◄──支付失败事件─────────┤
   │                │               │
   │                ├─补偿:归还库存 │
   │                │               │
   ├─补偿:取消订单  │               │

实现示例 (基于事件总线):

// 事件总线 (使用 RabbitMQ/Kafka)
class EventBus {
  constructor() {
    this.handlers = new Map();
  }

  subscribe(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType).push(handler);
  }

  async publish(event) {
    const handlers = this.handlers.get(event.type) || [];
    for (const handler of handlers) {
      try {
        await handler(event);
      } catch (error) {
        console.error(`Handler failed for ${event.type}:`, error);
      }
    }
  }
}

const eventBus = new EventBus();

// 订单服务
class OrderService {
  async createOrder(orderData) {
    // 执行本地事务
    const order = await this.db.orders.create({
      ...orderData,
      status: 'PENDING'
    });

    // 发布事件
    await eventBus.publish({
      type: 'ORDER_CREATED',
      data: {
        orderId: order.id,
        items: order.items,
        userId: order.userId
      }
    });

    return order;
  }

  async cancelOrder(orderId) {
    await this.db.orders.update(orderId, { status: 'CANCELLED' });

    await eventBus.publish({
      type: 'ORDER_CANCELLED',
      data: { orderId }
    });
  }
}

// 库存服务
class InventoryService {
  constructor() {
    // 监听订单创建事件
    eventBus.subscribe('ORDER_CREATED', this.handleOrderCreated.bind(this));
    eventBus.subscribe('PAYMENT_FAILED', this.handlePaymentFailed.bind(this));
  }

  async handleOrderCreated(event) {
    const { orderId, items } = event.data;

    try {
      // 扣减库存
      await this.deductInventory(items);

      await eventBus.publish({
        type: 'INVENTORY_DEDUCTED',
        data: { orderId, items }
      });
    } catch (error) {
      // 库存不足,发布失败事件
      await eventBus.publish({
        type: 'INVENTORY_DEDUCTION_FAILED',
        data: { orderId, reason: error.message }
      });
    }
  }

  async handlePaymentFailed(event) {
    const { orderId, items } = event.data;

    // 补偿操作:归还库存
    await this.restoreInventory(items);

    await eventBus.publish({
      type: 'INVENTORY_RESTORED',
      data: { orderId }
    });
  }

  async deductInventory(items) {
    for (const item of items) {
      await this.db.inventory.decrement(item.productId, item.quantity);
    }
  }

  async restoreInventory(items) {
    for (const item of items) {
      await this.db.inventory.increment(item.productId, item.quantity);
    }
  }
}

// 支付服务
class PaymentService {
  constructor() {
    eventBus.subscribe('INVENTORY_DEDUCTED', this.handleInventoryDeducted.bind(this));
  }

  async handleInventoryDeducted(event) {
    const { orderId } = event.data;

    try {
      // 处理支付
      const payment = await this.processPayment(orderId);

      await eventBus.publish({
        type: 'PAYMENT_COMPLETED',
        data: { orderId, paymentId: payment.id }
      });
    } catch (error) {
      await eventBus.publish({
        type: 'PAYMENT_FAILED',
        data: { orderId, reason: error.message, ...event.data }
      });
    }
  }
}

编排式 Saga (Orchestration)

使用中央协调器控制流程。

class SagaOrchestrator {
  constructor() {
    this.steps = [];
  }

  addStep(name, action, compensation) {
    this.steps.push({ name, action, compensation });
    return this;
  }

  async execute(context) {
    const executedSteps = [];

    try {
      // 正向执行所有步骤
      for (const step of this.steps) {
        console.log(`Executing step: ${step.name}`);
        const result = await step.action(context);
        executedSteps.push({ step, result });
        context[step.name] = result; // 保存结果供后续步骤使用
      }

      return { status: 'SUCCESS', data: context };

    } catch (error) {
      console.log(`Step failed: ${error.message}`);
      console.log('Starting compensation...');

      // 反向执行补偿操作
      for (let i = executedSteps.length - 1; i >= 0; i--) {
        const { step, result } = executedSteps[i];
        try {
          console.log(`Compensating step: ${step.name}`);
          await step.compensation(context, result);
        } catch (compError) {
          console.error(`Compensation failed for ${step.name}:`, compError);
          // 补偿失败需要记录并可能需要人工介入
        }
      }

      return { status: 'FAILED', error: error.message };
    }
  }
}

// 使用示例:订单 Saga
const createOrderSaga = () => {
  const saga = new SagaOrchestrator();

  saga.addStep(
    'createOrder',
    async (ctx) => {
      const order = await orderService.create(ctx.orderData);
      return order;
    },
    async (ctx, order) => {
      await orderService.cancel(order.id);
    }
  );

  saga.addStep(
    'reserveInventory',
    async (ctx) => {
      const reservation = await inventoryService.reserve(ctx.orderData.items);
      return reservation;
    },
    async (ctx, reservation) => {
      await inventoryService.release(reservation.id);
    }
  );

  saga.addStep(
    'processPayment',
    async (ctx) => {
      const payment = await paymentService.charge({
        amount: ctx.createOrder.totalAmount,
        userId: ctx.orderData.userId
      });
      return payment;
    },
    async (ctx, payment) => {
      await paymentService.refund(payment.id);
    }
  );

  saga.addStep(
    'confirmInventory',
    async (ctx) => {
      await inventoryService.confirm(ctx.reserveInventory.id);
    },
    async (ctx) => {
      // 如果确认失败,前面的补偿已经释放了库存
    }
  );

  return saga;
};

// 执行 Saga
const orderSaga = createOrderSaga();
const result = await orderSaga.execute({
  orderData: {
    userId: 'user123',
    items: [
      { productId: 'prod1', quantity: 2, price: 100 },
      { productId: 'prod2', quantity: 1, price: 50 }
    ]
  }
});

console.log(result);

Saga 模式对比:

特性编排式 (Choreography)编排式 (Orchestration)
耦合度低 - 服务通过事件通信高 - 依赖中央协调器
复杂度高 - 逻辑分散低 - 集中管理
可观测性难 - 需要追踪事件链易 - 协调器掌握全局
单点故障协调器是单点
适用场景简单流程、松耦合复杂流程、需要集中控制

第四部分:CAP 理论与最终一致性

4.1 CAP 定理

定理内容: 分布式系统最多只能同时满足以下三项中的两项:

        Consistency (一致性)

             ╱ ╲
            ╱   ╲
           ╱     ╲
          ╱       ╲
         ╱    CA   ╲
        ╱           ╲
       ╱             ╲
      ╱       P       ╲
     ╱    (网络分区)   ╲
    ╱                   ╲
   ╱_____________________ ╲
  Partition Tolerance     Availability
    (分区容错性)            (可用性)

CP: 一致性 + 分区容错性 (牺牲可用性)
AP: 可用性 + 分区容错性 (牺牲一致性)
CA: 一致性 + 可用性 (不能容忍分区,实际不存在)

详细解释:

  1. Consistency (一致性): 所有节点在同一时间看到相同的数据

    写入 A=1
    ┌────┐    ┌────┐    ┌────┐
    │ N1 │───▶│ N2 │───▶│ N3 │
    │A=1 │    │A=1 │    │A=1 │
    └────┘    └────┘    └────┘
    
    读取任何节点都返回 A=1
  2. Availability (可用性): 每个请求都能得到响应 (成功或失败)

    即使某些节点故障,系统仍能响应请求
    ┌────┐    ┌────┐    ┌────┐
    │ N1 │    │ N2 │    │ N3 │
    │可用│    │故障│    │可用│
    └────┘    └────┘    └────┘
         ╲              ╱
          ╲            ╱
           ▼          ▼
           仍能处理请求
  3. Partition Tolerance (分区容错性): 系统在网络分区时仍能继续运行

    网络分区
    ┌────┐    ╱╱╱╱╱    ┌────┐
    │ N1 │    断开     │ N2 │
    │A=1 │            │A=? │
    └────┘            └────┘
    
    系统能容忍网络分区并继续运作

4.2 实际系统的 CAP 选择

CP 系统示例:HBase, MongoDB(强一致性配置)

特征:

  • 网络分区时,少数派节点拒绝服务
  • 保证数据一致性
// ZooKeeper (CP系统) 示例
class ZooKeeperClient {
  async write(path, data) {
    // 写入需要多数派同意
    const nodes = this.getAllNodes();
    const majority = Math.floor(nodes.length / 2) + 1;

    const acks = await Promise.all(
      nodes.map(node => node.write(path, data))
    );

    const successCount = acks.filter(ack => ack.success).length;

    if (successCount >= majority) {
      return { success: true };
    } else {
      // 未达到多数派,写入失败
      throw new Error('Cannot reach majority');
    }
  }

  async read(path) {
    // 从 leader 读取,保证一致性
    const leader = await this.getLeader();
    return leader.read(path);
  }
}

AP 系统示例:Cassandra, DynamoDB

特征:

  • 网络分区时,所有节点仍可服务
  • 牺牲强一致性,提供最终一致性
// Cassandra 风格的 AP 系统
class CassandraClient {
  async write(key, value, options = {}) {
    const { consistencyLevel = 'ONE' } = options;
    const nodes = this.getReplicaNodes(key);

    // 异步写入多个副本
    const writePromises = nodes.map(node =>
      node.write(key, value).catch(err => ({ error: err }))
    );

    // 根据一致性级别决定等待多少个响应
    const requiredAcks = this.getRequiredAcks(consistencyLevel, nodes.length);

    // 等待足够数量的成功响应
    let successCount = 0;
    for (const promise of writePromises) {
      const result = await promise;
      if (!result.error) {
        successCount++;
        if (successCount >= requiredAcks) {
          // 已达到要求,立即返回
          return { success: true };
        }
      }
    }

    // 即使部分节点失败,只要达到要求就成功
    return { success: successCount >= requiredAcks };
  }

  getRequiredAcks(level, totalNodes) {
    switch (level) {
      case 'ONE': return 1;  // AP: 任意一个节点成功即可
      case 'QUORUM': return Math.floor(totalNodes / 2) + 1;  // 中间状态
      case 'ALL': return totalNodes;  // CP: 所有节点都要成功
    }
  }

  async read(key, options = {}) {
    const { consistencyLevel = 'ONE' } = options;
    const nodes = this.getReplicaNodes(key);

    if (consistencyLevel === 'ONE') {
      // 从任意可用节点读取 (可能读到旧数据)
      for (const node of nodes) {
        try {
          return await node.read(key);
        } catch (err) {
          continue; // 尝试下一个节点
        }
      }
    } else if (consistencyLevel === 'QUORUM') {
      // 从多数派读取,并返回最新版本
      const results = await Promise.allSettled(
        nodes.map(node => node.read(key))
      );

      const values = results
        .filter(r => r.status === 'fulfilled')
        .map(r => r.value);

      // 返回版本号最高的值
      return values.reduce((latest, current) =>
        current.version > latest.version ? current : latest
      );
    }
  }
}

4.3 最终一致性 (Eventual Consistency)

定义: 如果没有新的更新,所有副本最终会达到一致状态。

一致性级别金字塔:

     强一致性 (Linearizability)


    顺序一致性 (Sequential)

    因果一致性 (Causal)

    会话一致性 (Session)

   最终一致性 (Eventual)

向量时钟 (Vector Clock)

用于检测并发写入和解决冲突。

class VectorClock {
  constructor(nodeId) {
    this.nodeId = nodeId;
    this.clock = {};  // {nodeId: counter}
  }

  // 增加本地时钟
  increment() {
    this.clock[this.nodeId] = (this.clock[this.nodeId] || 0) + 1;
  }

  // 合并其他节点的时钟
  merge(otherClock) {
    for (const [nodeId, counter] of Object.entries(otherClock)) {
      this.clock[nodeId] = Math.max(
        this.clock[nodeId] || 0,
        counter
      );
    }
  }

  // 比较两个向量时钟
  compare(other) {
    let thisGreater = false;
    let otherGreater = false;

    const allNodes = new Set([
      ...Object.keys(this.clock),
      ...Object.keys(other)
    ]);

    for (const nodeId of allNodes) {
      const thisVal = this.clock[nodeId] || 0;
      const otherVal = other[nodeId] || 0;

      if (thisVal > otherVal) thisGreater = true;
      if (otherVal > thisVal) otherGreater = true;
    }

    if (thisGreater && !otherGreater) return 'AFTER';   // this 发生在 other 之后
    if (otherGreater && !thisGreater) return 'BEFORE';  // this 发生在 other 之前
    if (!thisGreater && !otherGreater) return 'EQUAL';  // 相同
    return 'CONCURRENT';  // 并发冲突
  }
}

// 使用示例
class DistributedKVStore {
  constructor(nodeId) {
    this.nodeId = nodeId;
    this.data = new Map();  // key -> {value, vectorClock}
  }

  write(key, value) {
    const existing = this.data.get(key);
    const clock = new VectorClock(this.nodeId);

    if (existing) {
      // 基于已有数据的向量时钟
      clock.merge(existing.vectorClock.clock);
    }

    clock.increment();

    this.data.set(key, {
      value,
      vectorClock: clock
    });

    // 异步同步到其他节点
    this.replicateToOtherNodes(key, value, clock);
  }

  handleReplication(key, value, remoteClock) {
    const existing = this.data.get(key);

    if (!existing) {
      // 本地没有数据,直接写入
      this.data.set(key, {
        value,
        vectorClock: new VectorClock(this.nodeId)
      });
      this.data.get(key).vectorClock.clock = remoteClock;
      return;
    }

    const comparison = existing.vectorClock.compare(remoteClock);

    switch (comparison) {
      case 'BEFORE':
        // 远程数据更新,覆盖本地
        this.data.set(key, {
          value,
          vectorClock: new VectorClock(this.nodeId)
        });
        this.data.get(key).vectorClock.clock = remoteClock;
        break;

      case 'AFTER':
        // 本地数据更新,忽略远程数据
        break;

      case 'CONCURRENT':
        // 冲突!需要解决
        const resolved = this.resolveConflict(existing.value, value);
        const mergedClock = new VectorClock(this.nodeId);
        mergedClock.merge(existing.vectorClock.clock);
        mergedClock.merge(remoteClock);
        mergedClock.increment();

        this.data.set(key, {
          value: resolved,
          vectorClock: mergedClock
        });
        break;
    }
  }

  resolveConflict(value1, value2) {
    // 冲突解决策略
    // 1. Last-Write-Wins (LWW) - 使用时间戳
    // 2. 应用层合并 (如购物车合并所有商品)
    // 3. 保留所有版本让用户选择

    // 示例:购物车合并
    if (Array.isArray(value1) && Array.isArray(value2)) {
      return [...new Set([...value1, ...value2])];
    }

    // 默认:保留两个版本
    return [value1, value2];
  }
}

4.4 Quorum 机制

读写 Quorum 保证数据一致性。

公式: W + R > N

  • N: 副本总数
  • W: 写入时需要确认的副本数
  • R: 读取时需要查询的副本数
示例:N=3, W=2, R=2

写入流程:
客户端写 value=X, version=5

   ├──► Node1 (成功) ✓
   ├──► Node2 (成功) ✓
   └──► Node3 (失败) ✗

W=2 已满足,写入成功

读取流程:
客户端读取

   ├──► Node1: {value=X, version=5}
   ├──► Node2: {value=X, version=5}
   └──► Node3: {value=Y, version=3} (旧数据)

取最新版本: value=X, version=5

同时修复 Node3 的数据 (Read Repair)

实现:

class QuorumStore {
  constructor(nodes, N, W, R) {
    this.nodes = nodes;
    this.N = N;  // 总副本数
    this.W = W;  // 写 Quorum
    this.R = R;  // 读 Quorum
  }

  async write(key, value) {
    const version = Date.now(); // 简单版本号
    const writePromises = this.nodes.map(node =>
      node.write(key, { value, version })
        .then(() => ({ success: true, node }))
        .catch(err => ({ success: false, node, error: err }))
    );

    let successCount = 0;
    const results = [];

    for (const promise of writePromises) {
      const result = await promise;
      results.push(result);

      if (result.success) {
        successCount++;
        if (successCount >= this.W) {
          // 达到写 Quorum
          return { success: true, version };
        }
      }
    }

    throw new Error(`Write failed: only ${successCount}/${this.W} nodes confirmed`);
  }

  async read(key) {
    const readPromises = this.nodes.map(node =>
      node.read(key)
        .then(data => ({ success: true, data, node }))
        .catch(err => ({ success: false, node, error: err }))
    );

    const results = [];
    let successCount = 0;

    for (const promise of readPromises) {
      const result = await promise;

      if (result.success) {
        results.push(result);
        successCount++;

        if (successCount >= this.R) {
          // 达到读 Quorum,返回最新版本
          const latest = results.reduce((newest, current) =>
            current.data.version > newest.data.version ? current : newest
          );

          // Read Repair: 修复旧数据
          this.readRepair(key, latest.data, results);

          return latest.data.value;
        }
      }
    }

    throw new Error(`Read failed: only ${successCount}/${this.R} nodes responded`);
  }

  async readRepair(key, latestData, results) {
    for (const result of results) {
      if (result.success && result.data.version < latestData.version) {
        // 异步修复旧数据
        result.node.write(key, latestData).catch(err =>
          console.error('Read repair failed:', err)
        );
      }
    }
  }
}

// 配置示例
const nodes = [node1, node2, node3, node4, node5];
const store = new QuorumStore(nodes, 5, 3, 3);  // N=5, W=3, R=3

// W + R = 6 > N = 5,保证强一致性

不同配置的权衡:

配置一致性可用性性能场景
W=N, R=1写慢读快读多写少
W=1, R=N写快读慢写多读少
W=Q, R=Q (Q>N/2)平衡通用场景
W=1, R=1可容忍不一致

第五部分:综合实践案例

案例:构建高可用的订单系统

需求:

  • 支持每秒 10,000 笔订单
  • 99.99% 可用性
  • 跨地域部署
  • 最终一致性可接受

架构设计:

                    用户请求


              ┌────────────────┐
              │   API Gateway  │
              │  (限流/认证)   │
              └────────┬───────┘

         ┌─────────────┼─────────────┐
         │             │             │
         ▼             ▼             ▼
    ┌────────┐   ┌─────────┐   ┌────────┐
    │订单服务 │   │库存服务 │   │支付服务│
    │(AP系统) │   │(CP系统) │   │(CP系统)│
    └────┬───┘   └────┬────┘   └────┬───┘
         │            │             │
         ▼            ▼             ▼
    ┌────────┐   ┌─────────┐   ┌────────┐
    │Cassandra   │  Redis   │   │ MySQL  │
    │(最终一致)  │(强一致)  │  │(强一致)│
    └────────┘   └─────────┘   └────────┘
         │            │             │
         └────────────┼─────────────┘


            ┌─────────────────┐
            │  Kafka (事件流)  │
            └─────────────────┘

实现要点:

// 1. 订单服务 - 使用 Saga 协调
class OrderService {
  async createOrder(orderRequest) {
    // 生成幂等性 key
    const idempotencyKey = this.generateIdempotencyKey(orderRequest);

    // 检查幂等性
    const existing = await this.checkIdempotency(idempotencyKey);
    if (existing) {
      return existing;
    }

    // 创建订单 Saga
    const saga = new SagaOrchestrator();

    saga.addStep(
      'createOrder',
      async (ctx) => {
        // 写入订单到 Cassandra (AP)
        const order = await this.orderRepository.create({
          ...orderRequest,
          status: 'PENDING',
          idempotencyKey
        });
        return order;
      },
      async (ctx, order) => {
        await this.orderRepository.updateStatus(order.id, 'CANCELLED');
      }
    );

    saga.addStep(
      'checkInventory',
      async (ctx) => {
        // 调用库存服务 (CP系统,强一致性)
        const result = await this.inventoryService.reserve({
          orderId: ctx.createOrder.id,
          items: orderRequest.items
        });

        if (!result.success) {
          throw new Error('Insufficient inventory');
        }

        return result;
      },
      async (ctx, reservation) => {
        await this.inventoryService.release(reservation.id);
      }
    );

    saga.addStep(
      'processPayment',
      async (ctx) => {
        const payment = await this.paymentService.charge({
          orderId: ctx.createOrder.id,
          amount: ctx.createOrder.totalAmount,
          userId: orderRequest.userId
        });
        return payment;
      },
      async (ctx, payment) => {
        await this.paymentService.refund(payment.id);
      }
    );

    saga.addStep(
      'confirmOrder',
      async (ctx) => {
        // 确认订单 - 异步更新状态
        await this.orderRepository.updateStatus(
          ctx.createOrder.id,
          'CONFIRMED'
        );

        // 发布订单确认事件
        await this.eventBus.publish({
          type: 'ORDER_CONFIRMED',
          data: ctx.createOrder
        });
      },
      async (ctx) => {
        // 已经在前面的补偿中取消订单
      }
    );

    const result = await saga.execute({ orderRequest });

    if (result.status === 'SUCCESS') {
      // 缓存结果用于幂等性检查
      await this.cacheIdempotencyResult(idempotencyKey, result.data);
    }

    return result;
  }

  generateIdempotencyKey(orderRequest) {
    // 基于用户ID + 时间窗口 + 订单内容生成
    const hash = crypto
      .createHash('sha256')
      .update(JSON.stringify(orderRequest))
      .digest('hex');
    return `order:${orderRequest.userId}:${hash}`;
  }
}

// 2. 库存服务 - 强一致性
class InventoryService {
  async reserve(request) {
    const { orderId, items } = request;

    // 使用分布式锁保证一致性
    const lock = await this.redlock.lock(
      `inventory:lock:${items.map(i => i.productId).join(':')}`,
      1000 // 1秒超时
    );

    try {
      // 在事务中检查和扣减库存
      const result = await this.db.transaction(async (trx) => {
        for (const item of items) {
          const stock = await trx('inventory')
            .where('product_id', item.productId)
            .forUpdate()  // 行锁
            .first();

          if (!stock || stock.available < item.quantity) {
            throw new Error(`Insufficient stock for ${item.productId}`);
          }

          // 扣减可用库存,增加保留库存
          await trx('inventory')
            .where('product_id', item.productId)
            .update({
              available: stock.available - item.quantity,
              reserved: stock.reserved + item.quantity
            });

          // 记录保留
          await trx('inventory_reservations').insert({
            order_id: orderId,
            product_id: item.productId,
            quantity: item.quantity,
            expires_at: new Date(Date.now() + 15 * 60 * 1000) // 15分钟后过期
          });
        }

        return { success: true, orderId };
      });

      return result;

    } finally {
      await lock.unlock();
    }
  }

  // 定时任务:释放过期的库存保留
  async releaseExpiredReservations() {
    const expired = await this.db('inventory_reservations')
      .where('expires_at', '<', new Date())
      .where('status', 'RESERVED');

    for (const reservation of expired) {
      await this.release(reservation.order_id);
    }
  }
}

// 3. 可观测性:分布式追踪
class TracingMiddleware {
  constructor(serviceName) {
    this.serviceName = serviceName;
  }

  middleware() {
    return async (req, res, next) => {
      const traceId = req.headers['x-trace-id'] || this.generateTraceId();
      const spanId = this.generateSpanId();
      const parentSpanId = req.headers['x-span-id'];

      // 创建 span
      const span = {
        traceId,
        spanId,
        parentSpanId,
        serviceName: this.serviceName,
        operationName: `${req.method} ${req.path}`,
        startTime: Date.now()
      };

      // 注入到请求上下文
      req.trace = { traceId, spanId };

      // 响应完成时记录 span
      res.on('finish', () => {
        span.endTime = Date.now();
        span.duration = span.endTime - span.startTime;
        span.statusCode = res.statusCode;

        // 发送到 Jaeger/Zipkin
        this.sendSpan(span);
      });

      // 传播追踪头
      res.setHeader('x-trace-id', traceId);

      next();
    };
  }

  generateTraceId() {
    return crypto.randomBytes(16).toString('hex');
  }

  generateSpanId() {
    return crypto.randomBytes(8).toString('hex');
  }
}

总结与最佳实践

微服务架构关键要点

  1. 合理拆分: 基于业务领域,不要过度拆分
  2. 容错设计: 使用断路器、超时、重试
  3. 服务发现: 使用注册中心实现动态服务发现
  4. API 网关: 统一入口,处理横切关注点

服务网格优势

  1. 流量管理: 灰度发布、流量分割无需修改代码
  2. 可观测性: 自动追踪、指标收集
  3. 安全: mTLS 加密服务间通信

分布式事务选择

  1. 2PC: 强一致性,但性能差,避免使用
  2. Saga: 推荐用于微服务,编排式更易管理
  3. 最终一致性: 大多数场景可接受

CAP 权衡

  1. 关键数据: 选择 CP (如库存、支付)
  2. 非关键数据: 选择 AP (如用户浏览历史)
  3. Quorum: 灵活调整一致性级别

推荐学习路径

  1. 阅读 DDIA 第 5-9 章 (2-3 周)
  2. 动手搭建微服务 Demo (1-2 周)
  3. 学习 Istio 并实践 (2 周)
  4. 实现 Saga 模式 (1 周)
  5. 研究实际案例 (持续)

参考资料: