微服务简单实现最终一致性

虚幻大学 xuhss 164℃ 0评论

Python微信订餐小程序课程视频

https://edu.csdn.net/course/detail/36074

Python实战量化交易理财系统

https://edu.csdn.net/course/detail/35475
有花时间去研究masstransit的saga,英文水平不过关,始终无法实现上手他的代码编排的业务,遗憾。

本文通过rabbit和sqlserver实现下单,更新库存,更新产品,模拟数据最终一致性。

项目结构如下,reportService可有可无,这里就相当一个链条,只要两节走通了后面可以接龙,本文有用到不省略。流程:orderservice=>eComm=>reportservice 。

9b8d553b0b01ae32b280d5d33e37a1bf - 微服务简单实现最终一致性

下面先看看order的配置,通过控制器新增订单同时发布订单信息到order_exchange交换机,Key是"order.created,这样就把订单推送到了队列,等到库存服务获取订单去更新库存。

386388494e2e75d08fe0392013ba84c5 - 微服务简单实现最终一致性

?

| 12345678910111213 | // POST api/[HttpPost]public async Task Post([FromBody] OrderDetail orderDetail){var id = await orderCreator.Create(orderDetail);publisher.Publish(JsonConvert.SerializeObject(new OrderRequest { OrderId = id,ProductId = orderDetail.ProductId,Quantity = orderDetail.Quantity }), "order.created", null);} |

更新库存的代码,然后再发送消息告诉order服务,这里有哪个try包裹,如果这里有失败会触发catch,发送减库存失败的消息。order服务消费到这条消息就会执行相应的删除订单操作。代码如下:

?

| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 | using Ecomm.DataAccess;using Ecomm.Models;using Microsoft.Extensions.Hosting;using Newtonsoft.Json;using Plain.RabbitMQ;using System;using System.Collections.Generic;using System.Linq;using System.Threading;using System.Threading.Tasks; namespace Ecomm{public class OrderCreatedListener : IHostedService{private readonly IPublisher publisher;private readonly ISubscriber subscriber;private readonly IInventoryUpdator inventoryUpdator; public OrderCreatedListener(IPublisher publisher, ISubscriber subscriber, IInventoryUpdator inventoryUpdator){this.publisher = publisher;this.subscriber = subscriber;this.inventoryUpdator = inventoryUpdator;} public Task StartAsync(CancellationToken cancellationToken){subscriber.Subscribe(Subscribe);return Task.CompletedTask;} private bool Subscribe(string message, IDictionary<string, object> header){var response = JsonConvert.DeserializeObject(message);try{inventoryUpdator.Update(response.ProductId, response.Quantity).GetAwaiter().GetResult();publisher.Publish(JsonConvert.SerializeObject(new InventoryResponse { OrderId = response.OrderId, IsSuccess = true }), "inventory.response", null);}catch (Exception){publisher.Publish(JsonConvert.SerializeObject(new InventoryResponse { OrderId = response.OrderId, IsSuccess = false }), "inventory.response", null);} return true;} public Task StopAsync(CancellationToken cancellationToken){return Task.CompletedTask;}}} |

  

?

| 12345678910111213141516171819202122232425262728293031323334353637383940414243 | using Ecomm.Models;using Microsoft.Extensions.Hosting;using Newtonsoft.Json;using Plain.RabbitMQ;using System.Collections.Generic;using System.Threading;using System.Threading.Tasks; namespace OrderService{public class InventoryResponseListener : IHostedService{private readonly ISubscriber subscriber;private readonly IOrderDeletor orderDeletor; public InventoryResponseListener(ISubscriber subscriber, IOrderDeletor orderDeletor){this.subscriber = subscriber;this.orderDeletor = orderDeletor;} public Task StartAsync(CancellationToken cancellationToken){subscriber.Subscribe(Subscribe);return Task.CompletedTask;} private bool Subscribe(string message, IDictionary<string, object> header){var response = JsonConvert.DeserializeObject(message);if (!response.IsSuccess){orderDeletor.Delete(response.OrderId).GetAwaiter().GetResult();}return true;} public Task StopAsync(CancellationToken cancellationToken){return Task.CompletedTask;}}} |

  上面的代码是整个服务的核心业务,也很简单就是队列相互通信相互确认操作是否顺利,失败就执行回归操作,而这里我们都会写好对应补偿代码:

using Dapper;
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;

namespace OrderService
{
 public class OrderDeletor : IOrderDeletor
 {
 private readonly string connectionString;

 public OrderDeletor(string connectionString)
 {
 this.connectionString = connectionString;
 }

 public async Task Delete(int orderId)
 {
 using var connection = new SqlConnection(connectionString);
 connection.Open();
 using var transaction = connection.BeginTransaction();
 try
 {
 await connection.ExecuteAsync("DELETE FROM OrderDetail WHERE OrderId = @orderId", new { orderId }, transaction: transaction);
 await connection.ExecuteAsync("DELETE FROM [Order] WHERE Id = @orderId", new { orderId }, transaction: transaction);
 transaction.Commit();
 }
 catch
 {
 transaction.Rollback();
 }
 }
 }
}

库存服务里有发布产品的接口,这里没有做过多的处理,只是把产品新增放到队列,供后面的ReportService服务获取,该服务拿到后会执行产品数量扣除:fa40ad115f302c5dfd0ac067e07fe27c - 微服务简单实现最终一致性

using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ReportService
{
 public class ReportDataCollector : IHostedService
 {
 private const int DEFAULT\_QUANTITY = 100;
 private readonly ISubscriber subscriber;
 private readonly IMemoryReportStorage memoryReportStorage;

 public ReportDataCollector(ISubscriber subscriber, IMemoryReportStorage memoryReportStorage)
 {
 this.subscriber = subscriber;
 this.memoryReportStorage = memoryReportStorage;
 }

 public Task StartAsync(CancellationToken cancellationToken)
 {
 subscriber.Subscribe(Subscribe);
 return Task.CompletedTask;
 }
 private bool Subscribe(string message, IDictionary<string, object> header)
 //private bool ProcessMessage(string message, IDictionary header)
 {
 if (message.Contains("Product"))
 {
 var product = JsonConvert.DeserializeObject(message);
 if (memoryReportStorage.Get().Any(r => r.ProductName == product.ProductName))
 {
 return true;
 }
 else
 {
 memoryReportStorage.Add(new Report
 {
 ProductName = product.ProductName,
 Count = DEFAULT\_QUANTITY
 });
 }
 }
 else
 {
 var order = JsonConvert.DeserializeObject(message);
 if(memoryReportStorage.Get().Any(r => r.ProductName == order.Name))
 {
 memoryReportStorage.Get().First(r => r.ProductName == order.Name).Count -= order.Quantity;
 }
 else
 {
 memoryReportStorage.Add(new Report
 {
 ProductName = order.Name,
 Count = DEFAULT\_QUANTITY - order.Quantity
 });
 }
 }
 return true;
 }

 public Task StopAsync(CancellationToken cancellationToken)
 {
 return Task.CompletedTask;
 }
 }
}

到这里整个流程大概如此。只要理清楚了订单和库存更新这里的业务,后面差不多一样,可以无限递归。代码文末有链接供下载。

这里有一个地方的代码如下,新增库存的时候同时发布消息。假如新增完订单后面崩掉了,这里是个原子操作最佳。

?

| 123456789101112 | [HttpPost]public async Task Post([FromBody] OrderDetail orderDetail){var id = await orderCreator.Create(orderDetail);publisher.Publish(JsonConvert.SerializeObject(new OrderRequest { OrderId = id,ProductId = orderDetail.ProductId,Quantity = orderDetail.Quantity }), "order.created", null);} |

  很遗憾masstransit的saga还没有整明白,那就上cap,完成业务一致性。加了点cap代码因为之前是dapper,所以加了dbcontext和cap相关代码有点小乱。核心代码如下:

?

| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 | using DotNetCore.CAP;using MediatR;using OrderService.Command;using System.Threading;using Ecomm.Models;using System.Collections.Generic; namespace OrderService.Handler{public class InsertOrderDetailHandler : IRequestHandler{private readonly OrderDbContext context;private readonly ICapPublisher cap;public InsertOrderDetailHandler(OrderDbContext context, ICapPublisher cap){this.context = context;this.cap = cap;}public async System.Threading.Tasks.Task Handle(InsertOrderDetailCommand request, CancellationToken cancellationToken){using(var trans =context.Database.BeginTransaction(cap)){var order = context.Orders.Add(new Order{UpdatedTime = System.DateTime.Today,UserId = request.UserId,UserName = request.UserName});var orderDetail = context.OrderDetails.Add(new OrderDetail{OrderId = order.Entity.Id,ProductId = request.ProductId,Quantity = request.Quantity,ProductName = request.ProductName,});context.SaveChanges(); cap.Publish("order.created", new OrderRequest{OrderId = order.Entity.Id,ProductId = orderDetail.Entity.ProductId,Quantity = orderDetail.Entity.Quantity}, new Dictionary<string,string>()) ;trans.Commit();return new InsertOrderDetailModel { OrderDetailid = orderDetail.Entity.Id, OrderId = order.Entity.Id, Success = true };}}}} |

 到这里差不多要结束了,这里的代码都可以调试运行的。因为加了cap,order服务有两套rabbitmq的配置,有冗余,而且有点坑。调试的时候注意,Plain.RabbitMQ支持的交换机不是持久化的,而cap是持久化的,所以有点不兼容。第一次运行可以先确保Plain.RabbitMQ正常,再删掉交换机,cap跑起来了再建持久化交换机,这样cap消息就会被rabbitmq接收,后面就会被库存服务消费。因为我这里cap不会自动绑定队列,Plain.RabbitMQ是可以的。所以需要新建交换机后再绑定队列。而且这里队列以Plain.RabbitMQ生成的名字来绑定。要不然又可能会调试踩坑无法出坑。 用cap不注意你连消息队列都看不到,看到了队列也看不到消费数据,这点不知道是我不会还是cap有什么难的配置。结束。。。

上例项目demo:

liuzhixin405/SimpleOrders_Next (github.com)

超简单微服务demo

liuzhixin405/SimpleOrders (github.com)

转载请注明:xuhss » 微服务简单实现最终一致性

喜欢 (0)

您必须 登录 才能发表评论!