领域驱动模型DDD(三)——使用Saga管理事务

虚幻大学 xuhss 377℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

前言

虽然一直说想写一篇关于Saga模式,在多次尝试后不得不承认这玩意儿的仿制代码真不是我一个菜鸟就能完成的,所以还是妥协般地引用现成的Eventuate Tram Saga框架(虽然我对它一直很反感)和Seata的Saga模式。有一说一,我极其不愿意采用这种封装好的框架和解决方案对知识进行讲解,因为庞大的架构和源码对读者来说跨度太大,可是如果想把它内部流程讲清又要花费很大的精力进行详解,而且太考验文章的叙述文字功底了。

这有违我一直提倡的:“架构”是一种理论思想而非具体百搭的代码段。鉴于此,此章我会竭尽所能把基于框架的Saga模式讲解清晰,希望读者能够反复阅读,有不解的地方尽情在评论区询问。

Saga是什么?

分布式事务的挑战

在分布式系统中会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下的保证事务特性(ACID)的机制运转称之为分布式事务。

多个服务、数据库、消息代理之间在很多时候是必须维持数据一致性。特别是涉及金融交易的模块,即使可能只是出现1的偏差,也会因为蝴蝶效应而导致巨大损失。最原始的分布式事务管理的实施标准是XA模式(Seata AT和XA模式),其采用了两段式提交来保证事务中所有参与者都是健康且同时完成提交,或则在事务失败时同时进行回滚。

但XA模式也衍生出相关的问题:1、许多新技术包括NoSQL、RabbitMq并不支持XA标准的分布式事务;2、其性能上因为采用阻塞性协议,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态,这就容易出现响应时间增长、死锁等问题;3、由于分布式事务采用的时同步进程间通信(发送请求就必须收到应答,否则不进行下次传输),这导致一旦分布式事务中的一个参与方出现问题就会让整个系统无法正常使用,降低了系统总体的可用性。

Eric Brewer在著名的CAP理论中证明了系统只要在一致性、可用性、分区容错性中保证两个就行。

分区容错性:在分布式系统中因为网络通讯故障而导致整个整体被分割成一个个分区,而这些分区仍可以继续对外供满足一致性和可用性的服务。(必要的)

可用性:系统一直处于可用状态,能正常响应数据,但是不保证响应数据为最新数据。

一致性:数据在多个副本之间能够保持一致的特性(Mysql主从表)。

今天,大部分架构师会倾向保证系统可用性,而将数据强制一致性的要求降低至满足最终一致便可。Saga正是为了解决微服务架构下数据一致性的问题,构建在松耦合、异步服务之上的机制。

Saga的机制

当本地事务完成时服务就会发布消息(图中Create order完成时发布消息),然后触发Saga中的下一个步骤(触发库存服务中预减库存)。通过消息发送的方式可以确保所有参与服务之间的松耦合,也可以让整个业务流程实现异步通信,即使其中一些消息的接收方不可用,消息代理也会先进性缓存,直到消息被接受为止。

695e3580caeac6703f2f8a08ddf697e6 - 领域驱动模型DDD(三)——使用Saga管理事务

Saga的补偿式事务回滚

用八个字即可概括:有则继续,无则补偿。例如:当我从钱包中拿出5块钱跟柜员说从冰箱里拿一根巧克力甜筒,如果柜员发现冰箱中有巧克力甜筒,就把甜筒拿出给我,把钱放入收银柜中。如果柜员发现冰箱没有巧克力甜筒,则把钱返回给我放回钱包

以上例子中从 ①我拿出钱支付->②柜员接过钱确认有存货->③给我甜筒->④将钱放入收银柜中->⑤我离开店铺 是一整个活动流程。其中每一小段都是一个事务(不可分割),当没有存货时就把钱返还给我确保钱包的钱没有少,这里的还钱便是给钱的补偿事务。

其中①-③被称为可补偿性事务,因为它们后面跟着的步骤可能失败,例如对①来说失败情况就是没有存货,那么补偿事务就是还钱;对③来说失败情况可能是给了假钞,那么补偿事务就是要回甜筒放回货柜里。第④步被称为关键性事务,因为后面跟着的是不可能失败的步骤(如果第④步已经成功完成,其实已经说明整个流程是成功的,因为我肯定能离开店铺)。对于最后的步骤成为可重复性事务,因为它们一定会成功。

Saga的协调模式

·协同式:把Saga的决策和决定执行顺序的逻辑分布在Saga的每一个参与方上,让它们各自通过交换事件的方式进行沟通。(各管各的)

·编排器:把Saga的决策和决定执行顺序的逻辑集中在一个Saga编排器类(理解成一个管理中心)上,Saga编排器发出命令式的消息给每一个参与方,指挥这些参与方完成具体操作。(中央调度)

协同式Saga

在下图中是简单的订单创建->支付成功->完成订单的流程,其中库存服务和支付服务通过订阅相关的消息通道接受事件消息,从而能够按顺序完成需要自己参与的事务工作。

07379882cd86d5776edf3b2ebe3eacee - 领域驱动模型DDD(三)——使用Saga管理事务

以上是支付成功情况下,支付服务就发布PaySucceed事件,之后库存服务和订单服务就陆续完成减库存和完成订单的操作。

而出现支付失败情况下,支付服务就发布PayFail事件,库存服务会消费该事件并进行Inventory cancelReduction(减库存取消)并发布ReductionFail(减库存失败),订单服务就会消费该事件并进行Cancel order(取消订单)操作。

8f0deba6dfab8d54a637cb02606b785a - 领域驱动模型DDD(三)——使用Saga管理事务

协同式的好处:

·松耦合:一眼看去就知道啦,参与方与订阅方除了消息事件的通信,并不会产生耦合;

·简单:只需要在创建、更新、删除业务对象时发布事件即可;
弊端:

·过度分散:代码中没有集中定义Saga的地方,因为分布在各个服务中,所以在进行编写和理解时难度较大;

·服务之间循环依赖:参考上图,参与方和订阅方身份一直在相互转换相互订阅,虽然不一定引发问题,但这种循环依赖是一种不好的设计风格;

编排式Saga

编排式时实现Saga的另一种方式,使用此方式时开发人员定义一个编排器类,这个类职责就是对外告诉Saga的参与方需要做什么。Saga编排器采用同步/异步方式相应方式进行通信。

cd541e6121eb8bebda898b3a90a68b52 - 领域驱动模型DDD(三)——使用Saga管理事务

上图很明显与协同式Saga相比,流程更加简洁明了。当然它仅仅只是描述了正常情况下整个运行流程,而涉及到失败情况需要进行实物补偿时我们就得借助状态机模型(有点类似流程图,成功时怎么样,失败了怎么样)对所有可能出现的场景进行判断描述。

7ab227a25a5f5dfa43691a65e45b4d2a - 领域驱动模型DDD(三)——使用Saga管理事务

上图就是状态机,例如当状态机从Inventory reduction(预锁库存)状态可以转化为Order Pay或者是Reject Inventory reduction状态,当它收到正常回复Inventory Reduction(锁库存成功),那状态转化为Order Pay。而如果收到Inventory Reduction Failed回复(锁库存失败),则状态转为Reject Inventory reduction。(注:状态、消息回复、命令都可以根据自己习惯定义)

出上面例举以外,阿里推出的分布式事务管理Seata中的Saga模式也是基于状态机引擎的Saga实现,此处我引用官网的图片简单讲解下(就按流程图来理解):

b2ac93dac804dd3ebde60d71bd3bb761 - 领域驱动模型DDD(三)——使用Saga管理事务

图中CompensateReduceInventory与CompensateReduceBalance是补偿式事务,当ReduceInventory在理想情况下到达ReduceBalance后触发了CompensationTrigger(补偿触发器)时,CompensateReduceInventory与CompensateReduceBalance两个补偿式事务就会分别对ReduceInventory和ReduceBalance进行补偿,并最终返回Fail的状态。

可以看到状态机的存在是为了让我们更直观地看清楚Saga整个运行流程,能够很好地帮助开发人员进行理解,简化了业务逻辑让我们的目光更加注重如何改善隔离性问题。并且编排式的Saga参与者之间的依赖关系更加简单,不存在以来循环的问题。

基于Eventuate Tram框架的Saga模式讲解

此小章节为了读者能够更好地理解Saga模式在实际落地的应用,本人摘选了《微服务架构设计模式》中的一部分代码,并对代码进行精简优化以便保证读者可以从业务流程带入到代码运行程的连贯性,如果希望了解详细代码各位可以去看原著,百度也有免费的电子书。

我们先看一副状态机:

20a9ee532758b45f39c7eb48d7e05ea8 - 领域驱动模型DDD(三)——使用Saga管理事务

那么我们现在创建一个Saga编排器(CreateOrderSaga),用看图写作的思维,将整个状态机转化为一篇流水账式的代码:

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

java`public class CreateOrderSaga implements SimpleSaga {

private Logger logger = LoggerFactory.getLogger(getClass());

private SagaDefinition sagaDefinition;

public CreateOrderSaga(OrderServiceProxy orderService, ConsumerServiceProxy consumerService, KitchenServiceProxy kitchenService,
AccountingServiceProxy accountingService) {
this.sagaDefinition =
step()
//补偿式事务,makeRejectOrderCommand创建“拒绝订单命令”,使用rderService.reject中配置的规则发送和回复
.withCompensation(orderService.reject, CreateOrderSagaState::makeRejectOrderCommand)
.step()
//这一步调用参与者,makeValidateOrderByConsumerCommand创建“根据消费者命令进行验证订单”的命令,使用consumerService.validateOrder中配置的规则发送和回复
.invokeParticipant(consumerService.validateOrder, CreateOrderSagaState::makeValidateOrderByConsumerCommand)
.step()
//这一步调用参与者,makeCreateTicketCommand创建“创建收据”的命令,使用kitchenService.create中配置的规则发送和回复
.invokeParticipant(kitchenService.create, CreateOrderSagaState::makeCreateTicketCommand)
//接受到上一步成功回复“CreateTicketReply”的命令后,执行handleCreateTicketReply的命令
.onReply(CreateTicketReply.class, CreateOrderSagaState::handleCreateTicketReply的命令)
//补偿式事务,makeCancelCreateTicketCommand创建“取消创建票据”的命令,使用kitchenService.cancel中的配置发送和回复
.withCompensation(kitchenService.cancel, CreateOrderSagaState::makeCancelCreateTicketCommand)
.step()
//这一步调用参与者,makeAuthorizeCommand创建“授权”命令,使用accountingService.authorize中的配置发送和回复
.invokeParticipant(accountingService.authorize, CreateOrderSagaState::makeAuthorizeCommand)
.step()
//这一步调用参与者,makeConfirmCreateTicketCommand创建“确认创建票”,使用kitchenService.confirmCreate中的配置发送和回复
.invokeParticipant(kitchenService.confirmCreate, CreateOrderSagaState::makeConfirmCreateTicketCommand)
.step()
//这一步调用参与者,makeApproveOrderCommand创建“批准订单”的命令,使用orderService.approve中的配置发送和回复
.invokeParticipant(orderService.approve, CreateOrderSagaState::makeApproveOrderCommand)
.build();
}

@Override
public SagaDefinition getSagaDefinition() {
return sagaDefinition;
}
}`
OrderServiceProxy适配器:

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

java`public class OrderServiceProxy {

public final CommandEndpoint reject = CommandEndpointBuilder
//要发送的“拒绝订单”的命令
.forCommand(RejectOrderCommand.class)
//发送命令到命令通讯通道:OrderServiceChannels.orderServiceChannel
.withChannel(OrderServiceChannels.orderServiceChannel)
//回复Saga编排器执行成功的应答
.withReply(Success.class)
.build();

public final CommandEndpoint approve = CommandEndpointBuilder
//执行批准订单的命令
.forCommand(ApproveOrderCommand.class)
.withChannel(OrderServiceChannels.orderServiceChannel)
.withReply(Success.class)
.build();
}`
KitchenServiceProxy适配器:

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

java`public class KitchenServiceProxy {

public final CommandEndpoint create = CommandEndpointBuilder
//要发送的“创建票据”的命令
.forCommand(CreateTicket.class)
//发送命令到命令通信通道:KitchenServiceChannels.kitchenServiceChannel
.withChannel(KitchenServiceChannels.kitchenServiceChannel)
//回复编排器CreateTicketReply的应答
.withReply(CreateTicketReply.class)
.build();

public final CommandEndpoint confirmCreate = CommandEndpointBuilder
.forCommand(ConfirmCreateTicket.class)
.withChannel(KitchenServiceChannels.kitchenServiceChannel)
.withReply(Success.class)
.build();
public final CommandEndpoint cancel = CommandEndpointBuilder
.forCommand(CancelCreateTicket.class)
.withChannel(KitchenServiceChannels.kitchenServiceChannel)
.withReply(Success.class)
.build();

}`
命令传输的通信通道:

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

java`public class OrderServiceChannels {
public static final String orderServiceChannel = "orderService";
}

public class KitchenServiceChannels {
public static final String kitchenServiceChannel = "kitchenService";
}`
OrderCommandHandlers定义了接受到命令消息后的具体操作:

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

java`public class OrderCommandHandlers {

@Autowired
private OrderService orderService;

public CommandHandlers commandHandlers() {
return SagaCommandHandlersBuilder
//从此命令通道中获取命令消息
.fromChannel("orderService")
//接受到批准订单命令后执行approveOrder方法
.onMessage(ApproveOrderCommand.class, this::approveOrder)
//接受到拒绝订单命令后执行rejectOrder方法,后面都是如此,自己理解
.onMessage(RejectOrderCommand.class, this::rejectOrder)
.onMessage(BeginCancelCommand.class, this::beginCancel)
......
.build();

}

public Message approveOrder(CommandMessage cm) {
long orderId = cm.getCommand().getOrderId();
orderService.approveOrder(orderId);
return withSuccess();
}

public Message rejectOrder(CommandMessage cm) {
long orderId = cm.getCommand().getOrderId();
orderService.rejectOrder(orderId);
return withSuccess();
}
.........
}`
至此,整个关于Saga基础内容讲解完毕,大家可以根据上面的状态机和代码试着画一下Saga编排器的设计图,看看是否真地对Saga内容有进一步的了解。

结语

本文内容结构参照了《微服务架构设计模式》,是在对此书阅读以及结合网络上其他的相关案例总结成连贯,翻译更准确的知识点,其中因为个人水平问题可能存在歧义或疏漏,希望各位读者存在疑问或觉得文章中有需要改正的地方尽情指出。之后将在Saga的基础上讲解事件溯源和CQRS,敬请期待。

转载请注明:xuhss » 领域驱动模型DDD(三)——使用Saga管理事务

喜欢 (0)

您必须 登录 才能发表评论!