一个持续超过30秒、跨越多个微服务的数据处理流程,一旦中间环节失败,就会导致数据状态不一致。在生产环境中,这种“数据孤岛”和“半成品”状态是灾难的开始。传统的两阶段提交(2PC)协议在这种长周期、异步的 serverless 场景下,由于其同步阻塞的特性,不仅不适用,还会造成严重的性能瓶颈和可用性问题。这正是我们团队遇到的棘手挑战。
我们的目标是构建一个健壮的、可观测的、并且能够优雅处理失败回滚的分布式事务解决方案。最终,我们选择了 Saga 模式作为核心思想,并围绕它构建了一套技术栈:使用 Google Cloud Functions 作为无服务器计算层,XState 作为状态管理的“大脑”,Cassandra 作为高可用的状态持久化存储,而 Chef 则负责自动化我们自建 Cassandra 集群的部署与维护。
初始痛点:不可靠的流程与手动恢复
最初的实现非常粗糙,一个主 Cloud Function 依次调用其他几个服务。当步骤三失败时,我们没有任何自动化的回滚机制。工程师需要介入,手动清理步骤一和步骤二产生的数据,然后重新触发整个流程。这种方式在业务量上升后变得完全不可行,运维成本飙升,且极易出错。
我们需要的是一个协调器(Orchestrator),它必须:
- 状态化:精确知道当前流程执行到哪一步。
- 持久化:即使协调器自身发生故障重启,也能从上次的状态恢复。
- 可回滚:当任何一步失败时,能反向执行之前所有步骤的补偿操作。
技术选型与权衡
1. 状态管理:为何是 XState?
Saga 本质上就是一个状态机。你可以用一堆 if/else 或数据库中的 status 字段来手动管理。但在真实项目中,这种方式很快会变得难以维护。状态转换逻辑、附带的副作用(actions)、防护条件(guards)都散落在代码各处。
XState 强制我们用一种声明式、可预测的方式来定义整个 Saga 流程。它的核心优势在于:
- 逻辑与实现分离:状态图(Machine)的定义是纯粹的 JSON 对象,可以独立于业务逻辑进行测试和可视化。
- 可恢复性:XState 的状态对象是可序列化的。我们可以将它完整存入数据库,在需要时取出来,就能完美恢复当时的状态机上下文,继续执行。
- 复杂流程建模:对于包含并行、分支、历史状态等复杂场景的 Saga,XState 提供了强大的建模能力,这是手动管理难以企及的。
2. 计算层:Google Cloud Functions
Serverless 是这个场景的绝佳选择。Saga 流程可能是低频的,也可能在某个时间点爆发。Cloud Functions 的按需伸缩和事件驱动模型,让我们无需关心底层服务器,只需专注于业务逻辑。我们将协调器本身实现为一个 HTTP 触发的 Cloud Function,而 Saga 的每个步骤则是独立的、由 Pub/Sub 消息触发的 Function。
3. 持久化层:自建 Cassandra 集群
协调器的状态必须被持久化。我们为什么没有选择像 Firestore 或 Cloud SQL 这样的全托管服务?
- 写入吞吐量与可用性:Saga 流程的每次状态变更都需要记录,这是一个写密集型场景。Cassandra 的无主(Masterless)架构和可调一致性级别(Tunable Consistency)使其在高写入负载和节点故障时依然能保持极高的可用性。
- 成本与控制:对于特定的、可预测的高负载,长期来看自建集群的成本可能低于同等性能的托管服务。更重要的是,我们能完全控制其拓扑、版本和性能调优。
- 现有技术栈:团队已经有管理其他 Cassandra 集群的经验,复用现有知识体系是务实的选择。
4. 基础设施管理:Chef
自建 Cassandra 意味着我们需要处理节点的配置、启动、监控和扩容。手动操作是不可靠的。Chef 作为一款成熟的配置管理工具,允许我们用代码来定义基础设施的状态。我们可以编写一个 cookbook 来描述一个 Cassandra 节点应该是什么样子:安装了哪个版本的 Java,cassandra.yaml 文件的具体配置,种子节点的地址等等。这保证了集群中每个节点的一致性,并且让新节点的加入变得自动化。
架构设计
整个系统的交互流程可以用下面的 Mermaid 图来表示:
sequenceDiagram
participant Client
participant Orchestrator (Cloud Function)
participant XState
participant Cassandra
participant Step 1 Service (Cloud Function)
participant Step 2 Service (Cloud Function)
participant Pub/Sub
Client->>+Orchestrator: POST /startSaga (payload)
Orchestrator->>XState: Create Machine instance
Orchestrator->>Cassandra: Persist initial state (saga_id, state_json)
XState-->>Orchestrator: Next action: 'executeStep1'
Orchestrator->>Pub/Sub: Publish 'execute_step1' topic (saga_id, payload)
Orchestrator-->>-Client: 202 Accepted (saga_id)
Pub/Sub->>+Step 1 Service: Trigger with message
Step 1 Service->>Step 1 Service: Perform business logic
Step 1 Service->>Pub/Sub: Publish 'step1_completed' or 'step1_failed'
deactivate Step 1 Service
Pub/Sub->>+Orchestrator: Trigger with 'step1_completed'
Orchestrator->>Cassandra: Load state_json for saga_id
Orchestrator->>XState: Restore machine and send 'STEP1_SUCCESS' event
Orchestrator->>Cassandra: Persist new state
XState-->>Orchestrator: Next action: 'executeStep2'
Orchestrator->>Pub/Sub: Publish 'execute_step2' topic
deactivate Orchestrator
%% Failure Case
Pub/Sub->>+Step 2 Service: Trigger with message
Step 2 Service->>Step 2 Service: Logic fails
Step 2 Service->>Pub/Sub: Publish 'step2_failed'
deactivate Step 2 Service
Pub/Sub->>+Orchestrator: Trigger with 'step2_failed'
Orchestrator->>Cassandra: Load state_json for saga_id
Orchestrator->>XState: Restore machine and send 'STEP2_FAILURE' event
Orchestrator->>Cassandra: Persist new state ('compensatingStep1')
XState-->>Orchestrator: Next action: 'compensateStep1'
Orchestrator->>Pub/Sub: Publish 'compensate_step1' topic
deactivate Orchestrator
步骤化实现
1. Chef Cookbook for Cassandra
我们不会展示一个完整的 cookbook,但以下是 recipes/default.rb 中的关键部分,它展示了如何配置一个 Cassandra 节点。这里的坑在于,Cassandra 对 listen_address 和 rpc_address 的配置非常敏感,必须正确获取到节点的内网 IP。
# cookbooks/cassandra-node/recipes/default.rb
# 1. Add DataStax repository for Cassandra
apt_repository 'datastax' do
uri 'https://repo.datastax.com/community'
distribution 'stable'
components ['main']
key 'https://repo.datastax.com/repo_keys/repo_key.asc'
end
# 2. Install Cassandra and required Java
package %w(openjdk-8-jdk cassandra) do
action :install
end
# 3. Stop the service to apply configuration changes
service 'cassandra' do
action :stop
end
# 4. Dynamically determine node's private IP and seed nodes
# In a real setup, seed nodes would be discovered via Chef search or attributes.
# For simplicity, we hardcode one seed here.
private_ip = node['cloud']['private_ips'].first
seed_node_ip = '10.1.1.1' # This should be dynamic
# 5. Template the cassandra.yaml file
# This is the core of the configuration.
template '/etc/cassandra/cassandra.yaml' do
source 'cassandra.yaml.erb'
owner 'cassandra'
group 'cassandra'
mode '0644'
variables(
cluster_name: 'SagaStateCluster',
seeds: seed_node_ip,
listen_address: private_ip,
rpc_address: private_ip,
endpoint_snitch: 'GoogleCloudSnitch' # Crucial for GCP environment
)
notifies :start, 'service[cassandra]', :delayed
end
# 6. Ensure the service is enabled and started after config changes
service 'cassandra' do
supports status: true, restart: true, reload: true
action [:enable]
end
这个配方确保了每个新节点都有一致的、适合 GCP 环境的配置。
2. Cassandra Schema
数据建模是 Cassandra 的关键。我们需要一个表来存储 Saga 的状态。
-- Keyspace creation
CREATE KEYSPACE IF NOT EXISTS saga_coordinator
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
USE saga_coordinator;
-- Table to store the state of each Saga instance
CREATE TABLE saga_state (
saga_id uuid,
saga_type text, // e.g., 'DATA_PIPELINE_V1'
current_state text, // A human-readable representation of the current state
state_json text, // The full serialized XState machine state
context_json text, // The context data of the state machine
created_at timestamp,
updated_at timestamp,
PRIMARY KEY (saga_id)
);
-- It's often useful to have an index for querying sagas by type or state,
-- but be cautious with secondary indexes in Cassandra on high-cardinality columns.
-- For real-world use, consider a separate table or search integration.
CREATE INDEX IF NOT EXISTS on saga_state (saga_type);
saga_id 作为分区键保证了单个 Saga 的所有更新都落在同一个节点上,避免了跨节点读写的复杂性。state_json 是核心,它存储了 XState 的完整状态,保证了可恢复性。
3. XState Saga Machine 定义
这是我们 Saga 的“蓝图”。注意 invoke 的使用,它清晰地定义了每个状态需要执行的异步服务,以及成功或失败后应该触发的事件。
// saga-machine.js
import { createMachine, assign } from 'xstate';
// Simulating async service calls that return Promises
const invokeStep1 = (context, event) => {
console.log('Invoking Step 1 with data:', context.initialData);
// In a real Cloud Function, this would publish a Pub/Sub message
// and the function would terminate. The result comes back as a new event.
return Promise.resolve({ result: 'Step 1 OK' });
};
const invokeStep2 = (context, event) => {
console.log('Invoking Step 2');
if (context.forceFailure) {
return Promise.reject(new Error('Step 2 deliberately failed'));
}
return Promise.resolve({ result: 'Step 2 OK' });
};
const compensateStep1 = (context, event) => {
console.log('Compensating Step 1. Reason:', event.data.message);
// Logic to undo what Step 1 did
return Promise.resolve();
};
export const sagaMachine = createMachine({
id: 'dataProcessingSaga',
initial: 'processingStep1',
// Context stores the data shared across the saga lifecycle
context: {
sagaId: null,
initialData: null,
step1Result: null,
step2Result: null,
failureReason: null,
forceFailure: false, // For testing purposes
},
states: {
processingStep1: {
invoke: {
id: 'invokeStep1',
src: invokeStep1,
onDone: {
target: 'processingStep2',
actions: assign({ step1Result: (context, event) => event.data })
},
onError: {
target: 'failed',
actions: assign({ failureReason: (context, event) => event.data.message })
}
}
},
processingStep2: {
invoke: {
id: 'invokeStep2',
src: invokeStep2,
onDone: {
target: 'completed',
actions: assign({ step2Result: (context, event) => event.data })
},
onError: {
target: 'compensatingStep1',
actions: assign({ failureReason: (context, event) => event.data.message })
}
}
},
compensatingStep1: {
invoke: {
id: 'compensateStep1',
src: compensateStep1,
onDone: { target: 'failed' },
onError: { target: 'compensationFailed' } // Critical state, needs manual intervention
}
},
completed: {
type: 'final'
},
failed: {
type: 'final'
},
compensationFailed: {
type: 'final' // Terminal state indicating a major problem
}
}
});
4. Cloud Function Orchestrator
这是所有组件的粘合剂。它接收外部事件,加载 Saga 状态,驱动状态机,然后持久化新状态。
// index.js (Orchestrator Cloud Function)
import { v4 as uuidv4 } from 'uuid';
import { interpret } from 'xstate';
import { sagaMachine } from './saga-machine.js';
import { CassandraClient } from './cassandra-client.js'; // A wrapper for the datastax driver
const client = new CassandraClient(); // Assume configured connection details
// This is the main entry point for the orchestrator logic
async function handleSagaEvent(sagaId, event) {
console.log(`Handling event ${event.type} for saga ${sagaId}`);
// 1. Load previous state from Cassandra
const savedState = await client.getSagaState(sagaId);
if (!savedState) {
throw new Error(`Saga with id ${sagaId} not found.`);
}
// 2. Restore the machine to its previous state
const restoredState = JSON.parse(savedState.state_json);
const service = interpret(sagaMachine).start(restoredState);
// 3. Send the new event to the restored machine
service.send(event);
const newState = service.state;
// 4. Persist the new state
await client.updateSagaState({
sagaId: sagaId,
currentState: newState.value,
stateJson: JSON.stringify(newState),
contextJson: JSON.stringify(newState.context),
});
console.log(`Saga ${sagaId} transitioned to ${newState.value}`);
// 5. Trigger next side-effect (e.g., publish to Pub/Sub)
// This part is crucial. The state machine definition itself is pure,
// but the orchestrator is responsible for executing the actions
// that the state transitions dictate. A real implementation would parse
// newState.actions to decide which Pub/Sub topic to publish to.
if(newState.done) {
console.log(`Saga ${sagaId} has finished with state: ${newState.value}`);
}
}
// HTTP Trigger to start a new Saga
export const startSaga = async (req, res) => {
const sagaId = uuidv4();
const initialData = req.body;
const machineWithContext = sagaMachine.withContext({
...sagaMachine.context,
sagaId: sagaId,
initialData: initialData,
forceFailure: initialData.forceFailure || false,
});
const service = interpret(machineWithContext).start();
const initialState = service.state;
// Persist initial state
await client.saveNewSagaState({
sagaId: sagaId,
sagaType: 'DATA_PIPELINE_V1',
currentState: initialState.value,
stateJson: JSON.stringify(initialState),
contextJson: JSON.stringify(initialState.context),
});
console.log(`New saga started with ID ${sagaId}`);
// Trigger first action
// In a real system, you would publish to 'execute_step1' topic here
res.status(202).send({ sagaId });
};
// Pub/Sub Trigger for subsequent steps
export const processSagaEvent = async (message, context) => {
// The message payload should contain sagaId and the event details
const payload = JSON.parse(Buffer.from(message.data, 'base64').toString());
const { sagaId, event } = payload;
try {
await handleSagaEvent(sagaId, event);
} catch (error) {
console.error(`Failed to process event for saga ${sagaId}:`, error);
// Implement a dead-letter queue mechanism here
}
};
这里的代码展示了核心的 load -> transition -> persist 循环。一个常见的错误是在 Cloud Function 中直接 await 状态机 invoke 的 Promise。这是错误的,因为 Serverless 函数应该是短暂的。正确的模式是:状态机转换后,协调器发出一个消息到 Pub/Sub,然后函数立即结束。另一个 worker function 会异步执行任务,完成后再发一个消息回来,触发协调器下一次执行。
方案的局限性与未来展望
这套架构解决了我们最初的数据一致性问题,但它并非银弹。
首先,引入了显著的复杂性。调试一个跨越多个函数、通过 Pub/Sub 通信的分布式流程,比调试单体应用要困难得多。必须依赖完善的分布式追踪和结构化日志,为每个 Saga 实例附加一个唯一的 traceId 是必不可少的。
其次,对 Cassandra 的运维依赖。虽然 Chef 实现了自动化,但集群的监控、备份、修复和容量规划依然需要投入人力。如果团队不具备这方面的专业知识,或者业务对延迟不那么敏感,选择一个全托管的数据库(如 DynamoDB 或 Firestore)可能是更明智的、权衡后的选择。
未来的一个优化方向是改进协调器的性能。当前,每次事件都会触发一次完整的“读-反序列化-处理-序列化-写”流程。对于状态转换非常频繁的 Saga,这可能成为瓶頸。可以探索批处理事件,或者在协调器内部使用更高效的内存缓存来减少对 Cassandra 的重复读取,但这也会增加状态一致性的风险,需要谨慎设计。另一个方向是探索使用持久化 actor 模型(如 Akka 或 Dapr)来实现,这可能会提供一种更原生的方式来管理有状态的、长生命周期的实体。