asp 网站访问统计个人网站域名用什么好
2026/2/21 8:53:17 网站建设 项目流程
asp 网站访问统计,个人网站域名用什么好,wordpress不支持中文,专业的网站建设流程RocketMQ 作为阿里开源的分布式消息中间件#xff0c;凭借高吞吐量、低延迟、高可靠性等特性#xff0c;被广泛应用于分布式系统的异步通信、解耦、削峰填谷等场景。Spring Boot 作为主流的微服务开发框架#xff0c;其自动配置机制能极大简化与第三方组件的整合过程。本文将…RocketMQ 作为阿里开源的分布式消息中间件凭借高吞吐量、低延迟、高可靠性等特性被广泛应用于分布式系统的异步通信、解耦、削峰填谷等场景。Spring Boot 作为主流的微服务开发框架其自动配置机制能极大简化与第三方组件的整合过程。本文将从实战角度出发详细讲解 Spring Boot 如何整合 RocketMQ覆盖普通消息发送、消息消费、消息重试等核心流程并提供完整的代码示例。一、环境准备1. 基础环境依赖JDK 8 及以上RocketMQ 对 JDK 版本有一定要求推荐 8/11Maven 3.6项目构建工具RocketMQ 4.9.7本文使用的稳定版本可根据需求选择最新版本Spring Boot 2.7.15与 RocketMQ Starter 适配的版本2. RocketMQ 服务部署首先需要搭建 RocketMQ 服务环境可选择本地单机部署或集群部署本文以本地单机为例从 RocketMQ 官网 下载对应版本的安装包解压到本地目录。启动 NameServer# 进入 RocketMQ 解压目录的 bin 文件夹 cd rocketmq-all-4.9.7-bin-release/bin # 启动 NameServerWindows 系统执行 mqnamesrv.cmd nohup sh mqnamesrv 启动 Broker# 启动 BrokerWindows 系统执行 mqbroker.cmd nohup sh mqbroker -n localhost:9876 autoCreateTopicEnabletrue 注autoCreateTopicEnabletrue表示自动创建主题方便测试生产环境建议提前手动创建主题。二、项目初始化与依赖配置1. 创建 Spring Boot 项目通过 Spring Initializrhttps://start.spring.io/创建一个 Spring Boot 项目选择基础依赖如 Spring Web也可手动创建 Maven 项目并配置 pom.xml。2. 引入 RocketMQ 依赖在 pom.xml 中添加 RocketMQ Spring Boot Starter 依赖注意版本适配本文使用 2.2.3 版本与 RocketMQ 4.9.7 适配?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version2.7.15/version relativePath/ !-- lookup parent from repository -- /parent groupIdcom.example/groupId artifactIdspring-boot-rocketmq-demo/artifactId version0.0.1-SNAPSHOT/version namespring-boot-rocketmq-demo/name descriptionSpring Boot RocketMQ 整合示例/description properties java.version1.8/java.version rocketmq-spring-boot-starter.version2.2.3/rocketmq-spring-boot-starter.version /properties dependencies !-- Spring Web 依赖用于提供接口测试 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency !-- RocketMQ Spring Boot Starter -- dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version${rocketmq-spring-boot-starter.version}/version /dependency !-- 测试依赖 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency /dependencies build plugins plugin groupIdorg.springframework.boot/groupId artifactIdspring-boot-maven-plugin/artifactId /plugin /plugins /build /project3. 配置 RocketMQ 连接信息在application.yml或 application.properties中配置 RocketMQ 的 NameServer 地址、生产者组等信息spring: application: name: spring-boot-rocketmq-demo # RocketMQ 配置 rocketmq: # NameServer 地址多个地址用分号分隔 name-server: localhost:9876 # 生产者配置 producer: # 生产者组名必须唯一 group: demo-producer-group # 发送消息的超时时间默认 3000ms send-message-timeout: 3000 # 消息体最大长度默认 4MB max-message-size: 4194304 # 压缩消息的阈值默认 4KB compress-message-body-threshold: 4096 # 重试次数默认 2 次 retry-times-when-send-failed: 2 # 异步发送失败时是否重试其他 Broker默认 false retry-next-server: false三、消息发送普通消息、同步 / 异步 / 单向发送RocketMQ 支持同步发送、异步发送、单向发送三种消息发送方式适用于不同的业务场景同步发送发送后等待 Broker 响应可靠性最高适用于重要消息如订单创建。异步发送发送后不阻塞通过回调函数处理响应适用于需要高吞吐量且允许少量延迟的场景。单向发送发送后不等待响应性能最高可靠性最低适用于日志收集等不重要消息。1. 封装消息发送工具类创建RocketMQProducerService类注入RocketMQTemplate由 RocketMQ Starter 提供的模板类简化消息发送实现三种发送方式package com.example.demo.service; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * RocketMQ 生产者服务 */ Service public class RocketMQProducerService { // 注入 RocketMQ 模板类 Resource private RocketMQTemplate rocketMQTemplate; /** * 同步发送消息 * param topic 主题名可携带标签格式topic:tag * param message 消息内容 * return 发送结果 */ public SendResult sendSyncMessage(String topic, String message) { // 构建消息可添加消息头如自定义 key MessageString msg MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, sync-key- System.currentTimeMillis()) .build(); // 同步发送 return rocketMQTemplate.syncSend(topic, msg); } /** * 异步发送消息 * param topic 主题名 * param message 消息内容 */ public void sendAsyncMessage(String topic, String message) { MessageString msg MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, async-key- System.currentTimeMillis()) .build(); // 异步发送通过 SendCallback 处理回调 rocketMQTemplate.asyncSend(topic, msg, new SendCallback() { Override public void onSuccess(SendResult sendResult) { // 发送成功处理 System.out.println(异步发送消息成功 sendResult); } Override public void onException(Throwable e) { // 发送失败处理 System.err.println(异步发送消息失败 e.getMessage()); } }); } /** * 单向发送消息不关心发送结果 * param topic 主题名 * param message 消息内容 */ public void sendOneWayMessage(String topic, String message) { MessageString msg MessageBuilder.withPayload(message) .setHeader(RocketMQHeaders.KEYS, oneway-key- System.currentTimeMillis()) .build(); // 单向发送 rocketMQTemplate.sendOneWay(topic, msg); } }2. 编写测试接口创建MessageSendController控制器提供 HTTP 接口测试消息发送package com.example.demo.controller; import org.apache.rocketmq.client.producer.SendResult; import com.example.demo.service.RocketMQProducerService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 消息发送测试控制器 */ RestController RequestMapping(/message) public class MessageSendController { Resource private RocketMQProducerService rocketMQProducerService; // 测试同步发送 GetMapping(/sync/send) public String sendSyncMessage(RequestParam String msg) { // 主题名demo-topic标签demo-tag标签用于消息过滤 String topic demo-topic:demo-tag; SendResult sendResult rocketMQProducerService.sendSyncMessage(topic, msg); return 同步发送消息成功 sendResult; } // 测试异步发送 GetMapping(/async/send) public String sendAsyncMessage(RequestParam String msg) { String topic demo-topic:demo-tag; rocketMQProducerService.sendAsyncMessage(topic, msg); return 异步发送消息请求已提交; } // 测试单向发送 GetMapping(/oneway/send) public String sendOneWayMessage(RequestParam String msg) { String topic demo-topic:demo-tag; rocketMQProducerService.sendOneWayMessage(topic, msg); return 单向发送消息完成; } }四、消息消费消费者配置与消息监听RocketMQ 的消费端通过消息监听器监听指定主题的消息Spring Boot 整合后可通过注解RocketMQMessageListener快速实现消费逻辑。1. 编写消息消费者创建RocketMQConsumerService类实现RocketMQListener接口通过注解配置消费者组、监听的主题和标签package com.example.demo.consumer; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * RocketMQ 消费者服务 * 注解说明 * - consumerGroup消费者组名必须唯一 * - topic监听的主题名 * - selectorExpression标签表达式* 表示所有标签也可指定具体标签如 demo-tag * - messageModel消息模式CLUSTERING集群模式/BROADCASTING广播模式默认集群模式 * - consumeMode消费模式CONCURRENTLY并发消费/ORDERLY顺序消费默认并发消费 */ Component RocketMQMessageListener( consumerGroup demo-consumer-group, topic demo-topic, selectorExpression *, messageModel MessageModel.CLUSTERING, consumeMode ConsumeMode.CONCURRENTLY ) public class RocketMQConsumerService implements RocketMQListenerString { /** * 消息消费逻辑 * param message 消息内容 */ Override public void onMessage(String message) { System.out.println(接收到消息 message); // 模拟业务处理 // 注意消费端抛出异常会触发消息重试 // handleBusiness(message); } /** * 模拟业务处理 * param message 消息内容 */ private void handleBusiness(String message) { // 业务逻辑代码 } }2. 消费者核心配置说明消费者组consumerGroup必须唯一同一组的消费者共同消费主题的消息集群模式下。消息模式messageModelCLUSTERING集群模式同组消费者分摊消费消息一条消息仅被一个消费者消费默认模式。BROADCASTING广播模式同组消费者都会消费同一条消息适用于通知类消息如配置更新。消费模式consumeModeCONCURRENTLY并发消费多线程并发消费消费速度快默认模式。ORDERLY顺序消费单线程消费保证消息按顺序消费适用于有序消息场景如订单状态变更。标签表达式selectorExpression用于过滤消息支持*所有标签、tag1 || tag2多个标签等表达式。五、消息重试消费失败后的重试机制在实际业务中消息消费可能因网络异常、业务处理失败等原因失败RocketMQ 提供了消费重试机制确保消息被成功消费。1. 重试机制原理默认重试当消费者消费消息时抛出异常RocketMQ 会将消息重新放入队列等待重试消费默认重试次数为 16 次每次重试的间隔时间逐渐增加1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min。死信队列当消息重试 16 次后仍消费失败会被发送到死信队列DLQ死信队列的命名规则为%DLQ%消费者组名可通过消费死信队列的消息进行人工处理。2. 自定义重试配置与异常处理1配置消费重试次数在application.yml中添加消费者的重试配置也可通过注解属性配置# 消费者重试配置可在注解中覆盖 rocketmq: consumer: # 消费线程数 consume-thread-max: 20 # 批量消费的最大消息数 consume-message-batch-max-size: 1 # 消费超时时间 consume-timeout: 152手动控制重试返回消费状态上述示例中消费者实现的是RocketMQListener接口无法手动控制消费状态若需要自定义重试逻辑可实现RocketMQPushConsumerListener接口或使用MessageListenerConcurrently返回ConsumeConcurrentlyStatus枚举package com.example.demo.consumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; import java.util.List; /** * 自定义重试逻辑的消费者 */ Component RocketMQMessageListener( consumerGroup demo-consumer-group-retry, topic demo-topic, selectorExpression * ) public class RetryRocketMQConsumerService implements RocketMQListenerMessageExt, RocketMQPushConsumerLifecycleListener { Override public void onMessage(MessageExt messageExt) { String message new String(messageExt.getBody()); System.out.println(接收到消息带重试逻辑 message 重试次数 messageExt.getReconsumeTimes()); try { // 模拟业务处理失败 int a 1 / 0; // 业务处理成功无需重试 } catch (Exception e) { System.err.println(消息消费失败 e.getMessage()); // 若重试次数超过 3 次直接返回成功不再重试否则抛出异常触发重试 if (messageExt.getReconsumeTimes() 3) { System.out.println(消息重试次数已达上限不再重试); // 可将消息记录到数据库后续人工处理 return; } // 抛出异常触发重试 throw new RuntimeException(消费失败触发重试); } } Override public void prepareStart(org.apache.rocketmq.client.consumer.DefaultMQPushConsumer consumer) { // 可自定义消费者配置如设置重试次数 try { // 设置消费线程数 consumer.setConsumeThreadMax(20); // 设置消息监听器若需要更细粒度的控制 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - { for (MessageExt msg : msgs) { onMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } catch (MQClientException e) { throw new RuntimeException(e); } } }3. 死信队列处理当消息进入死信队列后可创建专门的消费者监听死信队列进行人工干预处理package com.example.demo.consumer; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 死信队列消费者 * 死信队列名称%DLQ% 原消费者组名 */ Component RocketMQMessageListener( consumerGroup dlq-consumer-group, topic %DLQ%demo-consumer-group, selectorExpression * ) public class DlqRocketMQConsumerService implements RocketMQListenerString { Override public void onMessage(String message) { System.out.println(接收到死信队列消息 message); // 人工处理逻辑如记录日志、通知运维、手动重试等 } }六、测试验证1. 启动项目运行 Spring Boot 项目的主类确保 RocketMQ NameServer 和 Broker 已启动。2. 发送消息测试通过 HTTP 接口发送消息例如同步发送http://localhost:8080/message/sync/send?msgHello RocketMQ Sync异步发送http://localhost:8080/message/async/send?msgHello RocketMQ Async单向发送http://localhost:8080/message/oneway/send?msgHello RocketMQ OneWay3. 验证消费与重试观察控制台输出可看到消费者成功接收消息若在消费端模拟业务异常如除以 0可看到消息重试的日志重试次数达到上限后进入死信队列。七、生产环境注意事项主题与消费者组规划提前手动创建主题关闭自动创建消费者组名需唯一且有明确的业务含义。消息重试配置根据业务场景调整重试次数和间隔避免无效重试占用资源。死信队列处理建立死信队列的监控和处理机制防止消息丢失。消息幂等性由于消息重试消费端需保证幂等性如通过消息 key 去重。监控与告警接入 RocketMQ 监控平台如 RocketMQ Dashboard监控消息发送 / 消费情况设置告警机制。集群部署生产环境中 RocketMQ 需采用集群部署保证高可用。八、总结本文详细介绍了 Spring Boot 整合 RocketMQ 的全流程包括环境搭建、依赖配置、消息发送同步 / 异步 / 单向、消息消费集群 / 广播、并发 / 顺序、消息重试与死信队列处理并提供了完整的代码示例。通过本文的实战内容你可以快速掌握 RocketMQ 在 Spring Boot 项目中的核心使用方式并根据实际业务场景进行扩展和优化。RocketMQ 的功能远不止于此后续还可深入学习顺序消息、事务消息、延迟消息等高级特性进一步满足复杂的业务需求。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询