【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

虚幻大学 xuhss 496℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

前景回顾

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

【mq】从零开始实现 mq-05-实现优雅停机

【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

【mq】从零开始实现 mq-07-负载均衡 load balance

【mq】从零开始实现 mq-08-配置优化 fluent

【mq】从零开始实现 mq-09-消费者拉取消息 pull message

【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

状态回执

大家好,我是老马。

上一节我们只实现了拉取消息的实现,但是缺少了消费状态回执。

这一节我们一起来学习下如何实现状态回执。

2cf7307d35334c6ca1cbf27e0a93f246 - 【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

代码实现

回执状态的设计

我们规定如下几种回执状态:

package com.github.houbb.mq.common.constant;

/**
 * @author binbin.hou
 * @since 0.0.3
 */
public final class MessageStatusConst {

    private MessageStatusConst(){}

    /**
 * 待消费
 * ps: 生产者推送到 broker 的初始化状态
 */
    public static final String WAIT\_CONSUMER = "W";

    /**
 * 推送给消费端处理中
 * ps: broker 准备推送时,首先将状态更新为 P,等待推送结果
 * @since 0.1.0
 */
    public static final String TO\_CONSUMER\_PROCESS = "TCP";

    /**
 * 推送给消费端成功
 * @since 0.1.0
 */
    public static final String TO\_CONSUMER\_SUCCESS = "TCS";

    /**
 * 推送给消费端失败
 * @since 0.1.0
 */
    public static final String TO\_CONSUMER\_FAILED = "TCF";

    /**
 * 消费完成
 */
    public static final String CONSUMER\_SUCCESS = "CS";

    /**
 * 消费失败
 */
    public static final String CONSUMER\_FAILED = "CF";

    /**
 * 稍后消费
 * @since 0.1.0
 */
    public static final String CONSUMER\_LATER = "CL";

}

消费者状态回执

我们在消费之后,添加状态回执:

for(MqMessage mqMessage : mqMessageList) {
    IMqConsumerListenerContext context = new MqConsumerListenerContext();
    final String messageId = mqMessage.getTraceId();
    ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
    log.info("消息:{} 消费结果 {}", messageId, consumerStatus);

    // 状态同步更新
    MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
    log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp));
}

回执实现,根据 messageId 更新对应的消息消费状态。

public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
    final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
    req.setMessageId(messageId);
    req.setMessageStatus(consumerStatus.getCode());
    final String traceId = IdHelper.uuid32();
    req.setTraceId(traceId);
    req.setMethodType(MethodType.C_CONSUMER_STATUS);

    // 重试
    return Retryer.newInstance()
 .maxAttempt(consumerStatusMaxAttempt)
 .callable(new Callable() {
 @Override
 public MqCommonResp call() throws Exception {
 Channel channel = getChannel(null);
 MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
 if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
 throw new MqException(ConsumerRespCode.CONSUMER\_STATUS\_ACK\_FAILED);
 }
 return resp;
 }
 }).retryCall();
}

Broker 回执处理

消息分发

// 消费者消费状态 ACK
if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {
    MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
    final String messageId = req.getMessageId();
    final String messageStatus = req.getMessageStatus();
    return mqBrokerPersist.updateStatus(messageId, messageStatus);
}

简单实现

这里是基于本地 map 更新状态的,性能比较差。

后续会以 mysql 实现。

public MqCommonResp updateStatus(String messageId, String status) {
    // 这里性能比较差,所以不可以用于生产。仅作为测试验证
    for(List list : map.values()) {
 for(MqMessagePersistPut put : list) {
 MqMessage mqMessage = put.getMqMessage();
 if(mqMessage.getTraceId().equals(messageId)) {
 put.setMessageStatus(status);
 break;
 }
 }
 }

 MqCommonResp commonResp = new MqCommonResp();
 commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
 commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
 return commonResp;
}

小结

对于消息状态的细化,更加便于我们后续的管理,和问题的定位。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

转载请注明:xuhss » 【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

喜欢 (0)

您必须 登录 才能发表评论!