分布式系统架构深度指南
分布式系统架构深度指南
第一部分:微服务架构设计模式
1.1 微服务架构基础
什么是微服务?
微服务是一种架构风格,将单一应用程序开发为一组小型服务,每个服务运行在自己的进程中,使用轻量级机制(通常是HTTP RESTful API)进行通信。
核心特征:
- 独立部署: 每个服务可以独立部署,不影响其他服务
- 业务能力: 围绕业务能力组织服务
- 去中心化: 数据管理去中心化,每个服务管理自己的数据
- 容错设计: 服务必须能够容忍其他服务的失败
单体架构 vs 微服务架构
单体架构:
┌─────────────────────────────────┐
│ Monolithic Application │
│ ┌───────┐ ┌────────┐ ┌──────┐ │
│ │ UI │ │Business│ │ Data │ │
│ │ Layer │ │ Logic │ │Access│ │
│ └───────┘ └────────┘ └──────┘ │
└─────────────────────────────────┘
│
┌────▼────┐
│Database │
└─────────┘
微服务架构:
┌─────────┐ ┌─────────┐ ┌─────────┐
│Service A│ │Service B│ │Service C│
│ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │
│ │Logic│ │ │ │Logic│ │ │ │Logic│ │
│ └──┬──┘ │ │ └──┬──┘ │ │ └──┬──┘ │
└────┼────┘ └────┼────┘ └────┼────┘
│ │ │
┌──▼──┐ ┌──▼──┐ ┌──▼──┐
│DB-A │ │DB-B │ │DB-C │
└─────┘ └─────┘ └─────┘
1.2 服务拆分原则
领域驱动设计 (DDD) 拆分
限界上下文 (Bounded Context):
- 每个微服务代表一个限界上下文
- 上下文内有自己的领域模型和通用语言
示例:电商系统拆分
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 用户服务 │ │ 订单服务 │ │ 库存服务 │
├──────────────┤ ├──────────────┤ ├──────────────┤
│- 用户注册 │ │- 创建订单 │ │- 库存查询 │
│- 用户认证 │ │- 订单查询 │ │- 库存扣减 │
│- 用户信息 │ │- 订单状态 │ │- 库存归还 │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│User DB│ │Order │ │Stock │
└───────┘ │ DB │ │ DB │
└───────┘ └───────┘
拆分原则:
- 单一职责原则: 一个服务只负责一个业务领域
- 高内聚低耦合: 相关功能聚集,减少服务间依赖
- 数据独立性: 每个服务拥有独立的数据库
- 业务能力: 按业务能力而非技术层次拆分
1.3 核心设计模式
Pattern 1: API Gateway 模式
问题: 客户端需要调用多个微服务,导致复杂性增加
解决方案: 提供单一入口点,路由请求到对应的微服务
客户端请求流程:
客户端
│
▼
┌─────────────────┐
│ API Gateway │
│ ┌───────────┐ │
│ │ 认证/授权 │ │
│ ├───────────┤ │
│ │ 路由 │ │
│ ├───────────┤ │
│ │ 限流 │ │
│ ├───────────┤ │
│ │ 聚合 │ │
│ └───────────┘ │
└────┬─────┬─────┬┘
│ │ │
▼ ▼ ▼
服务A 服务B 服务C
实现示例 (Node.js + Express):
const express = require('express');
const { createProxyMiddleware } = require('http-proxy-middleware');
const app = express();
// 认证中间件
const authenticate = (req, res, next) => {
const token = req.headers.authorization;
if (!token) {
return res.status(401).json({ error: 'Unauthorized' });
}
// 验证 token
next();
};
// 路由到用户服务
app.use('/api/users', authenticate, createProxyMiddleware({
target: 'http://user-service:3001',
changeOrigin: true
}));
// 路由到订单服务
app.use('/api/orders', authenticate, createProxyMiddleware({
target: 'http://order-service:3002',
changeOrigin: true
}));
// 聚合多个服务的数据
app.get('/api/user-dashboard/:userId', authenticate, async (req, res) => {
const userId = req.params.userId;
try {
// 并行调用多个服务
const [userInfo, orders, recommendations] = await Promise.all([
fetch(`http://user-service:3001/users/${userId}`),
fetch(`http://order-service:3002/orders?userId=${userId}`),
fetch(`http://recommendation-service:3003/recommendations/${userId}`)
]);
res.json({
user: await userInfo.json(),
recentOrders: await orders.json(),
recommendations: await recommendations.json()
});
} catch (error) {
res.status(500).json({ error: 'Service unavailable' });
}
});
app.listen(3000);
优势:
- 简化客户端调用
- 减少网络往返次数
- 集中处理横切关注点(认证、日志、限流)
劣势:
- 可能成为单点故障
- 增加延迟
- 需要额外维护
Pattern 2: 断路器模式 (Circuit Breaker)
问题: 服务故障可能导致级联失败和资源耗尽
原理: 类似电路断路器,当错误率超过阈值时自动切断请求
状态转换:
初始状态: CLOSED (正常)
│
│ 错误率 > 阈值
▼
OPEN (断开)
│
│ 超时后
▼
HALF_OPEN (半开)
│
├─► 成功 ─► CLOSED
│
└─► 失败 ─► OPEN
实现示例 (JavaScript):
class CircuitBreaker {
constructor(request, options = {}) {
this.request = request;
this.state = 'CLOSED';
this.failureCount = 0;
this.successCount = 0;
this.nextAttempt = Date.now();
this.failureThreshold = options.failureThreshold || 5;
this.successThreshold = options.successThreshold || 2;
this.timeout = options.timeout || 60000; // 60秒
}
async call(...args) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
}
// 进入半开状态
this.state = 'HALF_OPEN';
}
try {
const response = await this.request(...args);
return this.onSuccess(response);
} catch (error) {
return this.onFailure(error);
}
}
onSuccess(response) {
this.failureCount = 0;
if (this.state === 'HALF_OPEN') {
this.successCount++;
if (this.successCount >= this.successThreshold) {
this.state = 'CLOSED';
this.successCount = 0;
}
}
return response;
}
onFailure(error) {
this.failureCount++;
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.timeout;
}
throw error;
}
getState() {
return this.state;
}
}
// 使用示例
const fetchUserData = async (userId) => {
const response = await fetch(`http://user-service/users/${userId}`);
if (!response.ok) throw new Error('Service error');
return response.json();
};
const breaker = new CircuitBreaker(fetchUserData, {
failureThreshold: 3,
timeout: 30000
});
// 调用服务
try {
const userData = await breaker.call('user123');
console.log(userData);
} catch (error) {
console.log('服务不可用,返回降级数据');
// 返回缓存或默认数据
}
Pattern 3: 服务发现
问题: 微服务实例动态变化,如何找到服务地址?
解决方案: 使用服务注册中心
架构:
┌────────────────────────────────────┐
│ 服务注册中心 (Consul/Eureka) │
│ │
│ 服务A: 192.168.1.10:8001 │
│ 服务A: 192.168.1.11:8001 │
│ 服务B: 192.168.1.20:8002 │
└──────▲──────────────┬──────────────┘
│ │
注册/心跳 发现/查询
│ │
│ ▼
┌──────┴─────┐ ┌────────┐
│ 服务实例 │ │ 客户端 │
│ │ │ │
│ 启动时注册 │ │ 查询服务│
│ 定期心跳 │ │ 负载均衡│
└────────────┘ └────────┘
服务注册示例 (Consul):
const Consul = require('consul');
const express = require('express');
const consul = new Consul();
const app = express();
const PORT = process.env.PORT || 3001;
const SERVICE_NAME = 'user-service';
const SERVICE_ID = `${SERVICE_NAME}-${PORT}`;
// 服务健康检查端点
app.get('/health', (req, res) => {
res.json({ status: 'healthy' });
});
// 注册服务到 Consul
const registerService = () => {
const details = {
name: SERVICE_NAME,
id: SERVICE_ID,
address: 'localhost',
port: PORT,
check: {
http: `http://localhost:${PORT}/health`,
interval: '10s',
timeout: '5s'
}
};
consul.agent.service.register(details, (err) => {
if (err) {
console.error('注册服务失败:', err);
} else {
console.log('服务注册成功');
}
});
};
// 注销服务
const deregisterService = () => {
consul.agent.service.deregister(SERVICE_ID, (err) => {
console.log('服务已注销');
process.exit();
});
};
// 启动服务
app.listen(PORT, () => {
console.log(`服务运行在端口 ${PORT}`);
registerService();
});
// 优雅关闭
process.on('SIGINT', deregisterService);
process.on('SIGTERM', deregisterService);
服务发现客户端示例:
const Consul = require('consul');
const consul = new Consul();
// 从 Consul 获取服务实例
const getServiceInstance = async (serviceName) => {
return new Promise((resolve, reject) => {
consul.health.service({
service: serviceName,
passing: true // 只返回健康的实例
}, (err, result) => {
if (err) return reject(err);
if (result.length === 0) {
return reject(new Error('No healthy instances found'));
}
// 简单的随机负载均衡
const instance = result[Math.floor(Math.random() * result.length)];
resolve({
address: instance.Service.Address,
port: instance.Service.Port
});
});
});
};
// 调用服务
const callUserService = async (userId) => {
const instance = await getServiceInstance('user-service');
const url = `http://${instance.address}:${instance.port}/users/${userId}`;
const response = await fetch(url);
return response.json();
};
第二部分:服务网格 (Service Mesh)
2.1 什么是服务网格?
服务网格是一个专用的基础设施层,用于处理服务间通信。它将网络功能(如负载均衡、服务发现、故障恢复、指标收集等)从应用代码中抽离出来。
架构对比:
传统微服务:
┌─────────────────┐ ┌─────────────────┐
│ Service A │ │ Service B │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ 业务逻辑 │ │ │ │ 业务逻辑 │ │
│ ├───────────┤ │ │ ├───────────┤ │
│ │ 网络库 │ │────▶│ │ 网络库 │ │
│ │(重试/限流)│ │ │ │(重试/限流)│ │
│ └───────────┘ │ │ └───────────┘ │
└─────────────────┘ └─────────────────┘
服务网格:
┌─────────────────┐ ┌─────────────────┐
│ Service A │ │ Service B │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ 业务逻辑 │ │ │ │ 业务逻辑 │ │
│ └─────┬─────┘ │ │ └─────▲─────┘ │
└────────┼────────┘ └────────┼────────┘
│ │
▼ │
┌────────┐ ┌────────┐
│Sidecar │──────────────▶│Sidecar │
│ Proxy │ │ Proxy │
└────────┘ └────────┘
│ │
└───────────┬───────────┘
▼
┌──────────────┐
│ Control Plane│
│ (Istio) │
└──────────────┘
2.2 Istio 架构
核心组件:
┌──────────────────────────────────────────┐
│ Control Plane (istiod) │
│ ┌──────────┐ ┌────────┐ ┌──────────┐ │
│ │ Pilot │ │Citadel │ │ Galley │ │
│ │(流量管理)│ │(安全) │ │(配置) │ │
│ └──────────┘ └────────┘ └──────────┘ │
└─────────────────┬────────────────────────┘
│ 配置下发
┌─────────┼─────────┐
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Envoy │ │Envoy │ │Envoy │ Data Plane
│Proxy │ │Proxy │ │Proxy │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│App A │ │App B │ │App C │
└───────┘ └───────┘ └───────┘
组件职责:
- Pilot: 服务发现、流量管理配置
- Citadel: 证书管理、mTLS
- Galley: 配置验证和分发
- Envoy Proxy: 数据平面代理
2.3 流量管理
虚拟服务 (Virtual Service)
定义路由规则,控制流量如何路由到服务。
示例:金丝雀发布 (Canary Release)
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: reviews-route
spec:
hosts:
- reviews # 目标服务
http:
- match:
- headers:
user-type:
exact: premium # VIP 用户
route:
- destination:
host: reviews
subset: v2 # 路由到 v2 版本
weight: 100
- route: # 普通用户
- destination:
host: reviews
subset: v1
weight: 90 # 90% 流量到 v1
- destination:
host: reviews
subset: v2
weight: 10 # 10% 流量到 v2 (金丝雀)
目标规则 (Destination Rule)
定义服务的子集和负载均衡策略。
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: reviews-destination
spec:
host: reviews
trafficPolicy:
loadBalancer:
simple: LEAST_REQUEST # 负载均衡策略
connectionPool:
tcp:
maxConnections: 100
http:
http1MaxPendingRequests: 50
maxRequestsPerConnection: 2
outlierDetection: # 异常检测
consecutiveErrors: 5
interval: 30s
baseEjectionTime: 30s
subsets:
- name: v1
labels:
version: v1
- name: v2
labels:
version: v2
trafficPolicy:
loadBalancer:
simple: ROUND_ROBIN
重试和超时
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: order-service
spec:
hosts:
- order-service
http:
- route:
- destination:
host: order-service
timeout: 3s # 请求超时
retries:
attempts: 3 # 重试次数
perTryTimeout: 1s # 每次重试超时
retryOn: 5xx,reset,connect-failure # 重试条件
2.4 可观测性
分布式追踪
Jaeger 集成示例:
请求链路追踪:
客户端请求
│
▼ Trace ID: abc123
┌──────────────┐
│ API Gateway │ Span 1: gateway (200ms)
└──────┬───────┘
│
├─► ┌──────────────┐
│ │ User Service │ Span 2: user-lookup (50ms)
│ └──────────────┘
│
└─► ┌──────────────┐
│Order Service │ Span 3: order-create (100ms)
└──────┬───────┘
│
└─► ┌──────────────┐
│Payment Svc │ Span 4: payment (80ms)
└──────────────┘
总耗时: 230ms (有并行调用)
自动追踪配置: Istio 自动注入追踪头并收集 span 数据,应用只需传播 headers。
// Express 中间件传播追踪头
const propagateTracingHeaders = (req, res, next) => {
const tracingHeaders = [
'x-request-id',
'x-b3-traceid',
'x-b3-spanid',
'x-b3-parentspanid',
'x-b3-sampled',
'x-b3-flags'
];
req.tracingHeaders = {};
tracingHeaders.forEach(header => {
if (req.headers[header]) {
req.tracingHeaders[header] = req.headers[header];
}
});
next();
};
// 调用下游服务时传播 headers
app.get('/api/orders', propagateTracingHeaders, async (req, res) => {
const response = await fetch('http://payment-service/pay', {
headers: req.tracingHeaders
});
// ...
});
Metrics 收集
Istio 自动收集四大黄金信号:
- 延迟 (Latency): 请求响应时间
- 流量 (Traffic): 请求速率 (QPS)
- 错误 (Errors): 错误率
- 饱和度 (Saturation): 资源使用率
Prometheus 查询示例:
# 服务请求速率
rate(istio_requests_total{destination_service="order-service"}[1m])
# P99 延迟
histogram_quantile(0.99,
rate(istio_request_duration_milliseconds_bucket[1m])
)
# 错误率
rate(istio_requests_total{response_code=~"5.."}[1m])
/
rate(istio_requests_total[1m])
第三部分:分布式事务与一致性
3.1 分布式事务的挑战
ACID 在分布式系统中的问题:
- Atomicity: 跨多个服务的原子性难以保证
- Consistency: 不同服务有独立数据库
- Isolation: 分布式锁成本高
- Durability: 需要复杂的协调
示例场景:电商下单
下单流程涉及多个服务:
1. 订单服务:创建订单
2. 库存服务:扣减库存
3. 支付服务:扣款
4. 积分服务:增加积分
要求:要么全部成功,要么全部失败
3.2 两阶段提交 (2PC)
协议流程:
协调者 (Coordinator) 参与者 (Participants)
│ 订单服务 库存服务 支付服务
│ │ │ │
├──── Prepare ─────────┼────────┼────────┤
│ │ │ │
│ [准备] [准备] [准备]
│ │ │ │
│◄──── Vote-Yes ───────┤ │ │
│◄──── Vote-Yes ────────────────┤ │
│◄──── Vote-Yes ─────────────────────────┤
│ │ │ │
├──── Commit ──────────┼────────┼────────┤
│ │ │ │
│ [提交] [提交] [提交]
│ │ │ │
│◄──── ACK ────────────┤ │ │
│◄──── ACK ─────────────────────┤ │
│◄──── ACK ──────────────────────────────┤
实现示例 (简化版):
class TwoPhaseCommitCoordinator {
constructor(participants) {
this.participants = participants; // [{id, prepare, commit, abort}]
}
async execute(transaction) {
const votes = [];
// Phase 1: Prepare
console.log('Phase 1: Prepare');
for (const participant of this.participants) {
try {
const vote = await participant.prepare(transaction);
votes.push({ participant, vote });
if (vote !== 'YES') {
console.log(`${participant.id} voted NO`);
await this.abortAll(votes);
return { status: 'ABORTED', reason: 'Participant voted NO' };
}
} catch (error) {
console.log(`${participant.id} prepare failed:`, error.message);
await this.abortAll(votes);
return { status: 'ABORTED', reason: error.message };
}
}
// Phase 2: Commit
console.log('Phase 2: Commit');
const commitResults = [];
for (const { participant } of votes) {
try {
await participant.commit(transaction);
commitResults.push({ participant: participant.id, status: 'COMMITTED' });
} catch (error) {
// 这里出现问题就麻烦了 - 需要重试或人工介入
console.error(`${participant.id} commit failed:`, error.message);
commitResults.push({ participant: participant.id, status: 'FAILED' });
}
}
return { status: 'COMMITTED', results: commitResults };
}
async abortAll(votes) {
for (const { participant } of votes) {
try {
await participant.abort();
} catch (error) {
console.error(`${participant.id} abort failed:`, error.message);
}
}
}
}
// 参与者实现示例 - 库存服务
class InventoryServiceParticipant {
constructor() {
this.id = 'inventory-service';
this.prepared = new Map(); // 存储准备状态
}
async prepare(transaction) {
const { orderId, items } = transaction;
// 检查库存
for (const item of items) {
const stock = await this.checkStock(item.productId);
if (stock < item.quantity) {
return 'NO'; // 库存不足,投票拒绝
}
}
// 锁定库存 (但不真正扣减)
await this.lockInventory(orderId, items);
this.prepared.set(orderId, items);
return 'YES';
}
async commit(transaction) {
const { orderId } = transaction;
const items = this.prepared.get(orderId);
if (!items) {
throw new Error('Transaction not prepared');
}
// 真正扣减库存
await this.deductInventory(items);
this.prepared.delete(orderId);
}
async abort(transaction) {
const { orderId } = transaction;
const items = this.prepared.get(orderId);
if (items) {
// 释放锁定的库存
await this.releaseInventory(orderId);
this.prepared.delete(orderId);
}
}
// ... 实际的数据库操作方法
}
2PC 的问题:
- 阻塞问题: 参与者在 prepare 后会阻塞等待协调者的决定
- 单点故障: 协调者故障导致参与者无限期阻塞
- 性能问题: 两个阶段的网络往返,延迟高
- 数据一致性: Commit 阶段失败可能导致不一致
3.3 Saga 模式
Saga 将长事务拆分为多个本地事务,每个本地事务有对应的补偿操作。
核心思想:
- 每个服务执行本地事务并发布事件
- 如果某个步骤失败,执行补偿事务回滚之前的操作
编排式 Saga (Choreography)
流程示例:
成功流程:
订单服务 库存服务 支付服务 积分服务
│ │ │ │
├─创建订单────────┤ │ │
│ │ │ │
│ ◄──订单已创建事件 │ │
│ │ │ │
│ ├─扣减库存───────┤ │
│ │ │ │
│ ◄──库存已扣减事件 │ │
│ │ │ │
│ │ ├─处理支付───────┤
│ │ │ │
│ ◄──支付已完成事件 │ │
│ │ │ │
│ │ │ ┌─增加积分
│ │ │ │
│ ◄──积分已增加事件───────────────────┘
失败流程 (支付失败):
订单服务 库存服务 支付服务
│ │ │
├─创建订单────────┤ │
│ ◄──订单已创建 │
│ ├─扣减库存───────┤
│ ◄──库存已扣减 │
│ │ ├─处理支付
│ │ │ (失败)
│ ◄──支付失败事件─────────┤
│ │ │
│ ├─补偿:归还库存 │
│ │ │
├─补偿:取消订单 │ │
实现示例 (基于事件总线):
// 事件总线 (使用 RabbitMQ/Kafka)
class EventBus {
constructor() {
this.handlers = new Map();
}
subscribe(eventType, handler) {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
}
this.handlers.get(eventType).push(handler);
}
async publish(event) {
const handlers = this.handlers.get(event.type) || [];
for (const handler of handlers) {
try {
await handler(event);
} catch (error) {
console.error(`Handler failed for ${event.type}:`, error);
}
}
}
}
const eventBus = new EventBus();
// 订单服务
class OrderService {
async createOrder(orderData) {
// 执行本地事务
const order = await this.db.orders.create({
...orderData,
status: 'PENDING'
});
// 发布事件
await eventBus.publish({
type: 'ORDER_CREATED',
data: {
orderId: order.id,
items: order.items,
userId: order.userId
}
});
return order;
}
async cancelOrder(orderId) {
await this.db.orders.update(orderId, { status: 'CANCELLED' });
await eventBus.publish({
type: 'ORDER_CANCELLED',
data: { orderId }
});
}
}
// 库存服务
class InventoryService {
constructor() {
// 监听订单创建事件
eventBus.subscribe('ORDER_CREATED', this.handleOrderCreated.bind(this));
eventBus.subscribe('PAYMENT_FAILED', this.handlePaymentFailed.bind(this));
}
async handleOrderCreated(event) {
const { orderId, items } = event.data;
try {
// 扣减库存
await this.deductInventory(items);
await eventBus.publish({
type: 'INVENTORY_DEDUCTED',
data: { orderId, items }
});
} catch (error) {
// 库存不足,发布失败事件
await eventBus.publish({
type: 'INVENTORY_DEDUCTION_FAILED',
data: { orderId, reason: error.message }
});
}
}
async handlePaymentFailed(event) {
const { orderId, items } = event.data;
// 补偿操作:归还库存
await this.restoreInventory(items);
await eventBus.publish({
type: 'INVENTORY_RESTORED',
data: { orderId }
});
}
async deductInventory(items) {
for (const item of items) {
await this.db.inventory.decrement(item.productId, item.quantity);
}
}
async restoreInventory(items) {
for (const item of items) {
await this.db.inventory.increment(item.productId, item.quantity);
}
}
}
// 支付服务
class PaymentService {
constructor() {
eventBus.subscribe('INVENTORY_DEDUCTED', this.handleInventoryDeducted.bind(this));
}
async handleInventoryDeducted(event) {
const { orderId } = event.data;
try {
// 处理支付
const payment = await this.processPayment(orderId);
await eventBus.publish({
type: 'PAYMENT_COMPLETED',
data: { orderId, paymentId: payment.id }
});
} catch (error) {
await eventBus.publish({
type: 'PAYMENT_FAILED',
data: { orderId, reason: error.message, ...event.data }
});
}
}
}
编排式 Saga (Orchestration)
使用中央协调器控制流程。
class SagaOrchestrator {
constructor() {
this.steps = [];
}
addStep(name, action, compensation) {
this.steps.push({ name, action, compensation });
return this;
}
async execute(context) {
const executedSteps = [];
try {
// 正向执行所有步骤
for (const step of this.steps) {
console.log(`Executing step: ${step.name}`);
const result = await step.action(context);
executedSteps.push({ step, result });
context[step.name] = result; // 保存结果供后续步骤使用
}
return { status: 'SUCCESS', data: context };
} catch (error) {
console.log(`Step failed: ${error.message}`);
console.log('Starting compensation...');
// 反向执行补偿操作
for (let i = executedSteps.length - 1; i >= 0; i--) {
const { step, result } = executedSteps[i];
try {
console.log(`Compensating step: ${step.name}`);
await step.compensation(context, result);
} catch (compError) {
console.error(`Compensation failed for ${step.name}:`, compError);
// 补偿失败需要记录并可能需要人工介入
}
}
return { status: 'FAILED', error: error.message };
}
}
}
// 使用示例:订单 Saga
const createOrderSaga = () => {
const saga = new SagaOrchestrator();
saga.addStep(
'createOrder',
async (ctx) => {
const order = await orderService.create(ctx.orderData);
return order;
},
async (ctx, order) => {
await orderService.cancel(order.id);
}
);
saga.addStep(
'reserveInventory',
async (ctx) => {
const reservation = await inventoryService.reserve(ctx.orderData.items);
return reservation;
},
async (ctx, reservation) => {
await inventoryService.release(reservation.id);
}
);
saga.addStep(
'processPayment',
async (ctx) => {
const payment = await paymentService.charge({
amount: ctx.createOrder.totalAmount,
userId: ctx.orderData.userId
});
return payment;
},
async (ctx, payment) => {
await paymentService.refund(payment.id);
}
);
saga.addStep(
'confirmInventory',
async (ctx) => {
await inventoryService.confirm(ctx.reserveInventory.id);
},
async (ctx) => {
// 如果确认失败,前面的补偿已经释放了库存
}
);
return saga;
};
// 执行 Saga
const orderSaga = createOrderSaga();
const result = await orderSaga.execute({
orderData: {
userId: 'user123',
items: [
{ productId: 'prod1', quantity: 2, price: 100 },
{ productId: 'prod2', quantity: 1, price: 50 }
]
}
});
console.log(result);
Saga 模式对比:
| 特性 | 编排式 (Choreography) | 编排式 (Orchestration) |
|---|---|---|
| 耦合度 | 低 - 服务通过事件通信 | 高 - 依赖中央协调器 |
| 复杂度 | 高 - 逻辑分散 | 低 - 集中管理 |
| 可观测性 | 难 - 需要追踪事件链 | 易 - 协调器掌握全局 |
| 单点故障 | 无 | 协调器是单点 |
| 适用场景 | 简单流程、松耦合 | 复杂流程、需要集中控制 |
第四部分:CAP 理论与最终一致性
4.1 CAP 定理
定理内容: 分布式系统最多只能同时满足以下三项中的两项:
Consistency (一致性)
▲
╱ ╲
╱ ╲
╱ ╲
╱ ╲
╱ CA ╲
╱ ╲
╱ ╲
╱ P ╲
╱ (网络分区) ╲
╱ ╲
╱_____________________ ╲
Partition Tolerance Availability
(分区容错性) (可用性)
CP: 一致性 + 分区容错性 (牺牲可用性)
AP: 可用性 + 分区容错性 (牺牲一致性)
CA: 一致性 + 可用性 (不能容忍分区,实际不存在)
详细解释:
-
Consistency (一致性): 所有节点在同一时间看到相同的数据
写入 A=1 ┌────┐ ┌────┐ ┌────┐ │ N1 │───▶│ N2 │───▶│ N3 │ │A=1 │ │A=1 │ │A=1 │ └────┘ └────┘ └────┘ 读取任何节点都返回 A=1 -
Availability (可用性): 每个请求都能得到响应 (成功或失败)
即使某些节点故障,系统仍能响应请求 ┌────┐ ┌────┐ ┌────┐ │ N1 │ │ N2 │ │ N3 │ │可用│ │故障│ │可用│ └────┘ └────┘ └────┘ ╲ ╱ ╲ ╱ ▼ ▼ 仍能处理请求 -
Partition Tolerance (分区容错性): 系统在网络分区时仍能继续运行
网络分区 ┌────┐ ╱╱╱╱╱ ┌────┐ │ N1 │ 断开 │ N2 │ │A=1 │ │A=? │ └────┘ └────┘ 系统能容忍网络分区并继续运作
4.2 实际系统的 CAP 选择
CP 系统示例:HBase, MongoDB(强一致性配置)
特征:
- 网络分区时,少数派节点拒绝服务
- 保证数据一致性
// ZooKeeper (CP系统) 示例
class ZooKeeperClient {
async write(path, data) {
// 写入需要多数派同意
const nodes = this.getAllNodes();
const majority = Math.floor(nodes.length / 2) + 1;
const acks = await Promise.all(
nodes.map(node => node.write(path, data))
);
const successCount = acks.filter(ack => ack.success).length;
if (successCount >= majority) {
return { success: true };
} else {
// 未达到多数派,写入失败
throw new Error('Cannot reach majority');
}
}
async read(path) {
// 从 leader 读取,保证一致性
const leader = await this.getLeader();
return leader.read(path);
}
}
AP 系统示例:Cassandra, DynamoDB
特征:
- 网络分区时,所有节点仍可服务
- 牺牲强一致性,提供最终一致性
// Cassandra 风格的 AP 系统
class CassandraClient {
async write(key, value, options = {}) {
const { consistencyLevel = 'ONE' } = options;
const nodes = this.getReplicaNodes(key);
// 异步写入多个副本
const writePromises = nodes.map(node =>
node.write(key, value).catch(err => ({ error: err }))
);
// 根据一致性级别决定等待多少个响应
const requiredAcks = this.getRequiredAcks(consistencyLevel, nodes.length);
// 等待足够数量的成功响应
let successCount = 0;
for (const promise of writePromises) {
const result = await promise;
if (!result.error) {
successCount++;
if (successCount >= requiredAcks) {
// 已达到要求,立即返回
return { success: true };
}
}
}
// 即使部分节点失败,只要达到要求就成功
return { success: successCount >= requiredAcks };
}
getRequiredAcks(level, totalNodes) {
switch (level) {
case 'ONE': return 1; // AP: 任意一个节点成功即可
case 'QUORUM': return Math.floor(totalNodes / 2) + 1; // 中间状态
case 'ALL': return totalNodes; // CP: 所有节点都要成功
}
}
async read(key, options = {}) {
const { consistencyLevel = 'ONE' } = options;
const nodes = this.getReplicaNodes(key);
if (consistencyLevel === 'ONE') {
// 从任意可用节点读取 (可能读到旧数据)
for (const node of nodes) {
try {
return await node.read(key);
} catch (err) {
continue; // 尝试下一个节点
}
}
} else if (consistencyLevel === 'QUORUM') {
// 从多数派读取,并返回最新版本
const results = await Promise.allSettled(
nodes.map(node => node.read(key))
);
const values = results
.filter(r => r.status === 'fulfilled')
.map(r => r.value);
// 返回版本号最高的值
return values.reduce((latest, current) =>
current.version > latest.version ? current : latest
);
}
}
}
4.3 最终一致性 (Eventual Consistency)
定义: 如果没有新的更新,所有副本最终会达到一致状态。
一致性级别金字塔:
强一致性 (Linearizability)
▲
│
顺序一致性 (Sequential)
│
因果一致性 (Causal)
│
会话一致性 (Session)
│
最终一致性 (Eventual)
向量时钟 (Vector Clock)
用于检测并发写入和解决冲突。
class VectorClock {
constructor(nodeId) {
this.nodeId = nodeId;
this.clock = {}; // {nodeId: counter}
}
// 增加本地时钟
increment() {
this.clock[this.nodeId] = (this.clock[this.nodeId] || 0) + 1;
}
// 合并其他节点的时钟
merge(otherClock) {
for (const [nodeId, counter] of Object.entries(otherClock)) {
this.clock[nodeId] = Math.max(
this.clock[nodeId] || 0,
counter
);
}
}
// 比较两个向量时钟
compare(other) {
let thisGreater = false;
let otherGreater = false;
const allNodes = new Set([
...Object.keys(this.clock),
...Object.keys(other)
]);
for (const nodeId of allNodes) {
const thisVal = this.clock[nodeId] || 0;
const otherVal = other[nodeId] || 0;
if (thisVal > otherVal) thisGreater = true;
if (otherVal > thisVal) otherGreater = true;
}
if (thisGreater && !otherGreater) return 'AFTER'; // this 发生在 other 之后
if (otherGreater && !thisGreater) return 'BEFORE'; // this 发生在 other 之前
if (!thisGreater && !otherGreater) return 'EQUAL'; // 相同
return 'CONCURRENT'; // 并发冲突
}
}
// 使用示例
class DistributedKVStore {
constructor(nodeId) {
this.nodeId = nodeId;
this.data = new Map(); // key -> {value, vectorClock}
}
write(key, value) {
const existing = this.data.get(key);
const clock = new VectorClock(this.nodeId);
if (existing) {
// 基于已有数据的向量时钟
clock.merge(existing.vectorClock.clock);
}
clock.increment();
this.data.set(key, {
value,
vectorClock: clock
});
// 异步同步到其他节点
this.replicateToOtherNodes(key, value, clock);
}
handleReplication(key, value, remoteClock) {
const existing = this.data.get(key);
if (!existing) {
// 本地没有数据,直接写入
this.data.set(key, {
value,
vectorClock: new VectorClock(this.nodeId)
});
this.data.get(key).vectorClock.clock = remoteClock;
return;
}
const comparison = existing.vectorClock.compare(remoteClock);
switch (comparison) {
case 'BEFORE':
// 远程数据更新,覆盖本地
this.data.set(key, {
value,
vectorClock: new VectorClock(this.nodeId)
});
this.data.get(key).vectorClock.clock = remoteClock;
break;
case 'AFTER':
// 本地数据更新,忽略远程数据
break;
case 'CONCURRENT':
// 冲突!需要解决
const resolved = this.resolveConflict(existing.value, value);
const mergedClock = new VectorClock(this.nodeId);
mergedClock.merge(existing.vectorClock.clock);
mergedClock.merge(remoteClock);
mergedClock.increment();
this.data.set(key, {
value: resolved,
vectorClock: mergedClock
});
break;
}
}
resolveConflict(value1, value2) {
// 冲突解决策略
// 1. Last-Write-Wins (LWW) - 使用时间戳
// 2. 应用层合并 (如购物车合并所有商品)
// 3. 保留所有版本让用户选择
// 示例:购物车合并
if (Array.isArray(value1) && Array.isArray(value2)) {
return [...new Set([...value1, ...value2])];
}
// 默认:保留两个版本
return [value1, value2];
}
}
4.4 Quorum 机制
读写 Quorum 保证数据一致性。
公式: W + R > N
- N: 副本总数
- W: 写入时需要确认的副本数
- R: 读取时需要查询的副本数
示例:N=3, W=2, R=2
写入流程:
客户端写 value=X, version=5
│
├──► Node1 (成功) ✓
├──► Node2 (成功) ✓
└──► Node3 (失败) ✗
W=2 已满足,写入成功
读取流程:
客户端读取
│
├──► Node1: {value=X, version=5}
├──► Node2: {value=X, version=5}
└──► Node3: {value=Y, version=3} (旧数据)
取最新版本: value=X, version=5
同时修复 Node3 的数据 (Read Repair)
实现:
class QuorumStore {
constructor(nodes, N, W, R) {
this.nodes = nodes;
this.N = N; // 总副本数
this.W = W; // 写 Quorum
this.R = R; // 读 Quorum
}
async write(key, value) {
const version = Date.now(); // 简单版本号
const writePromises = this.nodes.map(node =>
node.write(key, { value, version })
.then(() => ({ success: true, node }))
.catch(err => ({ success: false, node, error: err }))
);
let successCount = 0;
const results = [];
for (const promise of writePromises) {
const result = await promise;
results.push(result);
if (result.success) {
successCount++;
if (successCount >= this.W) {
// 达到写 Quorum
return { success: true, version };
}
}
}
throw new Error(`Write failed: only ${successCount}/${this.W} nodes confirmed`);
}
async read(key) {
const readPromises = this.nodes.map(node =>
node.read(key)
.then(data => ({ success: true, data, node }))
.catch(err => ({ success: false, node, error: err }))
);
const results = [];
let successCount = 0;
for (const promise of readPromises) {
const result = await promise;
if (result.success) {
results.push(result);
successCount++;
if (successCount >= this.R) {
// 达到读 Quorum,返回最新版本
const latest = results.reduce((newest, current) =>
current.data.version > newest.data.version ? current : newest
);
// Read Repair: 修复旧数据
this.readRepair(key, latest.data, results);
return latest.data.value;
}
}
}
throw new Error(`Read failed: only ${successCount}/${this.R} nodes responded`);
}
async readRepair(key, latestData, results) {
for (const result of results) {
if (result.success && result.data.version < latestData.version) {
// 异步修复旧数据
result.node.write(key, latestData).catch(err =>
console.error('Read repair failed:', err)
);
}
}
}
}
// 配置示例
const nodes = [node1, node2, node3, node4, node5];
const store = new QuorumStore(nodes, 5, 3, 3); // N=5, W=3, R=3
// W + R = 6 > N = 5,保证强一致性
不同配置的权衡:
| 配置 | 一致性 | 可用性 | 性能 | 场景 |
|---|---|---|---|---|
| W=N, R=1 | 强 | 低 | 写慢读快 | 读多写少 |
| W=1, R=N | 弱 | 高 | 写快读慢 | 写多读少 |
| W=Q, R=Q (Q>N/2) | 强 | 中 | 平衡 | 通用场景 |
| W=1, R=1 | 弱 | 高 | 快 | 可容忍不一致 |
第五部分:综合实践案例
案例:构建高可用的订单系统
需求:
- 支持每秒 10,000 笔订单
- 99.99% 可用性
- 跨地域部署
- 最终一致性可接受
架构设计:
用户请求
│
▼
┌────────────────┐
│ API Gateway │
│ (限流/认证) │
└────────┬───────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────────┐ ┌────────┐
│订单服务 │ │库存服务 │ │支付服务│
│(AP系统) │ │(CP系统) │ │(CP系统)│
└────┬───┘ └────┬────┘ └────┬───┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────────┐ ┌────────┐
│Cassandra │ Redis │ │ MySQL │
│(最终一致) │(强一致) │ │(强一致)│
└────────┘ └─────────┘ └────────┘
│ │ │
└────────────┼─────────────┘
│
▼
┌─────────────────┐
│ Kafka (事件流) │
└─────────────────┘
实现要点:
// 1. 订单服务 - 使用 Saga 协调
class OrderService {
async createOrder(orderRequest) {
// 生成幂等性 key
const idempotencyKey = this.generateIdempotencyKey(orderRequest);
// 检查幂等性
const existing = await this.checkIdempotency(idempotencyKey);
if (existing) {
return existing;
}
// 创建订单 Saga
const saga = new SagaOrchestrator();
saga.addStep(
'createOrder',
async (ctx) => {
// 写入订单到 Cassandra (AP)
const order = await this.orderRepository.create({
...orderRequest,
status: 'PENDING',
idempotencyKey
});
return order;
},
async (ctx, order) => {
await this.orderRepository.updateStatus(order.id, 'CANCELLED');
}
);
saga.addStep(
'checkInventory',
async (ctx) => {
// 调用库存服务 (CP系统,强一致性)
const result = await this.inventoryService.reserve({
orderId: ctx.createOrder.id,
items: orderRequest.items
});
if (!result.success) {
throw new Error('Insufficient inventory');
}
return result;
},
async (ctx, reservation) => {
await this.inventoryService.release(reservation.id);
}
);
saga.addStep(
'processPayment',
async (ctx) => {
const payment = await this.paymentService.charge({
orderId: ctx.createOrder.id,
amount: ctx.createOrder.totalAmount,
userId: orderRequest.userId
});
return payment;
},
async (ctx, payment) => {
await this.paymentService.refund(payment.id);
}
);
saga.addStep(
'confirmOrder',
async (ctx) => {
// 确认订单 - 异步更新状态
await this.orderRepository.updateStatus(
ctx.createOrder.id,
'CONFIRMED'
);
// 发布订单确认事件
await this.eventBus.publish({
type: 'ORDER_CONFIRMED',
data: ctx.createOrder
});
},
async (ctx) => {
// 已经在前面的补偿中取消订单
}
);
const result = await saga.execute({ orderRequest });
if (result.status === 'SUCCESS') {
// 缓存结果用于幂等性检查
await this.cacheIdempotencyResult(idempotencyKey, result.data);
}
return result;
}
generateIdempotencyKey(orderRequest) {
// 基于用户ID + 时间窗口 + 订单内容生成
const hash = crypto
.createHash('sha256')
.update(JSON.stringify(orderRequest))
.digest('hex');
return `order:${orderRequest.userId}:${hash}`;
}
}
// 2. 库存服务 - 强一致性
class InventoryService {
async reserve(request) {
const { orderId, items } = request;
// 使用分布式锁保证一致性
const lock = await this.redlock.lock(
`inventory:lock:${items.map(i => i.productId).join(':')}`,
1000 // 1秒超时
);
try {
// 在事务中检查和扣减库存
const result = await this.db.transaction(async (trx) => {
for (const item of items) {
const stock = await trx('inventory')
.where('product_id', item.productId)
.forUpdate() // 行锁
.first();
if (!stock || stock.available < item.quantity) {
throw new Error(`Insufficient stock for ${item.productId}`);
}
// 扣减可用库存,增加保留库存
await trx('inventory')
.where('product_id', item.productId)
.update({
available: stock.available - item.quantity,
reserved: stock.reserved + item.quantity
});
// 记录保留
await trx('inventory_reservations').insert({
order_id: orderId,
product_id: item.productId,
quantity: item.quantity,
expires_at: new Date(Date.now() + 15 * 60 * 1000) // 15分钟后过期
});
}
return { success: true, orderId };
});
return result;
} finally {
await lock.unlock();
}
}
// 定时任务:释放过期的库存保留
async releaseExpiredReservations() {
const expired = await this.db('inventory_reservations')
.where('expires_at', '<', new Date())
.where('status', 'RESERVED');
for (const reservation of expired) {
await this.release(reservation.order_id);
}
}
}
// 3. 可观测性:分布式追踪
class TracingMiddleware {
constructor(serviceName) {
this.serviceName = serviceName;
}
middleware() {
return async (req, res, next) => {
const traceId = req.headers['x-trace-id'] || this.generateTraceId();
const spanId = this.generateSpanId();
const parentSpanId = req.headers['x-span-id'];
// 创建 span
const span = {
traceId,
spanId,
parentSpanId,
serviceName: this.serviceName,
operationName: `${req.method} ${req.path}`,
startTime: Date.now()
};
// 注入到请求上下文
req.trace = { traceId, spanId };
// 响应完成时记录 span
res.on('finish', () => {
span.endTime = Date.now();
span.duration = span.endTime - span.startTime;
span.statusCode = res.statusCode;
// 发送到 Jaeger/Zipkin
this.sendSpan(span);
});
// 传播追踪头
res.setHeader('x-trace-id', traceId);
next();
};
}
generateTraceId() {
return crypto.randomBytes(16).toString('hex');
}
generateSpanId() {
return crypto.randomBytes(8).toString('hex');
}
}
总结与最佳实践
微服务架构关键要点
- 合理拆分: 基于业务领域,不要过度拆分
- 容错设计: 使用断路器、超时、重试
- 服务发现: 使用注册中心实现动态服务发现
- API 网关: 统一入口,处理横切关注点
服务网格优势
- 流量管理: 灰度发布、流量分割无需修改代码
- 可观测性: 自动追踪、指标收集
- 安全: mTLS 加密服务间通信
分布式事务选择
- 2PC: 强一致性,但性能差,避免使用
- Saga: 推荐用于微服务,编排式更易管理
- 最终一致性: 大多数场景可接受
CAP 权衡
- 关键数据: 选择 CP (如库存、支付)
- 非关键数据: 选择 AP (如用户浏览历史)
- Quorum: 灵活调整一致性级别
推荐学习路径
- 阅读 DDIA 第 5-9 章 (2-3 周)
- 动手搭建微服务 Demo (1-2 周)
- 学习 Istio 并实践 (2 周)
- 实现 Saga 模式 (1 周)
- 研究实际案例 (持续)
参考资料:
- Designing Data-Intensive Applications: https://dataintensive.net/
- 微软云设计模式: https://learn.microsoft.com/azure/architecture/patterns/
- Istio 官方文档: https://istio.io/docs/
- Saga 模式: https://microservices.io/patterns/data/saga.html
- CAP 定理: https://en.wikipedia.org/wiki/CAP_theorem