本文目录导读:

实现一个带状态机的工作流引擎,核心在于将业务流程抽象为状态转换,每个任务或整个流程实例都处于一个明确的状态,只有满足特定条件(事件)时,才能通过执行特定动作(Action)转移到下一个状态。
以下是一个从设计到代码实现(以 Java 为例)的完整指南。
第一部分:核心概念与设计
- 状态 (State):流程或节点所处的稳定阶段。
待提交、审批中、已通过、已驳回。 - 事件 (Event):触发状态转换的外部或内部动作。
提交申请、审批通过、驳回。 - 转换 (Transition):定义了从源状态到目标状态的路径,以及触发的条件。
- 公式:
源状态 + 事件 + 条件(可选) = 目标状态 + 执行动作
- 公式:
- 动作 (Action):状态转换时执行的业务逻辑,发送邮件、更新数据库、调用第三方 API。
- 上下文 (Context):贯穿整个流程的数据载体,例如申请表、审批意见、用户信息。
第二部分:架构分层
为了保持可维护性和灵活性,建议分层设计:
| 层次 | 职责 | 关键组件 |
|---|---|---|
| 定义层 | 定义工作流的元数据(状态、事件、转换规则)。 | StateMachineDefinition, State, Event |
| 引擎层 | 核心调度逻辑,解析规则,执行转换。 | StateMachineEngine, StateMachineExecutor |
| 持久层 | 存储工作流实例状态、快照、历史记录。 | WorkflowInstance, TransitionHistory |
| 业务层 | 实现具体的 Action,绑定到流程节点。 | ActionHandler 接口实现, ConditionEvaluator |
第三部分:代码实现步骤(Java 示例)
我们将使用 Spring Boot + JPA 作为基础框架,并避免引入重型外部依赖(如 Activiti),从零搭建。
Step 1:定义枚举与接口
// 1. 定义状态和事件枚举(根据业务调整)
public enum OrderState {
DRAFT, PENDING_APPROVAL, APPROVED, REJECTED, CLOSED
}
public enum OrderEvent {
SUBMIT, APPROVE, REJECT, CLOSE
}
// 2. 定义 Action 接口
public interface StateAction {
void execute(WorkflowInstance instance, Object payload);
}
// 3. 定义条件接口(决定是否能走某条路径)
public interface ConditionEvaluator {
boolean evaluate(WorkflowInstance instance, OrderEvent event);
}
Step 2:构建状态机定义(DSL 风格)
这是最核心的部分,使用 Builder 模式构建转换图谱。
import java.util.*;
public class StateMachineDefinition<S, E> {
// 核心存储:源状态 -> (事件 -> 包含目标状态、动作、条件的对象)
private final Map<S, Map<E, TransitionConfig<S>>> transitions = new HashMap<>();
// 内部类:配置一条转换
public static class TransitionConfig<S> {
private final S targetState;
private StateAction action;
private ConditionEvaluator condition;
public TransitionConfig(S targetState) {
this.targetState = targetState;
}
// getters ...
}
// Builder 模式
public static class Builder<S, E> {
private StateMachineDefinition<S, E> definition = new StateMachineDefinition<>();
public Builder<S, E> from(S source) {
return new BuilderStage<>(this, source);
}
public StateMachineDefinition<S, E> build() {
return definition;
}
}
// 链式调用的中间阶段
public static class BuilderStage<S, E> {
private final Builder<S, E> builder;
private final S source;
public BuilderStage(Builder<S, E> builder, S source) {
this.builder = builder;
this.source = source;
}
public OnStage<S, E> on(E event) { return new OnStage<>(builder, source, event); }
}
public static class OnStage<S, E> {
// ... 省略实现细节,最终调用 definition.addTransition(source, event, target, action, condition)
}
// 添加一条转换规则
public void addTransition(S source, E event, S target, StateAction action) {
transitions.computeIfAbsent(source, k -> new HashMap<>())
.put(event, new TransitionConfig<>(target).setAction(action));
}
// 获取转换配置
public TransitionConfig<S> getTransition(S source, E event) {
return transitions.getOrDefault(source, new HashMap<>()).get(event);
}
}
Step 3:创建持久化实体
@Entity
@Table(name = "workflow_instance")
public class WorkflowInstance {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String bizId; // 业务单据 ID(如订单号)
private String currentState; // 当前状态
private String machineName; // 状态机名称(用于区分不同流程)
@OneToMany(mappedBy = "instance")
private List<TransitionHistory> histories;
// getters, setters ...
}
@Entity
@Table(name = "transition_history")
public class TransitionHistory {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String fromState;
private String toState;
private String event;
private String operator;
private LocalDateTime createdAt;
@ManyToOne
private WorkflowInstance instance;
// getters, setters ...
}
Step 4:核心引擎实现
@Service
public class StateMachineEngine {
// 假设 machineRegistry 维护了各种流程的定义
private final Map<String, StateMachineDefinition<OrderState, OrderEvent>> machineRegistry = new HashMap<>();
@Autowired
private WorkflowInstanceRepository instanceRepo;
// 1. 触发事件
public WorkflowInstance fireEvent(String instanceId, OrderEvent event, Object payload) {
WorkflowInstance instance = instanceRepo.findById(instanceId)
.orElseThrow(() -> new RuntimeException("Instance not found"));
StateMachineDefinition<OrderState, OrderEvent> machine = machineRegistry.get(instance.getMachineName());
OrderState currentState = OrderState.valueOf(instance.getCurrentState());
// 2. 获取转换配置
TransitionConfig<OrderState> config = machine.getTransition(currentState, event);
if (config == null) {
throw new IllegalStateException(
String.format("No transition from state [%s] on event [%s]", currentState, event));
}
// 3. 执行条件检查
if (config.getCondition() != null && !config.getCondition().evaluate(instance, event)) {
throw new IllegalStateException("Condition not met for transition");
}
// 4. **执行状态转换(关键操作)**
OrderState targetState = config.getTargetState();
instance.setCurrentState(targetState.name());
// 5. 执行绑定的 Action
if (config.getAction() != null) {
config.getAction().execute(instance, payload);
}
// 6. 持久化(注意并发安全)
saveWithHistory(instance, currentState.name(), targetState.name(), event.name());
return instance;
}
// 使用 @Transactional 保证原子性,并记录历史
@Transactional
protected void saveWithHistory(WorkflowInstance instance, String from, String to, String event) {
TransitionHistory history = new TransitionHistory();
history.setFromState(from);
history.setToState(to);
history.setEvent(event);
history.setInstance(instance);
history.setCreatedAt(LocalDateTime.now());
instance.getHistories().add(history);
instanceRepo.save(instance);
}
}
Step 5:定义具体流程与业务 Action
@Component
public class OrderWorkflowConfigurer {
@PostConstruct
public void configure() {
StateMachineDefinition<OrderState, OrderEvent> machine =
new StateMachineDefinition.Builder<OrderState, OrderEvent>()
.from(OrderState.DRAFT)
.on(OrderEvent.SUBMIT)
.to(OrderState.PENDING_APPROVAL)
.withAction(new SubmitAction()) // 提交后执行的动作
.from(OrderState.PENDING_APPROVAL)
.on(OrderEvent.APPROVE)
.to(OrderState.APPROVED)
.withAction(new NotifyAction()) // 审批通过后通知
.from(OrderState.PENDING_APPROVAL)
.on(OrderEvent.REJECT)
.to(OrderState.DRAFT) // 驳回回到草稿
.from(OrderState.APPROVED)
.on(OrderEvent.CLOSE)
.to(OrderState.CLOSED)
.build();
// 注册到引擎
StateMachineRegistry.register("ORDER_FLOW", machine);
}
}
// 具体 Action 实现
@Component
public class SubmitAction implements StateAction {
@Override
public void execute(WorkflowInstance instance, Object payload) {
System.out.println("订单 " + instance.getBizId() + " 已提交,执行库存预占...");
// 实际调用库存服务
}
}
第四部分:注意事项与优化
-
并发安全:
- 使用
@Version(乐观锁)或SELECT ... FOR UPDATE。 - 关键路径:先查询当前状态,再尝试更新,如果状态已被其他线程修改,应抛出异常或重试。
- 使用
-
幂等性:
- 事件处理函数应该可重入,可以通过历史表(
TransitionHistory)引入事件唯一 ID,检测重复事件。
- 事件处理函数应该可重入,可以通过历史表(
-
可观测性:
- 记录详细的状态迁移日志、耗时、失败原因。
- 提供 API 查询当前状态、下一步可执行的事件。
-
流程复杂度:
- 子状态机:当节点内部有复杂逻辑时,可以使用子状态机(状态嵌套)。
- 并行分支:使用
Fork/Join节点,需要引入令牌(Token)机制。
-
性能优化:
- 缓存状态机定义(通常不会变)。
- 使用异步 Action 执行器,避免阻塞主流程。
第五部分:总结与扩展
| 场景 | 实现思路 |
|---|---|
| 简单流程(订单、审批流) | 上述基于内存的状态机 + JPA 持久化即可,清晰高效。 |
| 复杂流程(BPMN 2.0 标准) | 建议使用成熟工具库,如 Activiti / Flowable,它们原生支持了并行网关、子流程、表达式脚本(Groovy)等。 |
| 分布式工作流 | 关键点在于 分布式事务,Action 调用了外部服务,需要设计补偿回滚(Saga模式) 或重试幂等,此时状态机模型应结合事件驱动架构(如 Kafka + CQRS)。 |
最终建议:如果你的目标是构建一个健壮的生产级工作流,并且团队有精力维护,从零实现状态机引擎可以带来极大的灵活性,但如果业务复杂度高(如需要可视化设计器、支持 BPEL/BPMN 标准),直接基于 Flowable 或 Camunda 进行二次开发会是更高效的选择。