SpringBoot整合RabbitMQ及生产全场景高级特性实战

网友投稿 334 2022-09-21


SpringBoot整合RabbitMQ及生产全场景高级特性实战

目录摘要整合依赖与配置生产者配置消息队列规则生产者发布消息消费者监听消息

摘要

整合场景含 topic 工作模式(通过 routingKey 可满足简单/工作队列/发布订阅/路由等四种工作模式)和 confirm(消息确认)、return(消息返回)、basicAck(消息签收)、basicNack(拒绝签收)、DLX(Dead Letter Exchange死信队列)实现延时/定时任务等。

整合

依赖与配置

以下内容消费者同生产者

org.springframework.boot

spring-boot-starter-parent

2.1.4.RELEASE

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

server.port=8090

spring.rabbitmq.host=192.168.168.10

spring.rabbitmq.port=5672

spring.rabbitmq.username=zheng123

spring.rabbitmq.password=zheng123

spring.rabbitmq.virtual-host=/zheng

spring.rabbitmq.publisher-confirms=true

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.listener.direct.acknowledge-mode=manual

生产者配置消息队列规则

下边是两种配置方式,本次整合示例中使用第一个配置

@Configuration

public class TopicConfig {

// 声明队列

@Bean

public Queue topicQ1() {

return new Queue("topic_sb_mq_q1");

}

// 声明队列并绑定该队列到死信交换机(返回值有两种写法,任选一种都可以)

// 测试死信需要关闭原队列的监听

@Bean

public Queue topicQ2() {

return QueueBuilder.durable("topic_sb_mq_q2")

.withArgument("x-dead-letter-exchange", "topicExchange")

.withArgument("x-dead-letter-routing-key", "changsha.f")

.withArgument("x-message-ttl", 10000)

.build();

Map arguments = new HashMap<>(2);

arguments.put("x-dead-letter-exchange","topicExchange");

arguments.put("x-dead-letter-routing-key","changsha.f");

arguments.put("x-message-ttl",10000);

return new Queue("topic_sb_mq_q2",true,false,false,arguments);

}

//声明exchange

@Bean

public TopicExchange setTopicExchange() {

return new TopicExchange("topicExchange");

}

//声明binding,需要声明一个routingKey

@Bean

public Binding bindTopicHebei1() {

return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");

}

@Bean

public Binding bindTopicHebei2() {

return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");

}

}

@Configuration

public class RabbitMqConfig {

//定义交换机的名字

public static final String EXCHANGE_NAME = "boot_topic_exchange";

//定义队列的名字

public static final String QUEUE_NAME = "boot_queue";

//1、声明交换机

@Bean("bootExchange")

public Exchange bootExchange(){

return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();

}

//2、声明队列

@Bean("bootQueue")

public Queue bootQueue(){

return QueueBuilder.durable(QUEUE_NAME).build();

}

//3、队列与交换机进行绑定

@Bean

public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){

// topic模式兼容广播模式,路由模式。with("#")则类似广播模式匹配所有订阅者;with("boot.1")则类似路由模式匹配指定订阅者

return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();

}

}

生产者发布消息

@RestController

public class ProducerController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping(value="/topicSend")

public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {

// 定义 confirm 回调

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

// confirmed

} else {

// nack-ed

}

});

rabbitTemplate.setMandatory(true);

rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routKey)->{

// return message

});

if(null == routingKey) {

routingKey="changsha.kf";

}

MessageProperties messageProperties = new MessageProperties();

messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

//fanout模式只往exchange里发送消息。分发到exchange下的所有queue

rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));

return "message sended : routingKey >"+routingKey+";message > "+message;

}

}

消费者监听消息

@Component

public class ConcumerReceiver {

//topic 模式

//注意这个模式会有优先匹配原则。例如发送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不会再去匹配*.ITd

@RabbitListener(queues="topic_sb_mq_q1")

public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException {

// 消息id

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

// message.getBody() todosomething

// 签收消息

channel.basicAck(deliveryTag, true);

} catch (Exception e) {

// 拒绝签收

// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端

channel.basicNack(deliveryTag, true, true);

}

}

@RabbitListener(queues="topic_sb_mq_q2")

public void topicReceiveq2(String message) {

System.out.println("Topic模式 topic_sb_mq_q2 received message : " +message);

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:软件接口设计怎么做?前后端分离软件接口设计思路
下一篇:路由交换学习笔记(二十四)IPv6基础(ipv6路由协议)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~