15.1、RocketMQ消息队列
RocketMQ 消息队列
RocketMQ 是 Spring Cloud Alibaba 的消息队列组件。本节将学习 RocketMQ 消息队列。
本节将学习:RocketMQ 简介、安装部署、生产者实现、消费者实现,以及消息类型。
RocketMQ 简介
定义
RocketMQ 是阿里巴巴开源的分布式消息中间件,具有高性能、高可靠、高实时、分布式特点。
核心特性
RocketMQ 核心特性:
- 高性能
- 高可用
- 丰富的消息类型
- 顺序消息
- 事务消息
在商城项目中集成 RocketMQ
步骤1:本地搭建 RocketMQ
文件路径: mall-microservices/docker/rocketmq/docker-compose.yml
version: '3.8' services: rocketmq-nameserver: image: apache/rocketmq:5.1.4 container_name: rocketmq-nameserver ports: - "9876:9876" environment: - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: sh mqnamesrv networks: - mall-network rocketmq-broker: image: apache/rocketmq:5.1.4 container_name: rocketmq-broker ports: - "10909:10909" - "10911:10911" environment: - NAMESRV_ADDR=rocketmq-nameserver:9876 - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: sh mqbroker -n rocketmq-nameserver:9876 depends_on: - rocketmq-nameserver networks: - mall-network rocketmq-console: image: styletang/rocketmq-console-ng:latest container_name: rocketmq-console ports: - "8080:8080" environment: - JAVA_OPTS=-Drocketmq.namesrv.addr=rocketmq-nameserver:9876 depends_on: - rocketmq-nameserver networks: - mall-network networks: mall-network: driver: bridge
启动 RocketMQ:
cd mall-microservices/docker/rocketmq docker-compose up -d # 访问控制台:http://localhost:8080
安装部署
部署步骤
RocketMQ 部署步骤:
-
使用 Docker Compose 启动(推荐):
cd mall-microservices/docker/rocketmq docker-compose up -d -
验证部署:
# 查看容器状态 docker ps | grep rocketmq # 查看日志 docker logs rocketmq-nameserver docker logs rocketmq-broker
步骤2:在订单服务中实现消息生产者
添加 RocketMQ 依赖
文件路径: mall-microservices/order-service/pom.xml
<!-- RocketMQ --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId> </dependency>
配置 RocketMQ
文件路径: mall-microservices/order-service/src/main/resources/application.yml
spring: cloud: stream: rocketmq: binder: name-server: localhost:9876 bindings: order-output: producer: group: order-producer-group topic: order-topic
订单创建后发送消息
文件路径: mall-microservices/order-service/src/main/java/com/mall/orderservice/service/impl/OrderServiceImpl.java
package com.mall.orderservice.service.impl; import com.mall.orderservice.entity.Order; import com.mall.orderservice.mapper.OrderMapper; import com.mall.orderservice.service.OrderService; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.HashMap; import java.util.Map; @Service public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService { @Autowired private RocketMQTemplate rocketMQTemplate; private static final String ORDER_TOPIC = "order-topic"; private static final String ORDER_CREATED_TAG = "order-created"; @Override @Transactional(rollbackFor = Exception.class) public Order createOrder(Order order) { // 1. 创建订单 order.setOrderNo(generateOrderNo()); order.setStatus(0); save(order); // 2. 发送订单创建消息 Map<String, Object> message = new HashMap<>(); message.put("orderId", order.getId()); message.put("orderNo", order.getOrderNo()); message.put("userId", order.getUserId()); message.put("totalAmount", order.getTotalAmount()); message.put("timestamp", System.currentTimeMillis()); rocketMQTemplate.convertAndSend( ORDER_TOPIC + ":" + ORDER_CREATED_TAG, message ); return order; } private String generateOrderNo() { return "ORD" + System.currentTimeMillis() + UUID.randomUUID().toString().substring(0, 8).toUpperCase(); } }
消费者实现
消费者代码
@RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-consumer-group" ) @Component public class OrderMessageListener implements RocketMQListener<String> { @Override public void onMessage(String message) { // 处理消息 } }
消息类型
类型说明
消息类型:
- 普通消息
- 顺序消息
- 事务消息
- 延时消息
官方资源
根据 RocketMQ 官方文档 和 GitHub 仓库,RocketMQ 的核心特性包括:
-
多种消息类型:RocketMQ 支持普通消息、顺序消息、事务消息、延时消息、批量消息等多种消息类型。官方文档详细说明了每种消息类型的使用场景和实现原理,帮助开发者根据业务需求选择合适的消息类型。
-
高可靠和高性能:RocketMQ 采用分布式架构,支持主从复制、多副本机制,保证消息不丢失。官方文档显示,RocketMQ 单机可以支持百万级消息吞吐,延迟低至毫秒级别,适合高并发、大流量的业务场景。
-
丰富的功能特性:RocketMQ 提供了消息过滤、消息轨迹、消息查询、定时消息、消息重试等丰富的功能。官方文档中详细介绍了这些功能的使用方法和最佳实践。
参考资源:
- RocketMQ 官方文档:https://rocketmq.apache.org/
- RocketMQ GitHub:https://github.com/apache/rocketmq
本节小结
在本节中,我们学习了:
第一个是 RocketMQ 简介。 RocketMQ 是分布式消息中间件。
第二个是安装部署。 如何部署 RocketMQ。
第三个是生产者实现。 如何实现消息生产者。
第四个是消费者实现。 如何实现消息消费者。
第五个是消息类型。 RocketMQ 支持的消息类型。
这就是 RocketMQ 消息队列。使用 RocketMQ,可以实现异步消息处理。
在下一节,我们将学习 RocketMQ 事务消息。