Spring Boot 整合 RabbitMQ 实现延迟消息

Posted by 暮夏有五 on 2021-01-23
Estimated Reading Time 8 Minutes
Words 1.9k In Total
Viewed Times

本文主要讲解整合 RabbitMQ 实现延迟消息的过程,以发送延迟消息取消超时订单为例。RabbitMQ 是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

RabbitMQ 简介


RabbitMQ 是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

RabbitMQ 的安装和使用

安装 Erlang,下载地址:erlang.org/download/ot…

安装 RabbitMQ,下载地址:dl.bintray.com/rabbitmq/al…

安装完成后,进入 RabbitMQ 安装目录下的 sbin 目录:

img

在地址栏输入 cmd 并回车启动命令行,然后输入以下命令启动管理功能:

1
$ rabbitmq-plugins enable rabbitmq_management

img

访问地址查看是否安装成功:http://localhost:15672/,输入账号密码并登录:guest guest:

img

创建帐号并设置其角色为管理员:mall mall

img

创建一个新的虚拟 host为:/mall

img

点击 mall 用户进入用户配置页面:

img

给 mall 用户配置该虚拟 host 的权限:

img

至此,RabbitMQ 的安装和配置完成。

RabbitMQ的消息模型

img

标志 中文名 英文名 描述
P 生产者 Producer 消息的发送者,可以将消息发送到交换机
C 消费者 Consumer 消息的接收者,从队列中获取消息进行消费
X 交换机 Exchange 接收生产者发送的消息,并根据路由键发送给指定队列
Q 队列 Queue 存储从交换机发来的消息
type 交换机类型 type direct表示直接根据路由键(orange/black)发送消息

业务场景说明


用于解决用户下单以后,订单超时如何取消订单的问题。

  • 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  • 生成订单,获取订单的id;
  • 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  • 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  • 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

整合 RabbitMQ 实现延迟消息

引入依赖

在 pom.xml 中添加相关依赖:

1
2
3
4
5
6
7
8
9
10
11
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

相关配置

修改 application.yml 文件,在 spring 节点下添加 rabbitmq 相关配置:

1
2
3
4
5
6
7
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: mall # rabbitmq的用户名
password: mall # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true

创建消息队列的枚举配置类 QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 消息队列枚举配置
*/
@Getter
public enum QueueEnum {
/**
* 消息通知队列
*/
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
/**
* 消息通知ttl队列
*/
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;

QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}

创建 RabbitMQ 配置

用于配置交换机、队列及队列与交换机的绑定关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* 消息队列配置
*/
@Configuration
public class RabbitMqConfig {

/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}

/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}

/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}

/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}

/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}

/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}

}

在 RabbitMQ 管理页面可以看到以下交换机和队列:

img

img

img

img

交换机及队列说明

  • mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为 mall.order.cancel,一旦有消息以 mall.order.cancel 为路由键发过来,会发送到此队列。
  • mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为 mall.order.cancel.ttl,一旦有消息以 mall.order.cancel.ttl 为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到 mall.order.cancel(取消订单消息消费队列)。

创建消息发送者

创建 CancelOrderSender 类,用于向订单延迟消息队列(mall.order.cancel.ttl)里发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 取消订单消息的发出者
*/
@Component
public class CancelOrderSender {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired
private AmqpTemplate amqpTemplate;

public void sendMessage(Long orderId,final long delayTimes){
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}

创建消息接收者

创建 CancelOrderReceiver 类,用于从取消订单的消息队列(mall.order.cancel)里接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 取消订单消息的处理者
*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired
private OmsPortalOrderService portalOrderService;
@RabbitHandler
public void handle(Long orderId){
LOGGER.info("receive delay message orderId:{}",orderId);
portalOrderService.cancelOrder(orderId);
}
}

创建 OmsPortalOrderService 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 前台订单管理Service
*/
public interface OmsPortalOrderService {

/**
* 根据提交信息生成订单
*/
@Transactional
CommonResult generateOrder(OrderParam orderParam);

/**
* 取消单个超时订单
*/
@Transactional
void cancelOrder(Long orderId);
}

创建 OmsPortalOrderServiceImpl 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 前台订单管理Service
*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
@Autowired
private CancelOrderSender cancelOrderSender;

@Override
public CommonResult generateOrder(OrderParam orderParam) {
//todo 执行一系类下单操作,具体参考mall项目
LOGGER.info("process generateOrder");
//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
sendDelayMessageCancelOrder(11L);
return CommonResult.success(null, "下单成功");
}

@Override
public void cancelOrder(Long orderId) {
//todo 执行一系类取消订单操作,具体参考mall项目
LOGGER.info("process cancelOrder orderId:{}",orderId);
}

private void sendDelayMessageCancelOrder(Long orderId) {
//获取订单超时时间,假设为60分钟
long delayTimes = 30 * 1000;
//发送延迟消息
cancelOrderSender.sendMessage(orderId, delayTimes);
}

}

创建 OmsPortalOrderController 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 订单管理Controller
*/
@Controller
@Api(tags = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService portalOrderService;

@ApiOperation("根据购物车信息生成订单")
@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
@ResponseBody
public Object generateOrder(@RequestBody OrderParam orderParam) {
return portalOrderService.generateOrder(orderParam);
}
}

进行接口测试


调用下单接口

注意:已经将延迟消息时间设置为 30 秒。

img

img

img

更多干货请移步:https://antoniopeng.com


If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !