SpringBoot整合RabbitMQ实战附加死信交换机

虚幻大学 xuhss 165℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

前言

使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等
完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo

环境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

复制代码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

xml `

org.springframework.bootgroupId>
spring-boot-starter-amqpartifactId>
2.7.0version>
dependency>`
## 配置

```
复制代码
```
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16

yml`spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: admin\_host
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true #开启失败重试
max-attempts: 3 #最大重试次数
initial-interval: 1000 #重试间隔时间 毫秒`
## 配置文件

RabbitConfig

```
复制代码
```
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
* 56
* 57
* 58
* 59
* 60
* 61
* 62
* 63
* 64
* 65
* 66
* 67
* 68
* 69
* 70
* 71
* 72
* 73
* 74
* 75
* 76
* 77
* 78
* 79
* 80
* 81
* 82
* 83
* 84
* 85
* 86
* 87
* 88
* 89
* 90
* 91
* 92
* 93
* 94
* 95
* 96
* 97
* 98
* 99
* 100
* 101
* 102
* 103
* 104
* 105
* 106
* 107
* 108
* 109
* 110
* 111
* 112
* 113
* 114
* 115
* 116
* 117
* 118
* 119
* 120
* 121
* 122
* 123
* 124
* 125
* 126
* 127
* 128
* 129
* 130
* 131
* 132
* 133
* 134
* 135
* 136
* 137
* 138
* 139
* 140
* 141
* 142
* 143
* 144
* 145
* 146
* 147
* 148
* 149
* 150
* 151
* 152
* 153
* 154
* 155
* 156
* 157
* 158
* 159
* 160
* 161
* 162
* 163
* 164
* 165
* 166
* 167
* 168
* 169
* 170
* 171
* 172
* 173
* 174
* 175
* 176
* 177
* 178
* 179
* 180
* 181
* 182

java`package com.example.rabitmqdemo.mydemo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
* Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
* Queue:消息的载体,每个消息都会被投到一个或多个队列。
* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
* Producer:消息生产者,就是投递消息的程序.
* Consumer:消息消费者,就是接受消息的程序.
* Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Slf4j
@Component
public class RabbitConfig {
//业务交换机
public static final String EXCHANGE\_PHCP = "phcp";
//业务队列1
public static final String QUEUE\_COMPANY = "company";
//业务队列1的key
public static final String ROUTINGKEY\_COMPANY = "companyKey";
//业务队列2
public static final String QUEUE\_PROJECT = "project";
//业务队列2的key
public static final String ROUTINGKEY\_PROJECT = "projectKey";

//死信交换机
public static final String EXCHANGE\_PHCP\_DEAD = "phcp\_dead";
//死信队列1
public static final String QUEUE\_COMPANY\_DEAD = "company\_dead";
//死信队列2
public static final String QUEUE\_PROJECT\_DEAD = "project\_dead";
//死信队列1的key
public static final String ROUTINGKEY\_COMPANY\_DEAD = "companyKey\_dead";
//死信队列2的key
public static final String ROUTINGKEY\_PROJECT\_DEAD = "projectKey\_dead";

// /**
// * 解决重复确认报错问题,如果没有报错的话,就不用启用这个
// *
// * @param connectionFactory
// * @return
// */
// @Bean
// public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// factory.setConnectionFactory(connectionFactory);
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// return factory;
// }
/**
* 声明业务交换机
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*/
@Bean("exchangePhcp")
public DirectExchange exchangePhcp() {
return new DirectExchange(EXCHANGE_PHCP);
}

/**
* 声明死信交换机
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*/
@Bean("exchangePhcpDead")
public DirectExchange exchangePhcpDead() {
return new DirectExchange(EXCHANGE_PHCP_DEAD);
}

/**
* 声明业务队列1
*
* @return
*/
@Bean("queueCompany")
public Queue queueCompany() {
Map arguments = new HashMap(2);
arguments.put("x-dead-letter-exchange",EXCHANGE\_PHCP\_DEAD);
//绑定该队列到死信交换机的队列1
arguments.put("x-dead-letter-routing-key",ROUTINGKEY\_COMPANY\_DEAD);
return QueueBuilder.durable(QUEUE\_COMPANY).withArguments(arguments).build();
}
/**
* 声明业务队列2
*
* @return
*/
@Bean("queueProject")
public Queue queueProject() {
Map arguments = new HashMap(2);
arguments.put("x-dead-letter-exchange",EXCHANGE\_PHCP\_DEAD);
//绑定该队列到死信交换机的队列2
arguments.put("x-dead-letter-routing-key",ROUTINGKEY\_PROJECT\_DEAD);
return QueueBuilder.durable(QUEUE\_PROJECT).withArguments(arguments).build();
}

/**
* 声明死信队列1
*
* @return
*/
@Bean("queueCompanyDead")
public Queue queueCompanyDead() {
return new Queue(QUEUE\_COMPANY\_DEAD);
}
/**
* 声明死信队列2
*
* @return
*/
@Bean("queueProjectDead")
public Queue queueProjectDead() {
return new Queue(QUEUE\_PROJECT\_DEAD);
}

/**
* 绑定业务队列1和业务交换机
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY\_COMPANY);
}

/**
* 绑定业务队列2和业务交换机
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY\_PROJECT);
}

/**
* 绑定死信队列1和死信交换机
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY\_COMPANY\_DEAD);
}

/**
* 绑定死信队列2和死信交换机
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY\_PROJECT\_DEAD);
}
}`
## 生产者

RabbltProducer

```
复制代码
```
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
* 56
* 57
* 58
* 59
* 60
* 61
* 62
* 63
* 64
* 65
* 66
* 67
* 68
* 69
* 70
* 71
* 72
* 73
* 74
* 75
* 76
* 77
* 78
* 79
* 80
* 81
* 82
* 83
* 84
* 85
* 86
* 87

java`package com.example.rabitmqdemo.mydemo.producer;

import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Resource
private RabbitTemplate rabbitTemplate;

/**
* 初始化消息确认函数
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);

}

/**
* 发送消息服务器确认函数
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}
}

/**
* 消息发送失败,消息回调函数
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
String str = new String(returnedMessage.getMessage().getBody());
System.out.println("消息发送失败:" + str);
}

/**
* 处理消息发送到队列1
* @param str
*/
public void sendCompany(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE\_PHCP,RabbitConfig.ROUTINGKEY\_COMPANY,str,correlationData);
}

/**
* 处理消息发送到队列2
* @param str
*/
public void sendProject(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE\_PHCP,RabbitConfig.ROUTINGKEY\_PROJECT,str,correlationData);
}
}`
## 业务消费者

RabbitConsumer

```
复制代码
```
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
* 55
* 56
* 57
* 58
* 59
* 60
* 61
* 62
* 63
* 64
* 65
* 66
* 67
* 68
* 69
* 70
* 71
* 72
* 73
* 74
* 75
* 76
* 77
* 78
* 79
* 80
* 81
* 82

java`package com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 监听业务交换机
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumer {
/**
* 监听业务队列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company")
public void company(Message message, Channel channel) throws IOException {
try{
System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("处理消息"+s);
//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
//String str = null;
//str.split("1");
//处理成功,确认应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("处理消息时发生异常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("异常重试次数已到达设置次数,将发送到死信交换机");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即将返回队列处理重试");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 监听业务队列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project")
public void project(Message message, Channel channel) throws IOException {
try{
System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("处理消息"+s);
//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
//String str = null;
//str.split("1");
//处理成功,确认应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("处理消息时发生异常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("异常重试次数已到达设置次数,将发送到死信交换机");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即将返回队列处理重试");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}`
## 死信消费者

RabbitConsumer

```
复制代码
```
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54

java`package com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 监听死信交换机
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumerDead {
/**
* 处理死信队列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company\_dead")
public void company\_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("处理死信"+s);
//在此处记录到数据库、报警之类的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收异常:"+e.getMessage());
}
}
/**
* 处理死信队列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project\_dead")
public void project\_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("处理死信"+s);
//在此处记录到数据库、报警之类的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收异常:"+e.getMessage());
}
}
}`
## 测试

MqController

```
复制代码
```
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22

java`package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RequestMapping("/def")
@RestController
@Slf4j
public class MsgController {
@Resource
private RabbltProducer rabbltProducer;

@RequestMapping("/handleCompany")
public void handleCompany(@RequestBody String jsonStr){
rabbltProducer.sendCompany(jsonStr);
}

}`

转载请注明:xuhss » SpringBoot整合RabbitMQ实战附加死信交换机

喜欢 (6)

您必须 登录 才能发表评论!