2025/12/29 4:33:19
网站建设
项目流程
中山网站快照优化公司,软文范文大全,芜湖市建设银行支行网站,海城做网站SpringBoot 集成 Spring AMQP RabbitMQ 一文搞定多种工作模式#xff08;点对点、工作队列、发布订阅、路由、主题、RPC、死信队列#xff09;与各种自定义配置详解 文中代码见 Don212 一、快速开始
1、在 Spring Boot 项目中引入一个依赖
!-- Starter for using Spri…SpringBoot 集成 Spring AMQP RabbitMQ 一文搞定多种工作模式点对点、工作队列、发布订阅、路由、主题、RPC、死信队列与各种自定义配置详解文中代码见 Don212一、快速开始1、在 Spring Boot 项目中引入一个依赖!-- Starter for using Spring AMQP and Rabbit MQ --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency引入依赖以后Spring Boot 会自动创建RabbitMQ的连接工厂ConnectionFactory并创建RabbitTemplate对象并注册到Spring 容器中。2、在 Spring Boot 项目配置 RabbitMQ 信息spring:##### RabbitMQ 配置rabbitmq:host:127.0.0.1port:5672username:testpassword:test# 连接到代理时使用的虚拟主机virtual-host:/经过上述 2 步骤配置以后Spring Boot 项目就可以使用RabbitMQ了。3、实现发收消息创建消费者com.don.learn.springboot.rabbitmq.consumer.SimpleConsumer、com.don.learn.springboot.rabbitmq.consumer.SimpleConsumer2注解RabbitListener声明消息消费者、支持多种配置注解Queue定义队列的注解name / value 指定队列的名称durable 队列是否持久化服务重启后队列是否还存在默认持久化exclusive 是否为排他队列是否只能被一个连接使用只有声明该队列的连接才能使用它连接断开时会自动删除这个队列默认非排他autoDelete 当最后一个消费者断开连接后队列是否自动删除默认不自动删除ignoreDeclarationExceptions 是否忽略队列声明时的异常默认抛出异常arguments 设置队列的额外参数declare 是否在应用启动时声明队列默认自动声明队列admins 指定管理队列的 RabbitAdmin Bean 名称创建生产者com.don.learn.springboot.rabbitmq.producer.SimpleProducer使用RabbitTemplate给 RabbitMQ 发送消息注解RabbitListener既可以标注在方法上也可以标注在类上。标记在类上时方法上的RabbitHandler注解起作用。当接收到消息以后会调用RabbitHandler注解标注的方法具体哪个方法被调用需要根据消息类型与方法参数的类型进行匹配。4、生产者启动声明队列BeanpublicQueuequeue(){returnnewQueue(simple_queue,true,false,false);}5、自动配置org.springframework.boot.autoconfigure.amqp.RabbitAutoConfigurationorg.springframework.boot.autoconfigure.amqp.RabbitPropertiesConnectionFactory自动配置连接工厂RabbitProperties封装了RabbitMQ的配置RabbitTemplate给RabbitMQ发送和接受消息AmqpAdminRabbitMQ系统管理功能组件二、RabbitMQ 多种工作模式RabbitMQ 支持多种消息传递模式也称为交换器类型每种模式都有不同的路由逻辑和适用场景。以下是 RabbitMQ 的 6 种主要工作模式点对点模式简单模式结构特点一个生产者、一个队列、一个消费者路由规则直接发送到指定队列适用场景简单的点对点通信示例代码配置类RabbitMQSimpleConfig生产者SimpleMessageProducer消费者SimpleMessageConsumerAPISimpleMessageController工作队列模式结构特点一个生产者、一个队列、多个消费者竞争关系路由规则轮询默认或公平分发适用场景任务分发、负载均衡示例代码和简单模式配置一样只是多个消费者。一次性发送多个消息到队列后消费者会轮询接收该队列的消息。发布订阅模式Publish/Subscribe结构特点一个生产者、一个 Fanout 交换器、多个队列、多个消费者竞争关系路由规则广播到所有绑定的队列适用场景广播通知、日志收集、事件广播、监控告警广播机制一个事件被所有绑定的队列接收示例代码配置类RabbitMQFanoutConfig生产者FanoutMessageProducer消费者FanoutMessageConsumerAPIFanoutMessageController路由模式Routing结构特点一个生产者、一个 Direct 交换器、多个队列按路由键精确绑定、多个消费者竞争关系路由规则精确匹配路由键适用场景按消息类型分类处理、系统日志分级示例代码配置类RabbitMQDirectConfig生产者DirectMessageProducer消费者DirectMessageConsumerAPIDirectMessageController主题模式Topics结构特点一个生产者、一个 Topic 交换器、多个队列使用通配符模式匹配绑定、多个消费者竞争关系通配符路由规则*匹配一个单词用.分隔的段、#匹配零个或多个单词适用场景消息分类、多维度筛选、复杂事件路由示例代码配置类RabbitMQTopicConfig生产者TopicMessageProducer消费者TopicMessageConsumerAPITopicMessageControllerRPC 模式Remote Procedure CallRPC 是远程过程调用Remote Procedure Call的缩写形式简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。RabbitMQ 文档结构特点客户端发送请求到请求队列服务端处理并返回响应到回调队列适用场景远程调用、分布式服务调用RPC 的工作流程Client 先发送一条消息和普通的消息相比消息多了两个关键内容一个是 correlation_id表示这条消息的唯一 id一个是reply_to表示回复队列的名字Server 从消息发送队列获取消息并处理相应的业务逻辑处理完成后将处理结果发送到 reply_to 指定的回调队列中并回传correlation_idClient 从回调队列中读取消息从消息头中获取 correlation_id 并比较可确认是否一致消息内容为执行结果手写 RPC 回调与 Spring AMQP 内置 RPC 实现的代码示例配置类RabbitMQRpcConfigRPC 客户端RpcMessageClientRPC 服务端RpcMessageServerAPIRpcMessageController消息头路由 HeadersHeaders 类型的 Exchange 使用的较少是忽略 routingKey 的一种路由方式是使用 Headers 参数来匹配的Headers 参数是一个键值对。消费者在绑定时需要关联额外 Headers 键值对参数生产者在发消息时传入键值对两者匹配的话则对应的队列就可以收到消息。匹配有两种方式all和any在消费者端必须要用键值x-mactch定义。all代表定义的多个键值对都满足any则只要满足一个即可。示例代码配置类RabbitMQHeadersConfig生产者HeadersMessageProducer消费者HeadersMessageConsumerAPIHeadersMessageController死信队列模式Dead Letter QueueDLQ消息进入死信队列的 4 种核心场景消息被消费者拒绝Reject/Nack且不重新入队则消息会被直接丢弃进入死信队列。消息在队列中存活时间超过 TTLTime-To-Live。消息不会因处理时间超过TTL而进入死信队列TTL机制在消息投递后就已失效。队列达到最大长度限制队列已满新消息会进入死信队列 ❌ 需要额外配置备用交换器或手动处理才能路由到死信队列消息无法路由到任何队列消息发送到 Exchange 但没有匹配的队列时如果配置备用交换器AlternateExchange消息会通过备用交换器进入死信队列。备用交换器使用 FanoutExchange 或 TopicExchange#通配路由达到最大重试次数后企业级最常用通过 Spring AMQP 的重试机制和 MessageRecoverer 实现代码示例配置类RabbitMQDLXConfig生产者DLXMessageProducer消费者DLXMessageConsumerAPIDLXMessageController延迟队列模式延迟队列消息发送到队列以后延迟指定时间后消息被消费三、RabbitMQ 配置消息转换器在org.springframework.amqp.rabbit.core.RabbitTemplate中消息转换器privateMessageConvertermessageConverternewSimpleMessageConverter();默认使用org.springframework.amqp.support.converter.SimpleMessageConverter自定义消息转换器BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}消息监听启动类上开启EnableRabbit基于注解的RabbitMQ模式启动时会自动创建队列、交换器并绑定队列到交换器。使用RabbitListener监听队列并指定队列名称、交换器名称、路由键名称。四、AmqpAdmin 管理组件AmqpAdmin 是一个管理组件用于创建、修改、删除交换器、队列、绑定规则。TestpublicvoidtestAmqpAdmin(){System.out.println(创建交换器 DirectExchange);amqpAdmin.declareExchange(newDirectExchange(amqpadmin.exchange));System.out.println(创建队列 Queue);amqpAdmin.declareQueue(newQueue(amqpadmin.queue,true));System.out.println(创建绑定规则 Binding);amqpAdmin.declareBinding(newBinding(amqpadmin.queue,Binding.DestinationType.QUEUE,amqpadmin.exchange,amqp.test,null));}五、参考文档文档资料 RabbitMQ文档资料 Spring AMQP文档资料 Spring Boot AMQP